Twitter Stream as Kafka Source
Goal: we want to live stream twitter feed using twitter api and publish the stream to kafka.
We will use java on jdk 1.8. to build this project. Full source code is available at github.
Create a java project and enable maven. Add the following maven dependencies.
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
<dependencies>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>[4.0,)</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>[4.0,)</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.0</version>
</dependency>
</dependencies>
Here is a main class that works a starting point.
package com.einext;
import twitter4j.*;
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class Stream {
private static Logger logger = Logger.getLogger(Stream.class);
private static Twitter twitter = TwitterFactory.getSingleton();
public static void main(String[] args) throws Exception {
boolean skipKafka = Arrays.asList(args).contains("skip-kafka");
System.out.println("skip-kafka: " + skipKafka);
AppStatusListener.setSkipKafka(skipKafka);
System.out.println("Twitter screen name of current user: " + twitter.getScreenName());
long[] friends = twitter.friendsFollowers().getFriendsIDs(twitter.getId(), -1).getIDs();
System.out.println("No of friends: " + friends.length);
List<String> terms = new ArrayList<String>();
File f = new File("terms.txt");
if(f.isFile() && f.canRead()){
System.out.println("Tracking terms found in " + f.getAbsolutePath());
BufferedReader br = new BufferedReader(new FileReader(f.getAbsolutePath()));
String term;
while((term = br.readLine()) != null){
terms.add(term);
System.out.println(term);
}
}
if(terms.size() == 0){
System.out.println("Specify the terms to track in terms.txt file");
System.exit(0);
}
String[] searchTerms = terms.toArray(new String[0]);
TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
twitterStream.addListener(new AppStatusListener());
FilterQuery filterQuery = new FilterQuery();
//filterQuery.follow(friends);
filterQuery.language("en");
filterQuery.track(searchTerms);
twitterStream.filter(filterQuery);
logger.info("Simple message");
}
}
AppStatusLister is the class that contain method onStatus change the send the tweets to Kafka as Kafka producer.
package com.einext;
import org.apache.log4j.Logger;
import twitter4j.*;
public class AppStatusListener implements StatusListener {
private Logger logger = Logger.getLogger(AppStatusListener.class);
private static boolean skipKafka = false;
public static void setSkipKafka(boolean value){
skipKafka = value;
}
public void onStatus(Status status) {
System.out.println("@" + status.getUser().getScreenName() + " - " + status.getText());
String rawJson = TwitterObjectFactory.getRawJSON(status);
logger.info(rawJson);
if(!skipKafka) {
KafkaSource.send(null, rawJson);
}
}
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
//System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
//System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
}
public void onScrubGeo(long userId, long upToStatusId) {
//System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
}
public void onStallWarning(StallWarning warning) {
//System.out.println("Got stall warning:" + warning);
}
public void onException(Exception ex) {
//ex.printStackTrace();
}
}
Kafka producer class is written in in the following class.
package com.einext;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaSource {
private static Producer<String, String> producer;
private static String topic;
{
Properties properties = PropertiesLoader.getKafkaProperties();
topic = properties.getProperty("topic");
properties.remove("topic");
producer = new KafkaProducer<>(properties);
}
public static void send(String key, String message){
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, message);
producer.send(producerRecord);
}
}
Steps
This application requires java 1.8
$ export JAVA_HOME=/usr/java/jdk1.8.0_112/
Make sure you select the right folder location as appropriate on your system.
Download the code from git repo
$ git clone https://github.com/abulbasar/twitter-kafka-source.git
$ cd twitter-kafka-source
Update with your credentials in src/main/resources/twitter4j.properties file as below.
Below are just example, you must use yours:
oauth.consumerKey=WyMCcCcNn57XfM3jcyzwVF7l0
oauth.consumerSecret=nW3TyBcx2AZKuMMcLJhO7mbdqjNRWQecbiiRcTUMEJrC6SIxQA
oauth.accessToken=15101127-lk1sO1v1XoNe48IiJuR1JckSISelFt38jy9ncxvpq
oauth.accessTokenSecret=iKyz3q69tPz8FZQRLWDD8h2TzRmDAaQ8d9XWxKs38zKlj
To sign up for twitter dev account at https://apps.twitter.com, you can find the instructions here.
Optionally you can update terms.txt file to add your search terms in twitter.
Ensure that java 8 is enabled
$ $JAVA_HOME/bin/java -version
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
Run the streaming
$ mvn exec:java -Dexec.mainClass=com.einext.Stream
Run without sending to Kafka
$ mvn exec:java -Dexec.mainClass=com.einext.Stream -Dexec.args=skip-kafka
Alternatively, use maven package command to create a fat jar and run the jar
$ mvn package
$ $JAVA_HOME/bin/java -jar target/TwitterSource-1.0-SNAPSHOT-jar-with-dependencies.jar
Or
$ java -jar target/TwitterSource-1.0-SNAPSHOT-jar-with-dependencies.jar skip-kafka