Stream Processing an RDBMS
Check the Java version.
$ java -version
java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)
[cloudera@quickstart ~]$
Download the Confluent distribution of Kafka to the ~/Downloads folder.
Untar the archive.
$ tar xf confluent-oss-4.1.0-2.11.tar.gz
Check the version of Confluent from the extracted directory.
$ cd confluent-4.1.0/
$ bin/confluent version
Confluent Open Source: 4.1.0
Start the broker.
$ bin/kafka-server-start etc/kafka/server.properties
Open another terminal and create a topic called demo.
$ cd Downloads/confluent-4.1.0/
$ bin/kafka-topics --zookeeper localhost:2181 --topic demo --create --replication-factor 1 --partitions 1
Created topic "demo".
Verify that the topic just created shows up in the list of registered topics.
$ bin/kafka-topics --zookeeper localhost:2181 --list
__confluent.support.metrics
demo
Describe the topic in Kafka.
$ bin/kafka-topics --zookeeper localhost:2181 --topic demo --describe
Topic:demo PartitionCount:1 ReplicationFactor:1 Configs:
Topic: demo Partition: 0 Leader: 0 Replicas: 0 Isr: 0
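The same topic administration can also be done from code with Kafka's AdminClient API. Below is a minimal Scala sketch, a hypothetical example that assumes the kafka-clients jar (bundled with Confluent 4.1.0) is on the classpath; it mirrors the create and list commands above.

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import scala.collection.JavaConverters._

object CreateDemoTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)
    // Create the demo topic with 1 partition and replication factor 1,
    // equivalent to the kafka-topics --create command (fails if the topic already exists).
    admin.createTopics(Seq(new NewTopic("demo", 1, 1.toShort)).asJava).all().get()
    // List the registered topics, equivalent to kafka-topics --list.
    println(admin.listTopics().names().get())
    admin.close()
  }
}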
Open a new terminal and start a console consumer. This consumer terminal waits to receive messages from the demo topic. Next you will create a producer and send messages to the topic.
$ cd ~/Downloads/confluent-4.1.0/
$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic demo
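The console consumer is the quickest way to watch the topic, but the same loop can be written in a few lines of client code. Here is a minimal Scala sketch of an equivalent consumer, assuming kafka-clients is on the classpath (the object name DemoConsumer is just for illustration).

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object DemoConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "demo-consumer")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("demo"))
    // Poll forever and print each message value, like the console consumer does.
    while (true) {
      consumer.poll(1000).asScala.foreach(record => println(record.value()))
    }
  }
}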
Open a new terminal and start a console producer. Once you start the producer, it waits for user input. You can type in a few messages. The messages should show up in the consumer terminal.
$ cd ~/Downloads/confluent-4.1.0/
$ bin/kafka-console-producer --broker-list localhost:9092 --topic demo
>message 1
>message 2
>message 3
>message 4
So, the Kafka test has been successful.
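For reference, the same messages could be produced from application code instead of the console tool. A minimal Scala sketch of an equivalent producer, again assuming kafka-clients is on the classpath:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object DemoProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Send a few messages to the demo topic, just like typing them into the console producer.
    (1 to 4).foreach(i => producer.send(new ProducerRecord[String, String]("demo", s"message $i")))
    producer.close()
  }
}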
Kafka JDBC Connector
Download the MySQL JDBC driver from this link and save it in the ~/Downloads/confluent-4.1.0/share/java directory.
Start the schema registry in a new terminal.
$ cd ~/Downloads/confluent-4.1.0/
$ bin/schema-registry-start etc/schema-registry/schema-registry.properties
Create a simple table in MySQL (in the retail_db database referenced by the connector configuration below) with an auto-increment field.
mysql> CREATE TABLE user (
id int NOT NULL AUTO_INCREMENT,
name varchar(50),
PRIMARY KEY (id)
);
Create a new JDBC source connector configuration file ~/Downloads/confluent-4.1.0/etc/kafka-connect-jdbc/source-mysql.properties with the following content:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/retail_db?user=root&password=cloudera
connection.user=root
connection.password=cloudera
mode=incrementing
#mode=timestamp+incrementing
table.whitelist=user
#timestamp.column.name=last_modified_date
incrementing.column.name=id
topic.prefix=mysql-
Start the Kafka Connect service in standalone mode with the JDBC source connector. The connector polls the user table on the auto-increment id column and publishes each new row to the mysql-user topic (topic.prefix plus the table name).
$ bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/source-mysql.properties
Run the Spark Structured Streaming application. It reads the Avro records from the mysql-user topic and writes them to a console sink.
$ spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class main.scala.StructuredKafkaStreamAvro \
--packages org.apache.avro:avro:1.8.0,com.twitter:bijection-avro_2.11:0.9.6,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 \
SparkKafkaScala-1.0-SNAPSHOT.jar
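The source of StructuredKafkaStreamAvro is not shown here. The following is a minimal sketch of what such an application could look like, built on the same avro, bijection-avro and spark-sql-kafka packages passed to spark-submit above. It assumes the reader schema below mirrors the user table and that each Kafka value starts with the 5-byte Confluent Schema Registry envelope written by the Connect Avro converter; a real application would fetch the schema from the registry instead of hard-coding it.

package main.scala

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import org.apache.spark.sql.SparkSession

object StructuredKafkaStreamAvro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("StructuredKafkaStreamAvro").getOrCreate()
    import spark.implicits._

    // Assumed reader schema for the mysql-user records (id, name).
    val userSchemaJson =
      """{"type":"record","name":"user","fields":[
        |{"name":"id","type":"int"},
        |{"name":"name","type":["null","string"],"default":null}]}""".stripMargin

    // Read the raw records published by the JDBC source connector.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "mysql-user")
      .option("startingOffsets", "earliest")
      .load()

    // Decode each Avro value with bijection-avro, skipping the 5-byte registry envelope.
    // (The injection is rebuilt per record only to keep the sketch simple and serializable.)
    val users = raw.select($"value").as[Array[Byte]].map { bytes =>
      val schema = new Schema.Parser().parse(userSchemaJson)
      val injection: Injection[GenericRecord, Array[Byte]] =
        GenericAvroCodecs.toBinary[GenericRecord](schema)
      val record = injection.invert(bytes.drop(5)).get
      (record.get("id").toString.toInt, Option(record.get("name")).map(_.toString).orNull)
    }.toDF("id", "name")

    // Print every micro-batch to the console sink.
    val query = users.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}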
Start another terminal with the Avro console consumer to read from the mysql-user topic.
$ bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic mysql-user
Now, insert a few records into the MySQL user table. These updates should show up in the console sink of the Spark streaming application as well as in the Avro console consumer above.
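Rows can be inserted from the mysql shell, or from code. Below is a minimal Scala sketch (a hypothetical helper, not part of the original application) that inserts a few sample rows over JDBC using the same connection URL and credentials as the connector configuration; it assumes the MySQL JDBC driver downloaded earlier is on the classpath.

import java.sql.DriverManager

object InsertUsers {
  def main(args: Array[String]): Unit = {
    // Same database, user and password as in source-mysql.properties.
    val conn = DriverManager.getConnection(
      "jdbc:mysql://127.0.0.1:3306/retail_db", "root", "cloudera")
    try {
      val stmt = conn.prepareStatement("INSERT INTO user (name) VALUES (?)")
      // Sample names for illustration; each insert gets a new auto-increment id,
      // so the connector's incrementing mode picks it up.
      Seq("alice", "bob", "carol").foreach { name =>
        stmt.setString(1, name)
        stmt.executeUpdate()
      }
      stmt.close()
    } finally {
      conn.close()
    }
  }
}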