Twitter live streaming (Python)
Create a script, stream_twitter.py, for streaming tweets from Twitter.
#!/usr/bin/env python
"""
Prerequisites:
This application uses tweepy. Install it if required:
$ pip install tweepy
Sign up for an app at https://apps.twitter.com/
"""
import tweepy
import json

# Retrieve the consumer keys and secrets from apps.twitter.com
consumer_key = "6IqHdMv..."
consumer_secret = "uvgQG..."
access_token = "15101127-AY4CW..."
access_token_secret = "DEPgC5Ue..."

# Directory where the tweets will be saved; create it if it does not exist
data_dir = "/home/mapr/workspaces/tweets/"

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

class MyStreamListener(tweepy.StreamListener):
    def on_status(self, status):
        # Print the author and text, then save the full JSON payload
        # to a file named after the tweet id
        print(status.user.name + ": " + status.text)
        message = json.dumps(status._json).encode("utf-8")
        with open(data_dir + str(status.id) + ".json", "wb") as f:
            f.write(message)

    def on_error(self, status_code):
        print("Error code: ", status_code)

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)
search_terms = ["trump"]
myStream.filter(track=search_terms)
Run the script
$ mkdir tweets
$ python stream_twitter.py
View the output.
$ ls -1 tweets/ | head
1013899705682546689.json
1013899855318650880.json
1013899931181076480.json
1013899931512238082.json
1013899931541737472.json
1013899931952787456.json
1013899932372172805.json
1013899932426817536.json
1013899932565155841.json
1013899932867223586.json
...
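To inspect the content of one of the saved tweets, pretty-print it with Python's built-in JSON tool (the file name here is taken from the listing above):
$ python -m json.tool tweets/1013899705682546689.json | head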
Note: this application creates a lot of small files. In a short time it will create a huge number of files, so it is not a scalable solution. A better solution is to push the Twitter messages to Kafka.
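As a minimal sketch of that approach, the listener below publishes each tweet to Kafka instead of writing a file. It assumes a broker at localhost:9092 and a topic named tweets (both hypothetical) and uses the kafka-python client; swap it in for MyStreamListener in the script above.
# Sketch only: push tweets to Kafka instead of writing small files.
# Assumes a broker at localhost:9092 and a "tweets" topic (hypothetical).
# $ pip install kafka-python
import json
import tweepy
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

class KafkaStreamListener(tweepy.StreamListener):
    def on_status(self, status):
        # Publish the raw JSON payload, keyed by the tweet id
        message = json.dumps(status._json).encode("utf-8")
        producer.send("tweets", key=str(status.id).encode("utf-8"), value=message)

    def on_error(self, status_code):
        print("Error code: ", status_code)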
Process the tweets with Spark streaming
Save the following code as spark_streaming_tweets_file_source.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import json

input_path = "file:///home/mapr/workspaces/tweets/"
batch_interval = 3

sc = SparkContext()
ssc = StreamingContext(sc, batch_interval)

def extract_fields(tweet):
    # Each input file contains one tweet in JSON format;
    # keep only the text and source fields
    j = json.loads(tweet)
    return j["text"], j["source"]

lines = ssc.textFileStream(input_path).map(extract_fields)

# Print the raw DStream
lines.pprint()

ssc.start()
ssc.awaitTermination()
Run the streaming application
$ /opt/mapr/spark/spark-2.2.1/bin/spark-submit spark_streaming_tweets_file_source.py
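The job above only prints the incoming records. As a minimal sketch of further processing (illustrative, not part of the original script), you could count tweets per source in each batch by adding the following before ssc.start():
# Sketch only: count tweets per source within each 3-second batch.
# `lines` is the DStream of (text, source) pairs defined above.
counts = lines.map(lambda pair: (pair[1], 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()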
View the latest version of the script here:
https://github.com/abulbasar/pyspark-examples/blob/master/twitter_stream_api.py