Apache Kafka
Verify Zookeeper
sudo jps -l
sudo service zookeeper-server status
Download Kafka from http://kafka.apache.org/downloads. Here I used kafka_2.11-0.10.1.0 (Kafka 0.10.1.0 built for Scala 2.11).
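You can also fetch the archive from the command line; older releases are kept under archive.apache.org (URL shown is a sketch for this version):
wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz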
Extract the binaries
tar -xf kafka_2.11-0.10.1.0.tgz
sudo mv kafka_2.11-0.10.1.0 /usr/lib/
cd /usr/lib/kafka_2.11-0.10.1.0
Set KAFKA_HOME and ZOOKEEPER_HOME in /etc/profile
sudo vi /etc/profile
Set environment variables for easy access
export KAFKA_HOME=/usr/lib/kafka_2.11-0.10.1.0
export ZOOKEEPER_HOME=/usr/lib/zookeeper
Run the following command for the above environment variables to take effect
source /etc/profile
Scenario 1: Single Broker, Single Producer and Single Consumer
Modify the config used by the Kafka broker. The /kafka suffix below is a Zookeeper chroot: all Kafka znodes will live under /kafka.
vi config/server.properties
zookeeper.connect=localhost:2181/kafka
Start kafka broker
bin/kafka-server-start.sh config/server.properties
Open a separate window for Zookeeper client
cd /usr/lib/zookeeper
bin/zkCli.sh
zkCli> ls /
Open another terminal
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka
You should see no topics, since we have not created any yet.
Now create a topic. You can create as many partitions as you want, but the replication factor must be less than or equal to the number of brokers.
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 2 --topic demo
Describe the topic.
$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic demo
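With a single broker, the output should look similar to this sketch (all leaders and replicas are broker 0):
Topic:demo  PartitionCount:2  ReplicationFactor:1  Configs:
    Topic: demo  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: demo  Partition: 1  Leader: 0  Replicas: 0  Isr: 0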
In the output, Isr stands for "In-Sync Replicas".
Now let's look at the kafka logs
Open config/server.properties and look for the log.dirs property. The default value is /tmp/kafka-logs.
Check the log.dirs directory, assuming /tmp/kafka-logs is the log directory
ls -l /tmp/kafka-logs
For each partition of a topic, you will find one directory. For demo, since we created the topic with 2 partitions, you will see 2 folders. Let's go inside one of them.
ls -l /tmp/kafka-logs/demo-0/
total 0
-rw-rw-r-- 1 cloudera cloudera 10485760 Oct 20 03:19 00000000000000000000.index
-rw-rw-r-- 1 cloudera cloudera 0 Oct 20 03:19 00000000000000000000.log
Messages are stored in the .log file. The .index file maps message offsets to their physical positions in the .log file, so the broker can locate a message for a given offset without scanning the whole log.
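You can inspect index entries with the DumpLogSegments tool (also used later in this doc). The index is sparse, written only every log.index.interval.bytes, so a nearly empty log may show no entries:
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/demo-0/00000000000000000000.index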
Start a producer
Open a new terminal for producer
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
Note that after starting, the terminal will wait for your message followed by <enter>.
Start the consumer
Open a new terminal for consumer
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
On the producer window, type a message followed by enter. You should be able to view the message in the consumer terminal.
Switch to the terminal that opened the kafka-logs directory, and view the directory content again. Notice that the .log file now contains data; earlier it was zero bytes.
ls -l /tmp/kafka-logs/demo-0/
Check which process and user is accessing the .log file.
lsof | grep /tmp/kafka-logs/demo-0/00000000000000000000.log
Collect the process id from the output of the above command, and view the process details.
$ ps -fp <pid e.g. 12552>
Switch to the Zookeeper terminal and explore the consumer offsets for the topic/consumer-group combination. Note that the consumer group id on your system will be different. (These znodes exist only for the old Zookeeper-based consumer; a consumer started with --bootstrap-server, as above, commits its offsets to Kafka's internal __consumer_offsets topic instead, so they may not appear here.)
zkCli> ls /kafka/consumers/console-consumer-76673/offsets/demo
zkCli> get /kafka/consumers/console-consumer-76673/offsets/demo/0
Now, let's increase the number of partitions to 3. Note that Kafka does not allow decreasing the number of partitions of a topic.
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic demo --partitions 3
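For example, trying to shrink demo back to 2 partitions should fail (a sketch; the exact error text varies by version):
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic demo --partitions 2
The command is rejected with an error saying the number of partitions for a topic can only be increased.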
Describe the partitions to view the new partitions created
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic demo
Topic:demo PartitionCount:3 ReplicationFactor:1 Configs:
Topic: demo Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: demo Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: demo Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Verify that a new kafka-logs directory has been created for the demo topic. In total there should be 3 directories.
ls -l /tmp/kafka-logs
drwxrwxr-x 2 cloudera cloudera 4096 Oct 20 03:19 demo-0
drwxrwxr-x 2 cloudera cloudera 4096 Oct 20 04:03 demo-1
drwxrwxr-x 2 cloudera cloudera 4096 Oct 20 04:03 demo-2
Check the log file size under each partition to see which log files contain data.
ls -l /tmp/kafka-logs/demo-0/00000000000000000000.log
ls -l /tmp/kafka-logs/demo-1/00000000000000000000.log
ls -l /tmp/kafka-logs/demo-2/00000000000000000000.log
Only the first partition's log should contain data; the other two are empty.
Switch to the producer terminal, publish a few new messages, and check the log files again. You may notice that the new messages all land in the same partition rather than spreading evenly; exactly how keyless messages are distributed depends on the producer version and its partitioner. To control placement explicitly, use message keys, as shown below.
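The console producer can parse a key from each input line via the parse.key and key.separator properties (the colon separator below is my choice):
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property parse.key=true --property key.separator=:
Then type lines such as user1:hello; all messages with the same key hash to the same partition.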
Delete A Topic
By default, Kafka does not allow topic deletion. Enable it in config/server.properties by setting delete.topic.enable=true, then restart the Kafka brokers for the change to take effect.
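The relevant line in config/server.properties:
delete.topic.enable=true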
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --topic demo --delete
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list
When you delete a topic, it is first marked for deletion, and that flag is saved in Zookeeper. You can switch to the Zookeeper terminal and view it.
zkCli> ls /kafka/admin/delete_topics
If you want to cancel the deletion, remove the marker znode:
zkCli> delete /kafka/admin/delete_topics/$YOUR_TOPIC_NAME
After the delete, verify that the topic has been removed and that its kafka-logs directories are gone.
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list
ls -l /tmp/kafka-logs/
Find the list of consumer groups
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Find the lag by topic and partition for a given consumer group
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-45444
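The output lists one row per topic partition, similar to this sketch (column names vary across Kafka versions):
GROUP                   TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
console-consumer-45444  demo   0          3               3               0    consumer-1_/127.0.0.1
console-consumer-45444  demo   1          0               0               0    consumer-1_/127.0.0.1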
View the messages stored in a Kafka log segment
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /tmp/kafka-logs/demo-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1538635114452 isvalid: true keysize: -1 valuesize: 4 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: abul
offset: 1 position: 72 CreateTime: 1538635390773 isvalid: true keysize: -1 valuesize: 5 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: basar
offset: 2 position: 145 CreateTime: 1538635845223 isvalid: true keysize: -1 valuesize: 120 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: {"id": "123", "merchant_id": "m123", "customer_id": "c345", "amount": 100.1, "category": "pos", "timestamp": 1538635845}
Scenario 2: Single Producer, Single Consumer and Multiple Brokers
Start 8 terminals that we will refer to in this doc by the following names.
Broker-0
Broker-1
Broker-2
Broker-3
Producer
Consumer
Zookeeper
Shell
Go to $KAFKA_HOME and copy config/server.properties to the following files.
cd $KAFKA_HOME
cp config/server.properties config/server-0.properties
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties
Update 3 properties in each properties file so that every broker has a unique broker.id, listener port, and log.dirs, as shown below.
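A sketch for the first two files, assuming ports 9092-9095 and log directories /tmp/kafka-logs-0 through /tmp/kafka-logs-3 (these match the ports and paths used later in this doc):
# config/server-0.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs-0
# config/server-1.properties
broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-1
And likewise broker.id=2 with port 9094 and /tmp/kafka-logs-2, and broker.id=3 with port 9095 and /tmp/kafka-logs-3.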
Start the brokers on separate terminals.
Broker-0:
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-0.properties
Broker-1:
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-1.properties
Broker-2:
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-2.properties
Broker-3:
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-3.properties
Open the Zookeeper terminal and launch the Zookeeper CLI
cd $ZOOKEEPER_HOME
bin/zkCli.sh
zkCli> get /kafka/brokers/ids/0
zkCli> get /kafka/brokers/ids/1
zkCli> get /kafka/brokers/ids/2
zkCli> get /kafka/brokers/ids/3
Examine the id, host and port for each broker id. This shows that connecting to Zookeeper is enough to discover the whole Kafka cluster.
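Each broker znode holds a small JSON document, roughly like this sketch (the field set varies by Kafka version):
{"jmx_port":-1,"timestamp":"1540028400000","endpoints":["PLAINTEXT://localhost:9092"],"host":"localhost","version":3,"port":9092}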
View the kafka logs
tree /tmp/kafka-logs-*
Create a new topic T2 with replication factor 3 and 2 partitions. To create the topic, you can use the Shell terminal.
cd $KAFKA_HOME
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --topic T2 --create --partitions 2 --replication-factor 3
Describe the topic
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --topic T2 --describe
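With four brokers and replication factor 3, expect output roughly like this sketch (the actual broker assignments will differ):
Topic:T2  PartitionCount:2  ReplicationFactor:3  Configs:
    Topic: T2  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
    Topic: T2  Partition: 1  Leader: 2  Replicas: 2,3,0  Isr: 2,3,0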
View the kafka logs after creating the topic. New log directories should have been created for T2 under each broker that hosts one of its replicas.
tree /tmp/kafka-logs-*
Start Kafka console producer in the Producer terminal.
cd $KAFKA_HOME
bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic T2
Start a Kafka console consumer in the Consumer terminal.
cd $KAFKA_HOME
bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic T2
Scenario 3: Single Broker, Single Producer and Multiple Consumers
Goal: create 2 consumers under a common consumer group id.
Create a config file for the consumers
cd $KAFKA_HOME
vi config/console-consumer-group.properties
group.id=console-consumer-group
Create a topic with multiple partitions.
$ bin/kafka-topics.sh --zookeeper localhost:2181/kafka --topic t3 --create --partitions 4 --replication-factor 1
Start the producer for the new topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t3
Start the first consumer with consumer config
bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic t3 --consumer.config config/console-consumer-group.properties
Start second consumer for consumer group "console-consumer-group"
bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic t3 --consumer.config config/console-consumer-group.properties
Now post a few new messages. Observe that each message goes to one of the consumers, but not both. If you shut down one consumer, the group rebalances and its partitions are reassigned to the remaining consumer, so you should then see all new messages on the other consumer.
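You can watch the partition assignment change from the Shell terminal. These consumers are Zookeeper-based, so the tool is pointed at Zookeeper rather than a broker:
bin/kafka-consumer-groups.sh --zookeeper localhost:2181/kafka --describe --group console-consumer-group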
Save Kafka Topic To HDFS using Flume
Create a Flume configuration file called flume.conf with the following contents
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = demo
tier1.sources.source1.kafka.consumer.group.id = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/cloudera/kafka/%{topic}/%Y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
# tier1.sinks.sink1.hdfs.fileType = CompressedStream
tier1.sinks.sink1.channel = channel1
HDFS sink properties
https://flume.apache.org/FlumeUserGuide.html#hdfs-sink
Kafka source properties
https://flume.apache.org/FlumeUserGuide.html#kafka-source
Start flume agent
$ flume-ng agent -n tier1 -f flume.conf
View the data saved in HDFS
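Based on the hdfs.path configured above, events land under per-topic, per-day directories; FlumeData is the HDFS sink's default file prefix (a sketch, assuming the demo topic):
hdfs dfs -ls -R /user/cloudera/kafka/demo
hdfs dfs -cat '/user/cloudera/kafka/demo/*/FlumeData.*'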