Twitter live streaming (Python)


Create a script for streaming tweets from Twitter, stream_twitter.py

#!/usr/bin/env python

"""
Pre-requisites:
For this application we are using tweepy. Install it if required. 
$ pip install tweepy

Sign up for an app in https://apps.twitter.com/
"""

import tweepy
import json

# Retrieve the consumer keys and secrets from apps.twitter.com
consumer_key = "6IqHdMv..."
consumer_secret = "uvgQG..."

access_token = "15101127-AY4CW..."
access_token_secret = "DEPgC5Ue..."

data_dir = "/home/mapr/workspaces/tweets/"
# Create the above directory beforehand if it does not exist

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)

class MyStreamListener(tweepy.StreamListener):
    def on_status(self, status):
        print(status.user.name + ": " + status.text)
        # Save the full tweet payload as one JSON file per tweet
        message = json.dumps(status._json).encode("utf-8")
        with open(data_dir + str(status.id) + ".json", "wb") as f:
            f.write(message)

    def on_error(self, status_code):
        print("Error code:", status_code)
        if status_code == 420:
            # 420 means we are being rate limited;
            # returning False disconnects the stream
            return False
        
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)

search_terms = ["trump"]
myStream.filter(track=search_terms)


Run the script

$ mkdir tweets
$ python stream_twitter.py

View the output.

$ ls -1 tweets/ | head
1013899705682546689.json
1013899855318650880.json
1013899931181076480.json
1013899931512238082.json
1013899931541737472.json
1013899931952787456.json
1013899932372172805.json
1013899932426817536.json
1013899932565155841.json
1013899932867223586.json
...
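Each file holds the full tweet payload that tweepy exposes as status._json. A quick way to inspect one of these files is sketched below; the payload here is a hypothetical minimal stand-in written to a temporary directory (a real tweet object carries many more fields):

```python
import json
import os
import tempfile

# A minimal stand-in for the payload written by on_status
sample = {
    "id": 1013899705682546689,
    "text": "example tweet",
    "user": {"name": "Example User", "screen_name": "example"},
}

# Write it the same way stream_twitter.py does: one JSON file per tweet
path = os.path.join(tempfile.mkdtemp(), str(sample["id"]) + ".json")
with open(path, "wb") as f:
    f.write(json.dumps(sample).encode("utf-8"))

# Read a saved tweet back and pull out a few fields
with open(path) as f:
    tweet = json.load(f)
print(tweet["user"]["name"] + ": " + tweet["text"])
```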

Note: this application creates a lot of small files; even in a short time it will create a large number of them. It is not a scalable solution. A better solution is to push the tweets to Kafka.
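One way to do that is sketched below using the kafka-python client (pip install kafka-python). The broker address, topic name, and helper names are assumptions for illustration; only the pure serialization helper is exercised here, and the producer is created lazily so the sketch can be read without a running broker:

```python
import json

def make_producer(bootstrap_servers="localhost:9092"):
    """Create a Kafka producer (assumes kafka-python is installed)."""
    # Imported lazily so the rest of this sketch works without kafka-python
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=bootstrap_servers)

def tweet_message(status_json):
    """Serialize a tweet payload, keyed by tweet id for stable partitioning."""
    key = str(status_json["id"]).encode("utf-8")
    value = json.dumps(status_json).encode("utf-8")
    return key, value

# Inside MyStreamListener.on_status one would then do something like:
#   key, value = tweet_message(status._json)
#   producer.send("tweets", key=key, value=value)
```

Publishing to a topic instead of writing one file per tweet avoids the small-files problem and lets Spark consume the same feed as a Kafka stream.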


Process the tweets with Spark streaming

Save the following code as spark_streaming_tweets_file_source.py

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.storagelevel import StorageLevel
import json

input_path = "file:///home/mapr/workspaces/tweets/"
batch_interval = 3

sc = SparkContext()
ssc = StreamingContext(sc, batch_interval)

def extract_fields(tweet):
    j = json.loads(tweet)
    return j["text"], j["source"]

lines = ssc.textFileStream(input_path).map(extract_fields)

# Print the raw dstream
lines.pprint()

ssc.start()
ssc.awaitTermination()
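The extract_fields mapper can be sanity-checked without Spark by feeding it a JSON line directly; the sample payload below is hypothetical:

```python
import json

# Same mapper as in spark_streaming_tweets_file_source.py
def extract_fields(tweet):
    j = json.loads(tweet)
    return j["text"], j["source"]

# A hypothetical line as it would arrive from textFileStream
sample_line = json.dumps({
    "text": "example tweet",
    "source": "<a href=\"http://twitter.com\">Twitter Web Client</a>",
})
text, source = extract_fields(sample_line)
print(text)
print(source)
```

Note that the streaming API also delivers control messages (e.g. delete notices) that lack a "text" field, so a production mapper would want to guard against KeyError.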

Run the streaming application

$ /opt/mapr/spark/spark-2.2.1/bin/spark-submit spark_streaming_tweets_file_source.py


View the latest script here.

https://github.com/abulbasar/pyspark-examples/blob/master/twitter_stream_api.py