Stream Processing from an RDBMS

Check the Java version.

$ java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)
[cloudera@quickstart ~]$ 

Download the Confluent distribution of Kafka into the ~/Downloads folder.

Untar the binary

$ tar xf confluent-oss-4.1.0-2.11.tar.gz

Check the Confluent version.

$ confluent-4.1.0/bin/confluent version
Confluent Open Source: 4.1.0

Start the broker

$ cd confluent-4.1.0/
$ bin/kafka-server-start etc/kafka/server.properties
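
Note that the broker expects ZooKeeper on localhost:2181. The Cloudera quickstart VM typically already runs a ZooKeeper service as part of CDH; if one is not running, start the ZooKeeper bundled with Confluent first, in its own terminal:

$ bin/zookeeper-server-start etc/kafka/zookeeper.properties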

Open another terminal and create a topic called demo.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/kafka-topics --zookeeper localhost:2181 --topic demo --create --replication-factor 1 --partitions 1
Created topic "demo".

Verify that the topic just created shows up in the list of registered topics.

$ bin/kafka-topics --zookeeper localhost:2181 --list
__confluent.support.metrics
demo

Describe the topic in Kafka

$ bin/kafka-topics --zookeeper localhost:2181 --topic demo --describe

Topic:demo    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: demo    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

Open a new terminal and start a console consumer. This consumer terminal waits to receive messages from the demo topic. Next, you will create a producer and send messages to the topic.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic demo

Open a new terminal and start a console producer. Once you start the producer, it waits for user input. Type in a few messages; they should show up in the consumer terminal.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/kafka-console-producer --broker-list localhost:9092 --topic demo
>message 1
>message 2
>message 3
>message 4

This confirms that the basic Kafka setup is working.

Kafka JDBC Connector

Download the MySQL JDBC driver from this link and save it in the ~/Downloads/confluent-4.1.0/share/java directory.

Start the schema registry in a new terminal.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/schema-registry-start etc/schema-registry/schema-registry.properties

Create a simple table in MySQL (in the retail_db database referenced by the connector configuration below) with an auto-increment column.

mysql> CREATE TABLE user (
    id int NOT NULL AUTO_INCREMENT, 
    name varchar(50), 
    PRIMARY KEY (id)
);

Create a new JDBC source connector configuration file ~/Downloads/confluent-4.1.0/etc/kafka-connect-jdbc/source-mysql.properties with the following content:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/retail_db?user=root&password=cloudera
connection.user=root
connection.password=cloudera
mode=incrementing
#mode=timestamp+incrementing
table.whitelist=user
#timestamp.column.name=last_modified_date
incrementing.column.name=id
topic.prefix=mysql-

In a new terminal, start the Kafka Connect service in standalone mode with this source configuration. With topic.prefix=mysql- and table.whitelist=user, new rows from the user table are published to the mysql-user topic.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/source-mysql.properties

Run the Spark streaming application.

$ spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class main.scala.StructuredKafkaStreamAvro \
--packages org.apache.avro:avro:1.8.0,com.twitter:bijection-avro_2.11:0.9.6,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 \
SparkKafkaScala-1.0-SNAPSHOT.jar
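
The jar above is built from a separate Scala project whose source is not shown here. As a rough sketch only, and assuming both the Avro schema of the user table and the standard Confluent wire format (one magic byte plus a 4-byte schema id in front of the Avro payload), main.scala.StructuredKafkaStreamAvro could look roughly like the following; a production job would fetch the writer schema from the schema registry instead of hard-coding it.

package main.scala

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

object StructuredKafkaStreamAvro {

  // Avro schema assumed to match the user table: non-null int id, nullable string name.
  val userSchemaJson: String =
    """{"type":"record","name":"user","fields":[
      |  {"name":"id","type":"int"},
      |  {"name":"name","type":["null","string"],"default":null}
      |]}""".stripMargin

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StructuredKafkaStreamAvro")
      .getOrCreate()
    import spark.implicits._

    // Subscribe to the topic written by the JDBC source connector (topic.prefix + table name).
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "mysql-user")
      .option("startingOffsets", "earliest")
      .load()

    val users = raw.select($"value".as[Array[Byte]]).map { bytes =>
      // The Connect AvroConverter prepends a magic byte and a 4-byte schema id,
      // so skip 5 bytes before decoding the plain Avro payload with bijection-avro.
      // (Schema and injection are rebuilt per record only to keep the sketch short.)
      val schema = new Schema.Parser().parse(userSchemaJson)
      val injection: Injection[GenericRecord, Array[Byte]] =
        GenericAvroCodecs.toBinary[GenericRecord](schema)
      val record = injection.invert(bytes.drop(5)).get
      (record.get("id").toString.toInt,
        Option(record.get("name")).map(_.toString).orNull)
    }.toDF("id", "name")

    // Echo every new row to the console sink.
    val query = users.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

The topic name, bootstrap server, and column names above mirror the earlier steps; the schema JSON and the 5-byte offset are illustrative assumptions, not the exact contents of SparkKafkaScala-1.0-SNAPSHOT.jar.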

Start another terminal with the Avro console consumer to read from the mysql-user topic.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic mysql-user

Now, insert a few records into the MySQL user table. These inserts should show up in the console sink of the Spark streaming application as well as in the Avro console consumer above.
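
For example, from the mysql prompt (the names are arbitrary):

mysql> INSERT INTO user (name) VALUES ('alice');
mysql> INSERT INTO user (name) VALUES ('bob');

Each insert should appear as a new Avro record in the Avro console consumer and as a new row in the console sink of the Spark streaming application.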