Stream Processing from an RDBMS

Check the Java version.

$ java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)
[cloudera@quickstart ~]$ 

Download the Confluent distribution of Kafka into the ~/Downloads folder.

Untar the binary

$ tar xf confluent-oss-4.1.0-2.11.tar.gz

Check the Confluent version.

$ confluent-4.1.0/bin/confluent version
Confluent Open Source: 4.1.0

Start the broker

$ cd confluent-4.1.0/
$ bin/kafka-server-start etc/kafka/server.properties
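
Note that the broker expects ZooKeeper on localhost:2181. The Cloudera quickstart VM typically already runs a ZooKeeper service as part of CDH; if one is not running, start the ZooKeeper bundled with Confluent first, in its own terminal:

$ bin/zookeeper-server-start etc/kafka/zookeeper.properties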

Open another terminal and create a topic called demo.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/kafka-topics --zookeeper localhost:2181 --topic demo --create --replication-factor 1 --partitions 1
Created topic "demo".

Verify that the topic just created shows up in the list of registered topics.

$ bin/kafka-topics --zookeeper localhost:2181 --list
__confluent.support.metrics
demo

Describe the topic in Kafka

$ bin/kafka-topics --zookeeper localhost:2181 --topic demo --describe

Topic:demo    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: demo    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

Open a new terminal and start a console consumer. This consumer terminal waits to receive messages from the demo topic. Next, you will create a producer and send messages to the topic.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic demo

Open a new terminal and start a console producer. Once you start the producer, it waits for user input. Type in a few messages; they should show up in the consumer terminal.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/kafka-console-producer --broker-list localhost:9092 --topic demo
>message 1
>message 2
>message 3
>message 4

This confirms that the basic Kafka setup is working.

Kafka JDBC Connector

Download the MySQL JDBC driver from this link and save it in the ~/Downloads/confluent-4.1.0/share/java directory.

Start the schema registry in a new terminal.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/schema-registry-start etc/schema-registry/schema-registry.properties

Create a simple table in MySQL (in the retail_db database referenced by the connector configuration below) with an auto-increment column.

mysql> CREATE TABLE user (
    id int NOT NULL AUTO_INCREMENT, 
    name varchar(50), 
    PRIMARY KEY (id)
);

Create a new JDBC source connector configuration file ~/Downloads/confluent-4.1.0/etc/kafka-connect-jdbc/source-mysql.properties with the following content:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/retail_db?user=root&password=cloudera
connection.user=root
connection.password=cloudera
mode=incrementing
#mode=timestamp+incrementing
table.whitelist=user
#timestamp.column.name=last_modified_date
incrementing.column.name=id
topic.prefix=mysql-

In a new terminal, start the Kafka Connect service in standalone mode with this source configuration. With topic.prefix=mysql- and table.whitelist=user, new rows from the user table are published to the mysql-user topic.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/source-mysql.properties

Run the Spark streaming application.

$ spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class main.scala.StructuredKafkaStreamAvro \
--packages org.apache.avro:avro:1.8.0,com.twitter:bijection-avro_2.11:0.9.6,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 \
SparkKafkaScala-1.0-SNAPSHOT.jar
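
The jar above is built from a separate Scala project whose source is not shown here. As a rough sketch only, and assuming both the Avro schema of the user table and the standard Confluent wire format (one magic byte plus a 4-byte schema id in front of the Avro payload), main.scala.StructuredKafkaStreamAvro could look roughly like the following; a production job would fetch the writer schema from the schema registry instead of hard-coding it.

package main.scala

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

object StructuredKafkaStreamAvro {

  // Avro schema assumed to match the user table: non-null int id, nullable string name.
  val userSchemaJson: String =
    """{"type":"record","name":"user","fields":[
      |  {"name":"id","type":"int"},
      |  {"name":"name","type":["null","string"],"default":null}
      |]}""".stripMargin

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StructuredKafkaStreamAvro")
      .getOrCreate()
    import spark.implicits._

    // Subscribe to the topic written by the JDBC source connector (topic.prefix + table name).
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "mysql-user")
      .option("startingOffsets", "earliest")
      .load()

    val users = raw.select($"value".as[Array[Byte]]).map { bytes =>
      // The Connect AvroConverter prepends a magic byte and a 4-byte schema id,
      // so skip 5 bytes before decoding the plain Avro payload with bijection-avro.
      // (Schema and injection are rebuilt per record only to keep the sketch short.)
      val schema = new Schema.Parser().parse(userSchemaJson)
      val injection: Injection[GenericRecord, Array[Byte]] =
        GenericAvroCodecs.toBinary[GenericRecord](schema)
      val record = injection.invert(bytes.drop(5)).get
      (record.get("id").toString.toInt,
        Option(record.get("name")).map(_.toString).orNull)
    }.toDF("id", "name")

    // Echo every new row to the console sink.
    val query = users.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

The topic name, bootstrap server, and column names above mirror the earlier steps; the schema JSON and the 5-byte offset are illustrative assumptions, not the exact contents of SparkKafkaScala-1.0-SNAPSHOT.jar.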

Start another terminal with the Avro console consumer to read from the mysql-user topic.

$ cd ~/Downloads/confluent-4.1.0/
$ bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic mysql-user

Now, insert a few records into the MySQL user table. These inserts should show up in the console sink of the Spark streaming application as well as in the Avro console consumer above.
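
For example, from the mysql prompt (the names are arbitrary):

mysql> INSERT INTO user (name) VALUES ('alice');
mysql> INSERT INTO user (name) VALUES ('bob');

Each insert should appear as a new Avro record in the Avro console consumer and as a new row in the console sink of the Spark streaming application.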