Twitter live streaming (Python)


Create a script for streaming Twitter, stream_twitter.py

#!/usr/bin/env python


"""

Pre-requisites:

For this application we are using tweepy. Install it if required.

$ pip install tweepy


Sign up for an app at https://apps.twitter.com/

"""


import tweepy

import json


# Retrieve the consumer keys and secrets from apps.twitter.com

consumer_key = "6IqHdMv..."

consumer_secret = "uvgQG..."


access_token = "15101127-AY4CW..."

access_token_secret = "DEPgC5Ue..."


data_dir = "/home/mapr/workspaces/tweets/"

# Create the above directory if it does not exist


auth = tweepy.OAuthHandler(consumer_key, consumer_secret)

auth.set_access_token(access_token, access_token_secret)


api = tweepy.API(auth)


class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        print(status.user.name + ": " + status.text)
        message = json.dumps(status._json).encode("utf-8")
        with open(data_dir + str(status.id) + ".json", "wb") as f:
            f.write(message)

    def on_error(self, status_code):
        print("Error code: ", status_code)

myStreamListener = MyStreamListener()

myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)


search_terms = ["trump"]

myStream.filter(track=search_terms)
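The listener writes into data_dir, so that directory must exist before the stream starts. A small guard at the top of the script avoids the manual mkdir step; this sketch uses a relative tweets/ path for illustration, while the script above uses the absolute /home/mapr/workspaces/tweets/:

```python
import os

data_dir = "tweets/"

# Create the output directory (and any missing parents) if it does not exist
os.makedirs(data_dir, exist_ok=True)
```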


Run the script

$ mkdir tweets

$ python stream_twitter.py

View the output.

$ ls -1 tweets/ | head

1013899705682546689.json

1013899855318650880.json

1013899931181076480.json

1013899931512238082.json

1013899931541737472.json

1013899931952787456.json

1013899932372172805.json

1013899932426817536.json

1013899932565155841.json

1013899932867223586.json

...

Note: this application creates a lot of small files. Within a short time it will create a large number of files, so it is not a scalable solution. A better solution is to push the Twitter messages to Kafka.
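As a sketch of that direction, each tweet can be serialized once and sent to a Kafka topic instead of being written to a new file. This assumes the kafka-python package and a broker on localhost:9092 (both assumptions, not part of the setup above); the producer part is guarded so the serialization logic still runs without a broker:

```python
import json

# Stand-in for status._json from tweepy
tweet = {"id": 1013899705682546689, "text": "example tweet", "user": {"name": "demo"}}

# Serialize once; the same bytes that went to a file would go to Kafka
message = json.dumps(tweet).encode("utf-8")

try:
    from kafka import KafkaProducer  # pip install kafka-python

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    # Key by tweet id so retries of the same tweet land in the same partition
    producer.send("tweets", key=str(tweet["id"]).encode("utf-8"), value=message)
    producer.flush()
except Exception as exc:
    print("Kafka unavailable, skipping send:", exc)
```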


Process the tweets with Spark streaming

Save the following code as spark_streaming_tweets_file_source.py

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.storagelevel import StorageLevel

import json


input_path = "file:///home/mapr/workspaces/tweets/"

batch_interval = 3


sc = SparkContext()

ssc = StreamingContext(sc, batch_interval)


def extract_fields(tweet):
    j = json.loads(tweet)
    return j["text"], j["source"]


lines = ssc.textFileStream(input_path).map(extract_fields)


# Print the raw dstream

lines.pprint()


ssc.start()

ssc.awaitTermination()
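extract_fields operates on the raw JSON line of each tweet file, so it can be checked standalone without Spark. A quick check with a hypothetical minimal tweet:

```python
import json

def extract_fields(tweet):
    j = json.loads(tweet)
    return j["text"], j["source"]

# Hypothetical one-line tweet JSON, shaped like the files stream_twitter.py writes
sample = '{"id": 1, "text": "hello world", "source": "Twitter Web Client"}'

print(extract_fields(sample))  # ('hello world', 'Twitter Web Client')
```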

Run the streaming application

$ /opt/mapr/spark/spark-2.2.1/bin/spark-submit spark_streaming_tweets_file_source.py


View the latest script here.

https://github.com/abulbasar/pyspark-examples/blob/master/twitter_stream_api.py