Kafka Connect
Start a Kafka Connect worker
$ bin/connect-distributed.sh config/connect-distributed.properties
Check whether the REST endpoint is up
$ curl localhost:8083
{"version":"2.5.0","commit":"66563e712b0b9f84","kafka_cluster_id":"qj4V0zd7QHGbz2EWj4yAdQ"}
Find the available connector plugins
$ curl -s http://localhost:8083/connector-plugins | jq -r
[
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "2.5.0"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "2.5.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
Next we will create a file source and a file sink using the out-of-the-box file connectors.
Create a source file. We will append lines to this file; the new content will be published to a Kafka topic and ultimately saved to a copy file by the sink connector.
touch /tmp/source.txt
Create file source connector
echo '{
"name": "load-sample-data",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/tmp/source.txt",
"topic": "sample-topic",
"tasks.max": 1
}
}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type: application/json" | jq -r
View existing connectors.
$ curl -s http://localhost:8083/connectors | jq -r
[
"load-sample-data"
]
Check the connector config.
$ curl -s http://localhost:8083/connectors/load-sample-data/config | jq -r
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/tmp/source.txt",
"name": "load-sample-data",
"topic": "sample-topic"
}
Append a few lines to the source file.
echo "line 1" >> /tmp/source.txt
echo "line 2" >> /tmp/source.txt
echo "line 3" >> /tmp/source.txt
echo "line 4" >> /tmp/source.txt
Check the status of the connector.
$ curl -s http://localhost:8083/connectors/load-sample-data/status | jq -r
{
"name": "load-sample-data",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
}
],
"type": "source"
}
Find how many tasks are running for the connector.
$ curl -s http://localhost:8083/connectors/load-sample-data/tasks | jq -r
[
{
"id": {
"connector": "load-sample-data",
"task": 0
},
"config": {
"file": "/tmp/source.txt",
"task.class": "org.apache.kafka.connect.file.FileStreamSourceTask",
"batch.size": "2000",
"topic": "sample-topic"
}
}
]
You can pause and resume the connector if you want; its tasks are paused along with it.
$ curl -s -X PUT http://localhost:8083/connectors/load-sample-data/pause | jq -r
$ curl -s -X GET http://localhost:8083/connectors/load-sample-data/status | jq -r
{
"name": "load-sample-data",
"connector": {
"state": "PAUSED",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "PAUSED",
"worker_id": "127.0.0.1:8083"
}
],
"type": "source"
}
Resume the paused connector.
$ curl -s -X PUT http://localhost:8083/connectors/load-sample-data/resume | jq -r
$ curl -s http://localhost:8083/connectors/load-sample-data/status | jq -r
{
"name": "load-sample-data",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
}
],
"type": "source"
}
You can restart the connector. Note that this restarts only the connector instance, not its tasks; an individual task can be restarted with POST /connectors/<name>/tasks/<id>/restart.
$ curl -s -X POST http://localhost:8083/connectors/load-sample-data/restart | jq -r
$ curl -s http://localhost:8083/connectors/load-sample-data/status | jq -r
Check the topics associated with the connector. This file source connector writes to a single topic, while a sink connector may consume from multiple topics.
$ curl -s http://localhost:8083/connectors/load-sample-data/topics | jq -r
{
"load-sample-data": {
"topics": [
"sample-topic"
]
}
}
Create a file sink connector.
echo '{
"name": "dump-sample-data",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"file": "/tmp/sample-sink-output.text",
"topics": "sample-topic"
}
}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type: application/json" | jq -r
You should see the output in the file /tmp/sample-sink-output.text. Append a few more lines to source.txt and check whether those lines appear in the output file.
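The verification step can be scripted; a minimal sketch, assuming the source and sink connectors created above are still running:

```shell
# Append two more lines to the watched source file (created if absent).
echo "line 5" >> /tmp/source.txt
echo "line 6" >> /tmp/source.txt
# Show what was just appended; within a few seconds the same lines
# should also appear in /tmp/sample-sink-output.text via the sink connector.
tail -n 2 /tmp/source.txt
```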
Delete the connector.
curl -X DELETE http://localhost:8083/connectors/dump-sample-data
Next we will create a JDBC source connector and an Elasticsearch sink connector.
Create a directory and build the projects in sequence. If any build fails, check the error; you may have to update the version of the parent project.
mkdir ~/kafka-connectors && cd ~/kafka-connectors
git clone https://github.com/confluentinc/kafka.git && cd kafka
./gradlew install -x test
cd ~/kafka-connectors
git clone git@github.com:confluentinc/common.git
cd common
mvn install -DskipTests
cd ~/kafka-connectors
git clone git@github.com:confluentinc/kafka-connect-jdbc.git
cd kafka-connect-jdbc/
mvn install -DskipTests
cd ~/kafka-connectors
git clone git@github.com:confluentinc/kafka-connect-elasticsearch.git
cd kafka-connect-elasticsearch/
mvn install -DskipTests
cp target/kafka-connect-elasticsearch-6.1.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/* ~/Downloads/Flipkart/kafka_2.12-2.5.0/libs/
Download the MySQL JDBC driver and the guava jar into the Kafka libs directory
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar
wget https://repo1.maven.org/maven2/com/google/guava/guava/22.0/guava-22.0.jar
Restart the Connect worker
$ bin/connect-distributed.sh config/connect-distributed.properties
You should see new connector plugins available to Kafka connect.
$ curl -s http://localhost:8083/connector-plugins | jq -r
[
{
"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type": "sink",
"version": "6.1.0-SNAPSHOT"
},
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "6.1.0-SNAPSHOT"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "6.1.0-SNAPSHOT"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "2.5.0"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "2.5.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
In MySQL, create a database, a table, and a few sample records. Install MySQL server if you do not already have one.
$ mysql.server restart
mysql> create database training;
Query OK, 1 row affected (0.00 sec)
mysql> use training;
Database changed
mysql> create table login (username varchar(30), login_time datetime);
Query OK, 0 rows affected (0.02 sec)
mysql> insert into login values ('gwenshap', now());
Query OK, 1 row affected (0.01 sec)
mysql> insert into login values ('tpalino', now());
Query OK, 1 row affected (0.00 sec)
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
Create a MySQL user account for the connector
mysql> SELECT user, host FROM mysql.user;
mysql> CREATE USER 'training'@'%' IDENTIFIED BY 'Training@123';
mysql> GRANT ALL PRIVILEGES ON *.* TO 'training'@'%';
mysql> FLUSH PRIVILEGES;
Create a MySQL source connector
echo '{
"name": "mysql-login-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://einext02:3306/training",
"connection.user": "training",
"connection.password": "Training@123",
"mode": "timestamp",
"table.whitelist": "login",
"validate.non.null": false,
"timestamp.column.name": "login_time",
"topic.prefix": "mysql."
}
}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type: application/json" | jq -r
Documentation on JDBC connector properties: https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html#jdbc-source-configs
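Timestamp mode alone can miss rows that share the same timestamp. If the table had an auto-increment key, the connector could instead use the more robust timestamp+incrementing mode; a sketch of the relevant config keys (the id column here is hypothetical, not part of the login table above):

```json
{
  "mode": "timestamp+incrementing",
  "timestamp.column.name": "login_time",
  "incrementing.column.name": "id",
  "poll.interval.ms": "5000"
}
```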
Check whether the records are appearing in the Kafka topic.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql.login --from-beginning
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"username"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},"payload":{"username":"gwenshap","login_time":1592853019000}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"username"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},"payload":{"username":"tpalino","login_time":1592853026000}}
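In the payload above, login_time is an org.apache.kafka.connect.data.Timestamp encoded as epoch milliseconds. To sanity-check a value, convert it back to a readable UTC time (GNU coreutils date assumed):

```shell
# login_time from the first record, in epoch milliseconds
ms=1592853019000
# Drop the millisecond part and format as UTC
date -u -d "@$((ms / 1000))" +"%Y-%m-%d %H:%M:%S"
# prints 2020-06-22 19:10:19
```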
While the connector is running, if you insert additional rows into the login table, you should immediately see them reflected in the mysql.login topic.
Create an Elasticsearch sink connector. Install Elasticsearch if required.
echo '{
"name": "elastic-login-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://localhost:9200",
"type.name": "mysql-data",
"topics": "mysql.login",
"key.ignore": true
}
}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type: application/json" | jq -r
Check whether the records appear in Elasticsearch
curl 'localhost:9200/_cat/indices?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open mysql.login 1r5KxVbKS1ua7R7OiBls3g 1 1 4 0 10.6kb 10.6kb
$ curl -s -X "GET" "http://localhost:9200/mysql.login/_search?pretty=true"
{
"took" : 45,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "mysql.login",
"_type" : "mysql-data",
"_id" : "mysql.login+0+1",
"_score" : 1.0,
"_source" : {
"username" : "tpalino",
"login_time" : 1592853026000
}
},
...
]
}
}
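To look up a specific user instead of scanning the whole index, a query body along these lines can be sent to http://localhost:9200/mysql.login/_search (a sketch; the field name follows the documents above):

```json
{
  "query": {
    "match": { "username": "tpalino" }
  }
}
```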