Streaming RDBMS Tables

Download Kafka

$ wget http://mirror.fibergrid.in/apache/kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz
$ tar -zxf kafka_2.10-0.10.0.1.tgz 
$ mv kafka_2.10-0.10.0.1 /usr/lib/

Set KAFKA_HOME for all users.

$ sudo vi /etc/profile

Add the following line to the file:

export KAFKA_HOME=/usr/lib/kafka_2.10-0.10.0.1

$ source /etc/profile

$ cd $KAFKA_HOME

Create Kafka log directories: one log directory for each of the 3 brokers we will set up.

$ sudo mkdir -p /var/log/kafka
$ sudo chmod 777 /var/log/kafka
$ mkdir /var/log/kafka/logs-0
$ mkdir /var/log/kafka/logs-1
$ mkdir /var/log/kafka/logs-2

Directory structure

/var/log/kafka

├── logs-0

├── logs-1

└── logs-2

This assumes you already have ZooKeeper configured on your server. If the ZooKeeper server is not running, start it.

$ sudo service zookeeper-server start

Copy the existing server.properties file to create three broker configs, one per broker, since we are going to run 3 brokers on a single machine.

$ cp config/server.properties config/server-1.properties

$ cp config/server.properties config/server-2.properties 

$ mv config/server.properties config/server-3.properties
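
Each copy must be edited before the brokers are started: on a single machine the three files need unique broker.id, port, and log.dirs values, and zookeeper.connect should point at the chroot used by the topic commands later on. A sketch of the per-file overrides (the ports and directories here are assumptions matching this layout; verify them against your own config files):

```properties
# config/server-1.properties
broker.id=1
port=9092
log.dirs=/var/log/kafka/logs-0
zookeeper.connect=server01:2181/kafka

# config/server-2.properties
broker.id=2
port=9093
log.dirs=/var/log/kafka/logs-1
zookeeper.connect=server01:2181/kafka

# config/server-3.properties
broker.id=3
port=9094
log.dirs=/var/log/kafka/logs-2
zookeeper.connect=server01:2181/kafka
```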

Start the three brokers, each with its own JMX port:

$ env JMX_PORT=9999 bin/kafka-server-start.sh config/server-1.properties &
$ env JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &
$ env JMX_PORT=9997 bin/kafka-server-start.sh config/server-3.properties &

Verify the processes. The jps command should display 3 kafka.Kafka processes, among other things.

$ sudo jps -l 

16259 org.apache.zookeeper.server.quorum.QuorumPeerMain

3500 kafka.Kafka

3218 kafka.Kafka

2935 kafka.Kafka

...

Create topic

$ bin/kafka-topics.sh --create --zookeeper server01:2181/kafka --replication-factor 2 --partitions 1 --topic stockprice

View existing topics

$ bin/kafka-topics.sh --list --zookeeper server01:2181/kafka

Test your Kafka setup by sending a test message to the stockprice topic. Start a console producer:

$ bin/kafka-console-producer.sh --broker-list server01:9092,server01:9093,server01:9094 --topic stockprice

Verify that messages arrive by starting a test consumer in another terminal:

$ bin/kafka-console-consumer.sh --zookeeper server01:2181/kafka --topic stockprice --from-beginning

Now type a message in the producer terminal and press Enter. You should see the message appear in the consumer terminal.

Describe the stockprice topic

$ bin/kafka-topics.sh --describe --zookeeper server01:2181/kafka --topic stockprice

Find consumer groups

$ bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper server01:2181/kafka --list

Check the consumer position on each topic for a given consumer group

$ bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper server01:2181/kafka --describe --group <consumer group id>

Find the offset range for a topic. --time -1 returns the latest offsets; --time -2 returns the earliest.

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list server01:9092,server01:9093,server01:9094 --topic stockprice --time -1
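
GetOffsetShell prints lines of the form topic:partition:offset. The difference between the latest offset (--time -1) and the earliest offset (--time -2) is the number of messages currently retained in that partition. A self-contained sketch of the arithmetic on sample output lines (the offsets here are made up):

```shell
# Sample GetOffsetShell output lines (format: topic:partition:offset).
latest="stockprice:0:128"     # from --time -1 (latest offset)
earliest="stockprice:0:3"     # from --time -2 (earliest offset)

# Strip everything up to the last ':' to get the offset field,
# then subtract: messages currently retained in partition 0.
count=$(( ${latest##*:} - ${earliest##*:} ))
echo "$count"
```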

If you want to kill a Kafka broker process, find its PID and kill it:

$ sudo ps -ef | grep server.properties
$ kill -9 <pid>

Or use kafka-server-stop.sh:

$ bin/kafka-server-stop.sh

Processing Streaming Data Using Spark Streaming

Run the bundled KafkaWordCount example. Its arguments are the ZooKeeper quorum, the consumer group, the comma-separated topic list, and the number of threads.

$ /usr/lib/spark/bin/run-example streaming.KafkaWordCount server02:2181/kafka spark-stream-consumer stockprice 1

Stream producer using Spark Streaming. The KafkaWordCountProducer arguments are the broker list, the topic, messages per second, and words per message.

$ /usr/lib/spark/bin/run-example streaming.KafkaWordCountProducer server01:9092 stockprice 10 5

Source Code: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

Monitoring Kafka server processes using JMX

Launch jconsole or jvisualvm and connect to the JMX ports configured at broker startup (9999, 9998, 9997).

Kafka Connect

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis. Kafka Connect features include:
  • A common framework for Kafka connectors - Kafka Connect standardizes integration of other data systems with Kafka, simplifying connector development, deployment, and management
  • Distributed and standalone modes - scale up to a large, centrally managed service supporting an entire organization or scale down to development, testing, and small production deployments
  • REST interface - submit and manage connectors to your Kafka Connect cluster via an easy to use REST API
  • Automatic offset management - with just a little information from connectors, Kafka Connect can manage the offset commit process automatically so connector developers do not need to worry about this error prone part of connector development
  • Distributed and scalable by default - Kafka Connect builds on the existing group management protocol. More workers can be added to scale up a Kafka Connect cluster.
  • Streaming/batch integration - leveraging Kafka's existing capabilities, Kafka Connect is an ideal solution for bridging streaming and batch data systems

Since we are going to use JDBC as the source, we will use MySQL for the demo.

mysql> create database sample;
mysql> use sample;
mysql> CREATE TABLE deal(
 id BIGINT NOT NULL AUTO_INCREMENT,
 name VARCHAR(100),
 amount FLOAT,
 deleted TINYINT(1) DEFAULT 0,
 lastmodified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
 PRIMARY KEY (id)
);
mysql> INSERT INTO deal (name, amount) VALUES 
("Deal 1", 1000), 
("Deal 2", 2000);
mysql> select * from deal;
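
The deleted flag in this table is worth a note: the JDBC source connector only polls for new and changed rows, so a hard DELETE would never reach Kafka. Soft-deleting a row instead updates lastmodified, which a timestamp-polling connector (as configured later in this setup) will pick up. A sketch:

```sql
-- Soft delete: the UPDATE bumps lastmodified, so a timestamp-polling
-- connector streams the change; a hard DELETE would go unnoticed.
UPDATE deal SET deleted = 1 WHERE id = 2;
```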

Download Confluent. We will use the Kafka JDBC connector from the Confluent package.

$ wget http://packages.confluent.io/archive/2.0/confluent-2.0.0-2.11.7.tar.gz
$ tar zxf confluent-2.0.0-2.11.7.tar.gz 
$ sudo mv confluent-2.0.0 /usr/lib/

Create a properties file for the MySQL source.

$ vi config/mysql-source.properties

Add the following content to the file

name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
# Connection to the MySQL sample database
connection.url=jdbc:mysql://einext02:3306/sample?user=root&password=root
# Stream only the deal table
table.whitelist=deal
# Detect new rows via the incrementing id and updated rows via the timestamp
mode=timestamp+incrementing
timestamp.column.name=lastmodified
incrementing.column.name=id
# Topic name becomes <prefix><table>, i.e. sample-deal
topic.prefix=sample-

Add required jar files to the classpath

$ export CLASSPATH=/usr/share/java/mysql-connector-java.jar:/usr/lib/confluent-2.0.0/share/java/confluent-common/common-config-2.0.0.jar:/usr/lib/confluent-2.0.0/share/java/confluent-common/common-utils-2.0.0.jar:/usr/lib/confluent-2.0.0/share/java/kafka-connect-jdbc/kafka-connect-jdbc-2.0.0.jar

Now start the Kafka Connect source for MySQL in standalone mode:

$ bin/connect-standalone.sh config/connect-standalone.properties config/mysql-source.properties
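
connect-standalone.properties holds the Connect worker settings. The stock file points at localhost:9092; for this three-broker layout the bootstrap servers should list the local brokers. A sketch of the relevant entries (values assumed from the setup above; other keys in the shipped file can stay at their defaults):

```properties
# Brokers the Connect worker talks to (assumed from this setup)
bootstrap.servers=server01:9092,server01:9093,server01:9094
# Where standalone mode stores source offsets between restarts
offset.storage.file.filename=/tmp/connect.offsets
```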

You can launch a consumer to verify the new data.

$ bin/kafka-console-consumer.sh --zookeeper einext02:2181/kafka --topic sample-deal --from-beginning