Kafka Connect


Start Kafka connect worker

$ bin/connect-distributed.sh config/connect-distributed.properties


Check whether the Connect REST endpoint is up

$ curl localhost:8083
{"version":"2.5.0","commit":"66563e712b0b9f84","kafka_cluster_id":"qj4V0zd7QHGbz2EWj4yAdQ"}


Find available plugins

$ curl -s http://localhost:8083/connector-plugins | jq -r
[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.5.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.5.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]


Next we will create a file source and a file sink using the out-of-the-box connectors.


Create a source file. We will append lines to this file. The new content will be appended to the Kafka topic and will ultimately be saved to a copy file.

touch /tmp/source.txt


Create file source connector

echo '{
    "name": "load-sample-data",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "/tmp/source.txt",
        "topic": "sample-topic", 
        "tasks.max": 1
    }
}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json" | jq -r



View existing connectors.

$ curl -s http://localhost:8083/connectors | jq -r
[
  "load-sample-data"
]



Check the connector config.

$ curl -s http://localhost:8083/connectors/load-sample-data/config | jq -r
{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "file": "/tmp/source.txt",
  "name": "load-sample-data",
  "topic": "sample-topic"
}




Append a few lines to the source file.

echo "line 1" >> /tmp/source.txt
echo "line 2" >> /tmp/source.txt
echo "line 3" >> /tmp/source.txt
echo "line 4" >> /tmp/source.txt


Check the status of the connector.

$ curl -s http://localhost:8083/connectors/load-sample-data/status | jq -r
{
  "name": "load-sample-data",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}



Find how many tasks are running for the connector.

$ curl -s http://localhost:8083/connectors/load-sample-data/tasks | jq -r
[
  {
    "id": {
      "connector": "load-sample-data",
      "task": 0
    },
    "config": {
      "file": "/tmp/source.txt",
      "task.class": "org.apache.kafka.connect.file.FileStreamSourceTask",
      "batch.size": "2000",
      "topic": "sample-topic"
    }
  }
]



You can pause or resume the tasks if you want.

$ curl -s -X PUT http://localhost:8083/connectors/load-sample-data/pause | jq -r
$ curl -s -X GET http://localhost:8083/connectors/load-sample-data/status | jq -r
{
  "name": "load-sample-data",
  "connector": {
    "state": "PAUSED",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "PAUSED",
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}


Resume the paused task.

$ curl -s -X PUT http://localhost:8083/connectors/load-sample-data/resume | jq -r
$ curl -s http://localhost:8083/connectors/load-sample-data/status | jq -r
{
  "name": "load-sample-data",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}


You can restart the connector.

$ curl -s -X POST http://localhost:8083/connectors/load-sample-data/restart | jq -r
$ curl -s http://localhost:8083/connectors/load-sample-data/status | jq -r


Check the target topic for the source connector. The file source connector writes to a single topic, whereas a sink connector may be associated with multiple topics.

$ curl -s http://localhost:8083/connectors/load-sample-data/topics | jq -r
{
  "load-sample-data": {
    "topics": [
      "sample-topic"
    ]
  }
}


Create sink connector.

echo '{
    "name": "dump-sample-data",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "file": "/tmp/sample-sink-output.text",
        "topics": "sample-topic"
    }
}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json" | jq -r


You should see the output in the file /tmp/sample-sink-output.text. Append a few more lines to source.txt, and check whether those lines appear in this output file.


Delete the connector.

curl -X DELETE http://localhost:8083/connectors/dump-sample-data



Next we will create a JDBC source connector and an Elasticsearch sink connector.


Create a directory and build the projects. Build the projects in sequence. If any one errors out, check the error. You may have to update the version of the parent project.

mkdir ~/kafka-connectors && cd ~/kafka-connectors

git clone https://github.com/confluentinc/kafka.git && cd kafka
./gradlew install -x test 

cd ~/kafka-connectors
git clone git@github.com:confluentinc/common.git
cd common/
mvn install -DskipTests

cd ~/kafka-connectors
git clone git@github.com:confluentinc/kafka-connect-jdbc.git
cd kafka-connect-jdbc/
mvn install -DskipTests

cd ~/kafka-connectors
git  clone git@github.com:confluentinc/kafka-connect-elasticsearch.git
cd kafka-connect-elasticsearch/
mvn install -DskipTests

cp target/kafka-connect-elasticsearch-6.1.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/* ~/Downloads/Flipkart/kafka_2.12-2.5.0/libs/


Download mysql jdbc driver to Kafka libs dir

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar 
wget https://repo1.maven.org/maven2/com/google/guava/guava/22.0/guava-22.0.jar


Restart connect

$ bin/connect-distributed.sh config/connect-distributed.properties 



You should see new connector plugins available to Kafka connect.

$ curl -s http://localhost:8083/connector-plugins | jq -r
[
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "6.1.0-SNAPSHOT"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "6.1.0-SNAPSHOT"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "6.1.0-SNAPSHOT"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.5.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.5.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]


In MySQL, create a database, a table, and a few sample records. Install MySQL server if you do not already have one.

$ mysql.server restart
mysql> create database training;
Query OK, 1 row affected (0.00 sec)
mysql> use training;
Database changed
mysql> create table login (username varchar(30), login_time datetime);
Query OK, 0 rows affected (0.02 sec)
mysql> insert into login values ('gwenshap', now());
Query OK, 1 row affected (0.01 sec)
mysql> insert into login values ('tpalino', now());
Query OK, 1 row affected (0.00 sec)
mysql> commit;
Query OK, 0 rows affected (0.01 sec)


Create a user account in MySQL for the connector

mysql> SELECT user, host FROM mysql.user;
mysql> CREATE USER 'training'@'%' IDENTIFIED BY 'Training@123';
mysql> GRANT ALL PRIVILEGES ON *.* TO 'training'@'%';
mysql> FLUSH PRIVILEGES;



Create mysql source connector

echo '{
    "name": "mysql-login-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://einext02:3306/training",
        "connection.user": "training",
        "connection.password": "Training@123",
        "mode": "timestamp",
        "table.whitelist": "login",
        "validate.non.null": false,
        "timestamp.column.name": "login_time",
        "topic.prefix": "mysql."
    }
}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json" | jq -r


Documentation on JDBC source connector properties: https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html#jdbc-source-configs




Check whether the records are appearing in the kafka topic.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql.login --from-beginning
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"username"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},"payload":{"username":"gwenshap","login_time":1592853019000}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"username"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},"payload":{"username":"tpalino","login_time":1592853026000}}



While the connector is running, if you insert additional rows in the login table, you should immediately see them reflected in the mysql.login topic


Create elasticsearch sink connector. Install elasticsearch if required.

echo '{
    "name": "elastic-login-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://localhost:9200",
        "type.name": "mysql-data",
        "topics": "mysql.login",
        "key.ignore": true
    }
}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json" | jq -r



Check whether the records appear in Elasticsearch

curl 'localhost:9200/_cat/indices?v'
health status index       uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   mysql.login 1r5KxVbKS1ua7R7OiBls3g   1   1          4            0     10.6kb         10.6kb
$ curl -s -X "GET" "http://localhost:9200/mysql.login/_search?pretty=true"
{
  "took" : 45,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "mysql.login",
        "_type" : "mysql-data",
        "_id" : "mysql.login+0+1",
        "_score" : 1.0,
        "_source" : {
          "username" : "tpalino",
          "login_time" : 1592853026000
        }
      },
      ...
    ]
  }
}