Streaming RDBMS Tables
Download Kafka
$ wget http://mirror.fibergrid.in/apache/kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz
$ tar -zxf kafka_2.10-0.10.0.1.tgz
$ sudo mv kafka_2.10-0.10.0.1 /usr/lib/
Set KAFKA_HOME for all users.
$ sudo vi /etc/profile
Add the following line at the end of the file:
export KAFKA_HOME=/usr/lib/kafka_2.10-0.10.0.1
$ source /etc/profile
$ cd $KAFKA_HOME
Create the Kafka log directories: one for each of the three brokers we are going to set up.
$ sudo mkdir -p /var/log/kafka
$ sudo chmod 777 /var/log/kafka
$ mkdir /var/log/kafka/logs-0
$ mkdir /var/log/kafka/logs-1
$ mkdir /var/log/kafka/logs-2
Directory structure
/var/log/kafka
├── logs-0
├── logs-1
└── logs-2
This guide assumes you already have Zookeeper configured on your server. If the Zookeeper server is not running, start it.
$ sudo service zookeeper-server start
We are going to set up 3 brokers on a single machine, so create 3 copies of the existing server.properties, one per broker, and then give each copy its own broker settings as shown below.
$ cp config/server.properties config/server-1.properties
$ cp config/server.properties config/server-2.properties
$ mv config/server.properties config/server-3.properties
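Each broker needs its own id, port, and log directory, so edit the three files before starting anything. The values below are a suggested layout, not the only possible one: ports 9092-9094 match the broker lists used later, the log directories are the ones created above, and the /kafka chroot matches the --zookeeper arguments used later. Adjust them for your environment.
In config/server-1.properties:
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/var/log/kafka/logs-0
zookeeper.connect=server01:2181/kafka
In config/server-2.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/var/log/kafka/logs-1
zookeeper.connect=server01:2181/kafka
In config/server-3.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/var/log/kafka/logs-2
zookeeper.connect=server01:2181/kafka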
Start the three brokers
$ env JMX_PORT=9999 bin/kafka-server-start.sh config/server-1.properties &
$ env JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &
$ env JMX_PORT=9997 bin/kafka-server-start.sh config/server-3.properties &
Verify the processes. The jps command should display 3 Kafka processes among other things.
$ sudo jps -l
16259 org.apache.zookeeper.server.quorum.QuorumPeerMain
3500 kafka.Kafka
3218 kafka.Kafka
2935 kafka.Kafka
...
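You can also confirm that all three brokers registered themselves in Zookeeper. The path below assumes the /kafka chroot configured in the server properties above.
$ bin/zookeeper-shell.sh server01:2181 ls /kafka/brokers/ids
The output should list the three broker ids, e.g. [0, 1, 2].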
Create topic
$ bin/kafka-topics.sh --create --zookeeper server01:2181/kafka --replication-factor 2 --partitions 1 --topic stockprice
View existing topics
$ bin/kafka-topics.sh --list --zookeeper server01:2181/kafka
Test your Kafka setup by sending some test messages on the stockprice topic
$ bin/kafka-console-producer.sh --broker-list server01:9092,server01:9093,server01:9094 --topic stockprice
Verify the sent message by starting a test consumer
$ bin/kafka-console-consumer.sh --zookeeper server01:2181/kafka --topic stockprice --from-beginning
Now type some messages in the producer terminal and press Enter. You should see the messages appear in the consumer terminal.
Describe the stockprice topic
$ bin/kafka-topics.sh --describe --zookeeper server01:2181/kafka --topic stockprice
Find consumer groups
$ bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper server01:2181/kafka --list
Check the consumer position on each topic for a given consumer group
$ bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper server01:2181/kafka --describe --group <consumer group id>
Find the offset range for a topic. --time -1 requests the latest offsets.
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list server01:9092,server01:9093,server01:9094 --topic stockprice --time -1
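To see the earliest available offsets instead, pass --time -2
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list server01:9092,server01:9093,server01:9094 --topic stockprice --time -2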
If you want to kill the Kafka server process, run the following commands
$ sudo ps -ef | grep server.properties
$ kill -9 <pid>
Or use kafka-server-stop.sh
$ bin/kafka-server-stop.sh
Processing Streaming Data Using Spark Streaming
Run the Kafka word count consumer example that ships with Spark. The arguments are the Zookeeper connect string, the consumer group, the topic list, and the number of threads per topic.
$ /usr/lib/spark/bin/run-example streaming.KafkaWordCount server02:2181/kafka spark-stream-consumer stockprice 1
Start a stream producer using Spark Streaming. The arguments are the broker list, the topic, the number of messages per second, and the number of words per message.
$ /usr/lib/spark/bin/run-example streaming.KafkaWordCountProducer server01:9092 stockprice 10 5
Monitoring the Kafka server processes using JMX
Launch jconsole or jvisualvm and connect to the remote JMX port of a broker (the JMX_PORT values that were set when starting the brokers).
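For example, to attach jconsole to the first broker started above (JMX_PORT 9999 on this machine; replace server01 with your broker host):
$ jconsole server01:9999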
Kafka Connect
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis. Kafka Connect features include:
A common framework for Kafka connectors - Kafka Connect standardizes integration of other data systems with Kafka, simplifying connector development, deployment, and management
Distributed and standalone modes - scale up to a large, centrally managed service supporting an entire organization or scale down to development, testing, and small production deployments
REST interface - submit and manage connectors to your Kafka Connect cluster via an easy-to-use REST API (see the example after this list)
Automatic offset management - with just a little information from connectors, Kafka Connect can manage the offset commit process automatically so connector developers do not need to worry about this error prone part of connector development
Distributed and scalable by default - Kafka Connect builds on the existing group management protocol. More workers can be added to scale up a Kafka Connect cluster.
Streaming/batch integration - leveraging Kafka's existing capabilities, Kafka Connect is an ideal solution for bridging streaming and batch data systems
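For example, once a Connect worker is running (the standalone worker started later in this guide listens on port 8083 by default), you can list connectors and inspect a connector's configuration over the REST API; the connector name below is the one defined in mysql-source.properties further down.
$ curl http://localhost:8083/connectors
$ curl http://localhost:8083/connectors/mysql-whitelist-timestamp-source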
Since we are going to use JDBC as the source, we will use MySQL for the demo.
mysql> create database sample;
mysql> use sample;
mysql> CREATE TABLE deal(
id BIGINT NOT NULL AUTO_INCREMENT,
name VARCHAR(100),
amount FLOAT,
deleted TINYINT(1) DEFAULT 0,
lastmodified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
mysql> INSERT INTO deal (name, amount) VALUES
("Deal 1", 1000),
("Deal 2", 2000);
mysql> select * from deal;
Download Confluent. We will use the Kafka JDBC connector from the Confluent package.
$ wget http://packages.confluent.io/archive/2.0/confluent-2.0.0-2.11.7.tar.gz
$ tar zxf confluent-2.0.0-2.11.7.tar.gz
$ sudo mv confluent-2.0.0 /usr/lib/
Create a properties file for the MySQL source.
$ vi config/mysql-source.properties
Add the following content to the file
name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
# JDBC connection string for the sample database created above
connection.url=jdbc:mysql://einext02:3306/sample?user=root&password=root
# Only the deal table is captured
table.whitelist=deal
# New rows are detected via the incrementing id column, updated rows via the timestamp column
mode=timestamp+incrementing
timestamp.column.name=lastmodified
incrementing.column.name=id
# Topic name = prefix + table name, i.e. sample-deal
topic.prefix=sample-
Add required jar files to the classpath
$ export CLASSPATH=/usr/share/java/mysql-connector-java.jar:/usr/lib/confluent-2.0.0/share/java/confluent-common/common-config-2.0.0.jar:/usr/lib/confluent-2.0.0/share/java/confluent-common/common-utils-2.0.0.jar:/usr/lib/confluent-2.0.0/share/java/kafka-connect-jdbc/kafka-connect-jdbc-2.0.0.jar
Now start the Kafka Connect standalone worker with the MySQL source connector
$ bin/connect-standalone.sh config/connect-standalone.properties config/mysql-source.properties
You can launch a consumer to verify the new data.
$ bin/kafka-console-consumer.sh --zookeeper einext02:2181/kafka --topic sample-deal --from-beginning
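To verify that incremental capture works, change the data while the connector and the consumer are running. Because the connector tracks the incrementing id column and the lastmodified timestamp, both new and updated rows should show up in the consumer shortly afterwards. For example:
mysql> UPDATE deal SET amount = 1500 WHERE id = 1;
mysql> INSERT INTO deal (name, amount) VALUES ("Deal 3", 3000);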