Apache Kafka

Verify Zookeeper

sudo jps -l

sudo service zookeeper-server status

Download Kafka from http://kafka.apache.org/downloads. Here I used version 2.11-

Extract the binaries

tar -xf kafka_2.11-

sudo mv kafka_2.11- /usr/lib/

cd /usr/lib/kafka_2.11-

Set KAFKA_HOME and ZOOKEEPER_HOME in /etc/profile

sudo vi /etc/profile

Set environment variables for easy access

export KAFKA_HOME=/usr/lib/kafka_2.11-

export ZOOKEEPER_HOME=/usr/lib/zookeeper

Run the following command so that the environment variables above take effect in the current shell

source /etc/profile

Scenario 1: Single Broker, Single Producer and Single Consumer

Modify the broker configuration. The Kafka broker reads this file at startup.

vi config/server.properties
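
The step above does not say which properties to change. The sketch below is one possibility, assuming the settings that the rest of this walkthrough relies on: a Zookeeper chroot of /kafka and a log directory of /tmp/kafka-logs. The set_prop helper is hypothetical, and the script edits a scratch file rather than the real config/server.properties.

```shell
# Hypothetical helper: set key=value in a properties file by dropping any
# existing line for the key and appending the new setting.
set_prop() {
  file=$1; key=$2; value=$3
  grep -v "^$key=" "$file" > "$file.tmp" || true
  printf '%s=%s\n' "$key" "$value" >> "$file.tmp"
  mv "$file.tmp" "$file"
}

# Scratch file standing in for config/server.properties.
conf=$(mktemp)
printf 'broker.id=0\nzookeeper.connect=localhost:2181\n' > "$conf"

# Assumed values: the /kafka chroot matches the --zookeeper arguments used
# later, and /tmp/kafka-logs matches the log directory inspected later.
set_prop "$conf" zookeeper.connect localhost:2181/kafka
set_prop "$conf" log.dirs /tmp/kafka-logs
cat "$conf"
```

To apply the same changes for real, point conf at $KAFKA_HOME/config/server.properties instead of the scratch file.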


Start kafka broker

bin/kafka-server-start.sh config/server.properties

Open a separate window for the Zookeeper client

cd /usr/lib/zookeeper

bin/zkCli.sh -server localhost:2181


zkCli> ls /

Open another terminal

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka

You should see no topics since we have not yet created any topics.

Now create a topic. You can create as many partitions as you want, but the replication factor must be less than or equal to the number of brokers.

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 2 --topic demo

Describe the topic.

$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic demo

In the output, Isr stands for "in-sync replicas", the replicas that are currently caught up with the partition leader.

Now let's look at the Kafka logs (the message data files, not the application logs)

Open config/server.properties and look for the log.dirs property. The default value is /tmp/kafka-logs

List the log.dirs directory, assuming /tmp/kafka-logs is the log directory

ls -l /tmp/kafka-logs

For each partition of a topic, you will find one directory. For demo, since we created the topic with 2 partitions, you will see 2 folders. Let's go inside one of them.

ls -l /tmp/kafka-logs/demo-0/

total 0

-rw-rw-r-- 1 cloudera cloudera 10485760 Oct 20 03:19 00000000000000000000.index

-rw-rw-r-- 1 cloudera cloudera 0 Oct 20 03:19 00000000000000000000.log

Messages are stored in the .log file. The .index file maps a message offset to the byte position of that message in the .log file, so a reader can jump close to a given offset without scanning the whole log.
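
A small sketch of how the two files relate. Segment files are named after the offset of their first message, zero-padded to 20 digits, and the index is sparse: a lookup finds the last indexed offset at or below the target and scans forward from that position. The index entries below are made-up sample values, and the function names are hypothetical.

```shell
# Name of a segment file whose first message has the given base offset.
segment_file() {
  printf '%020d.%s\n' "$1" "$2"
}

# Made-up sparse index: one "offset byte_position" pair per line.
index_entries='0 0
40 4096
85 8192'

# Byte position where a scan for the target offset should start: the
# position of the last index entry whose offset is <= the target.
lookup_position() {
  printf '%s\n' "$index_entries" |
    awk -v target="$1" '$1 <= target { pos = $2 } END { print pos }'
}

segment_file 0 log
segment_file 0 index
lookup_position 50
```

With these sample entries, a read for offset 50 starts at the entry for offset 40 and scans forward from there.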

Start a producer

Open a new terminal for producer

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo

Note that after starting, the terminal will wait for your message followed by <enter>.

Start the consumer

Open a new terminal for consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning

On the producer window, type a message followed by enter. You should be able to view the message in the consumer terminal.

Switch to the terminal where you listed the kafka-logs directory and list it again. Notice that the .log file now contains data; earlier it was zero bytes.

ls -l /tmp/kafka-logs/demo-0/

Check which process and user is accessing the .log file.

lsof | grep /tmp/kafka-logs/demo-0/00000000000000000000.log

Take the process ID from the output of the above command and view the process details.

$ ps -f -p <pid e.g. 12552>

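Switch to the Zookeeper terminal and explore the consumer offset for the topic and consumer group combination. Note that the consumer group ID on your system may be different. These nodes exist only for consumers started with --zookeeper; a consumer started with --bootstrap-server keeps its offsets in the internal __consumer_offsets topic instead.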

zkCli> ls /kafka/consumers/console-consumer-76673/offsets/demo

zkCli> get /kafka/consumers/console-consumer-76673/offsets/demo/0

Now, let's increase the number of partitions to 3. Kafka does not allow decreasing the number of partitions of a topic.

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic demo --partitions 3

Describe the topic to view the newly created partition

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic demo

Topic:demo PartitionCount:3 ReplicationFactor:1 Configs:

Topic: demo Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Topic: demo Partition: 1 Leader: 0 Replicas: 0 Isr: 0

Topic: demo Partition: 2 Leader: 0 Replicas: 0 Isr: 0

Verify that a new log directory has been created for the demo topic. In total there should be 3 directories.

ls -l /tmp/kafka-logs

drwxrwxr-x 2 cloudera cloudera 4096 Oct 20 03:19 demo-0

drwxrwxr-x 2 cloudera cloudera 4096 Oct 20 04:03 demo-1

drwxrwxr-x 2 cloudera cloudera 4096 Oct 20 04:03 demo-2

Check the log file size under each partition to see which log files contain data.

ls -l /tmp/kafka-logs/demo-0/00000000000000000000.log

ls -l /tmp/kafka-logs/demo-1/00000000000000000000.log

ls -l /tmp/kafka-logs/demo-2/00000000000000000000.log

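In this run only the first partition's log contained data; the other two were empty. The newly added demo-2 is always empty at this point, because existing messages are never moved between partitions.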

Switch to the producer terminal, publish a few new messages, and check the log files again. You might notice that all the new messages land in the same partition.

Delete A Topic

Older Kafka versions do not allow topic deletion by default (since 1.0 it is enabled by default). If needed, enable deletion in server.properties by setting delete.topic.enable=true. You have to restart the Kafka brokers for the configuration change to take effect.

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --topic demo --delete

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list

When you delete a topic, it is marked for deletion and the flag is saved in Zookeeper. You can switch to Zookeeper terminal and view it.

zkCli> ls /kafka/admin/delete_topics

If you want to cancel the pending deletion, remove the topic's node. Note the /kafka prefix used throughout this setup.

zkCli> delete /kafka/admin/delete_topics/$YOUR_TOPIC_NAME

After deleting, verify that the topic is gone and that its Kafka logs have been deleted as well.

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list

ls -l /tmp/kafka-logs/

Find the list of consumer groups

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Find the lag per topic partition for a given consumer group

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-45444

View the messages in a Kafka log segment

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /tmp/kafka-logs/demo-0/00000000000000000000.log

Starting offset: 0

offset: 0 position: 0 CreateTime: 1538635114452 isvalid: true keysize: -1 valuesize: 4 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: abul

offset: 1 position: 72 CreateTime: 1538635390773 isvalid: true keysize: -1 valuesize: 5 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: basar

offset: 2 position: 145 CreateTime: 1538635845223 isvalid: true keysize: -1 valuesize: 120 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: {"id": "123", "merchant_id": "m123", "customer_id": "c345", "amount": 100.1, "category": "pos", "timestamp": 1538635845}
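
The dump lines are long, so it can help to reduce them to the offset, the byte position and the payload. The sketch below does that with sed on two sample lines copied from the output above and abridged (some fields are dropped for brevity). The function name summarize_dump is hypothetical.

```shell
# Abridged sample of DumpLogSegments output (fields shortened for brevity).
dump='Starting offset: 0
offset: 0 position: 0 CreateTime: 1538635114452 isvalid: true keysize: -1 valuesize: 4 payload: abul
offset: 1 position: 72 CreateTime: 1538635390773 isvalid: true keysize: -1 valuesize: 5 payload: basar'

# Keep only the record lines and print offset, byte position and payload.
summarize_dump() {
  sed -n 's/^offset: \([0-9][0-9]*\) position: \([0-9][0-9]*\) .* payload: \(.*\)$/offset=\1 position=\2 payload=\3/p'
}

printf '%s\n' "$dump" | summarize_dump
```

On a real broker, the output of the kafka-run-class.sh command above can be piped into summarize_dump in the same way.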

Scenario 2: Single producer, Single Consumer and multiple Brokers

Start 8 terminals, which this document refers to by the following names.

  • Broker-0

  • Broker-1

  • Broker-2

  • Broker-3

  • Producer

  • Consumer

  • Zookeeper

  • Shell

Go to $KAFKA_HOME and copy config/server.properties to the following files.


cp config/server.properties config/server-0.properties

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

cp config/server.properties config/server-3.properties

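Update 3 properties in each properties file so that every broker gets a unique broker ID, listener port and log directory. The commands further down assume log directories matching /tmp/kafka-logs-* and broker ports starting at 9092.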
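
The original values for these properties are not listed, so the following is only a sketch under assumptions: the three properties are taken to be broker.id, listeners and log.dirs, with ports 9092 to 9095 and log directories /tmp/kafka-logs-0 to /tmp/kafka-logs-3. It runs against a scratch directory with a stub server.properties rather than the real config directory.

```shell
# Scratch directory standing in for $KAFKA_HOME/config, with a stub base file.
conf_dir=$(mktemp -d)
printf '%s\n' \
  'broker.id=0' \
  '#listeners=PLAINTEXT://:9092' \
  'log.dirs=/tmp/kafka-logs' \
  'zookeeper.connect=localhost:2181/kafka' > "$conf_dir/server.properties"

# Derive one file per broker; only the three per-broker values differ.
for i in 0 1 2 3; do
  port=$((9092 + i))
  sed -e "s|^broker.id=.*|broker.id=$i|" \
      -e "s|^#*listeners=.*|listeners=PLAINTEXT://:$port|" \
      -e "s|^log.dirs=.*|log.dirs=/tmp/kafka-logs-$i|" \
      "$conf_dir/server.properties" > "$conf_dir/server-$i.properties"
done

# Show the per-broker values that were written.
grep -e '^broker.id=' -e '^listeners=' -e '^log.dirs=' "$conf_dir"/server-*.properties
```

Everything else, including zookeeper.connect, stays identical across the files, which is what makes the four brokers join the same cluster.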

Start the brokers on separate terminals.

Terminal 1:

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-0.properties

Terminal 2:

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-1.properties

Terminal 3:

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-2.properties

Terminal 4:

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server-3.properties

Open the Zookeeper terminal and launch the Zookeeper CLI, as in the first scenario.



zkCli> get /kafka/brokers/ids/0

zkCli> get /kafka/brokers/ids/1

zkCli> get /kafka/brokers/ids/2

zkCli> get /kafka/brokers/ids/3

Examine the ID, host and port registered for each broker. This shows that a client connected to Zookeeper can discover the brokers of the Kafka cluster.

View the kafka logs

tree /tmp/kafka-logs-*

Create a new topic T2 with replication factor 3 and 2 partitions. You can use the Shell terminal to create the topic.


bin/kafka-topics.sh --zookeeper localhost:2181/kafka --topic T2 --create --partitions 2 --replication-factor 3

Describe the topic

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --topic T2 --describe

View the kafka logs after creating the topics. It should reflect that new log directories have been created for the new topic T2

tree /tmp/kafka-logs-*

Start Kafka console producer in the Producer terminal.


bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic T2

Start a Kafka console consumer in the Consumer terminal.


bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic T2

Scenario 3: Single Broker, Single Producer and Multiple Consumers

Goal: create 2 consumers under a common consumer group id.

Create a config file for consumer


vi config/console-consumer-group.properties
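
The file content is not shown here. Since the goal is a shared group ID and the group is later called "console-consumer-group", a minimal version probably needs only the group.id property. The sketch writes it to a scratch file; the real target would be config/console-consumer-group.properties.

```shell
# Scratch file standing in for config/console-consumer-group.properties.
props=$(mktemp)

# Both consumers load this file, so both join the same consumer group.
cat > "$props" <<'EOF'
group.id=console-consumer-group
EOF

cat "$props"
```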


Create a topic with multiple partitions.

$ bin/kafka-topics.sh --zookeeper localhost:2181/kafka --topic t3 --create --partitions 4 --replication-factor 1

Start the producer for the new topic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t3

Start the first consumer with consumer config

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic t3 --consumer.config config/console-consumer-group.properties

Start second consumer for consumer group "console-consumer-group"

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic t3 --consumer.config config/console-consumer-group.properties

Now post a few new messages. Observe that each message goes to one of the consumers, but not both. If you shut down one consumer, the group rebalances and its partitions are reassigned to the remaining consumer, so you should see all messages on the other consumer.
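
The behaviour above can be modelled with a simplified range-style split: partitions are divided into contiguous blocks, and the first consumers take one extra partition when the division is uneven. This is an illustration only, not the broker's actual code; the function and the consumer names are hypothetical.

```shell
# Print "consumer-N: partitions" for a range-style split of partitions
# 0..P-1 across C consumers: each gets P/C, the first P%C get one extra.
assign_range() {
  partitions=$1; consumers=$2
  base=$((partitions / consumers))
  extra=$((partitions % consumers))
  next=0
  c=0
  while [ "$c" -lt "$consumers" ]; do
    count=$base
    if [ "$c" -lt "$extra" ]; then count=$((count + 1)); fi
    line="consumer-$c:"
    n=0
    while [ "$n" -lt "$count" ]; do
      line="$line $next"
      next=$((next + 1))
      n=$((n + 1))
    done
    echo "$line"
    c=$((c + 1))
  done
}

echo "two consumers on topic t3 (4 partitions):"
assign_range 4 2
echo "after one consumer shuts down:"
assign_range 4 1
```

With 4 partitions and 2 consumers each consumer owns 2 partitions, which is why a given message shows up in only one terminal. Once a single consumer is left it owns all 4.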

Save Kafka Topic To HDFS using Flume

Create flume configuration file called flume.conf

tier1.sources = source1

tier1.channels = channel1

tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource

tier1.sources.source1.kafka.bootstrap.servers = localhost:9092

tier1.sources.source1.kafka.topics = demo

tier1.sources.source1.kafka.consumer.group.id = flume

tier1.sources.source1.channels = channel1

tier1.sources.source1.interceptors = i1

tier1.sources.source1.interceptors.i1.type = timestamp

tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = memory

tier1.channels.channel1.capacity = 10000

tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs

tier1.sinks.sink1.hdfs.path = /user/cloudera/kafka/%{topic}/%Y-%m-%d

tier1.sinks.sink1.hdfs.rollInterval = 5

tier1.sinks.sink1.hdfs.rollSize = 0

tier1.sinks.sink1.hdfs.rollCount = 0

tier1.sinks.sink1.hdfs.fileType = DataStream

# tier1.sinks.sink1.hdfs.fileType = CompressedStream

tier1.sinks.sink1.channel = channel1

HDFS sink properties


Kafka source properties


Start flume agent

$ flume-ng agent -n tier1 -f flume.conf

View the data saved in HDFS
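
The sink path /user/cloudera/kafka/%{topic}/%Y-%m-%d expands to one directory per topic and day. The sketch below builds that directory name and prints the hdfs commands one might run against it; it only prints them, so it works without a Hadoop client. The function name is hypothetical, and FlumeData is assumed as the file prefix because the configuration above does not set hdfs.filePrefix.

```shell
# Directory the sink writes to for a topic on a given day (YYYY-MM-DD,
# defaulting to today), mirroring the hdfs.path setting in flume.conf.
flume_hdfs_dir() {
  printf '/user/cloudera/kafka/%s/%s\n' "$1" "${2:-$(date +%Y-%m-%d)}"
}

dir=$(flume_hdfs_dir demo)

# Commands to list the day's files and print their contents.
echo "hdfs dfs -ls $dir"
echo "hdfs dfs -cat $dir/FlumeData.*"
```

With rollInterval = 5 a new file is started every 5 seconds while messages are arriving, so expect many small files in the directory.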