MapR Stream to MaprDB
Goal: Use Spark Structured Streaming to save data from a file streaming source to a MapR-DB table.
Environment: MapR Sandbox 6.0.1. Programming language: Python.
Create a source
Here, we will use a JSON file dump as a source to simulate a Twitter stream. If you want a sample that streams directly from Twitter using the streaming API, please refer to this blog.
Use the following command to read a random sample of lines from tweets.small.json and save them in a new file under /user/mapr/tweets_raw/, which is used as the file streaming source. The line count is $RANDOM % 10, so each file holds between 0 and 9 lines. The new files will be picked up by the Spark streaming application.
$ shuf -n $(($RANDOM % 10)) ~/tweets.small.json > /user/mapr/tweets_raw/$(date +%s).json
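If shuf is not available, the same sampling can be sketched in Python. This is a minimal sketch, not part of the original setup: the function name drop_sample_file and its parameters are hypothetical, and the demo runs against throwaway data in a temporary directory rather than the real ~/tweets.small.json.

```python
import random
import tempfile
import time
from pathlib import Path


def drop_sample_file(source_path, target_dir, max_lines=9, rng=random):
    """Write a random sample of lines from source_path to a new file in target_dir.

    Mirrors `shuf -n $(($RANDOM % 10))`: the sample size is drawn from 0..max_lines.
    (Hypothetical helper, for illustration only.)
    """
    lines = Path(source_path).read_text(encoding="utf-8").splitlines()
    sample_size = min(rng.randint(0, max_lines), len(lines))
    sample = rng.sample(lines, sample_size)
    # Name the file after the current epoch second, like $(date +%s).json
    target = Path(target_dir) / f"{int(time.time())}.json"
    target.write_text("".join(line + "\n" for line in sample), encoding="utf-8")
    return target


# Demo on generated data so the sketch runs anywhere.
with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "tweets.small.json"
    source.write_text("".join('{"id": %d}\n' % i for i in range(100)), encoding="utf-8")
    out_dir = Path(tmp) / "tweets_raw"
    out_dir.mkdir()
    created = drop_sample_file(source, out_dir)
    print(created.name, len(created.read_text().splitlines()), "lines")
```

To use it against the real data, point source_path at ~/tweets.small.json and target_dir at /user/mapr/tweets_raw.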
Create a table in MapR-DB
$ mapr dbshell
maprdb mapr:> create /tables/tweets
Build the Spark streaming job
Below is the Spark streaming script. Save it as ~/structured_streaming_file_to_maprdb.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
"""
Start the file source:
$ shuf -n $(($RANDOM % 10)) ~/tweets.small.json > /user/mapr/tweets_raw/$(date +%s).json
The above command randomly selects up to 9 lines from ~/tweets.small.json and saves them in a new file
under /user/mapr/tweets_raw/
The tweets.small.json can be found in
https://raw.githubusercontent.com/abulbasar/data/master/tweets.small.json
"""
# Create spark session
spark = (SparkSession
    .builder
    .appName("StructuredStreamingWithFileSourceMaprDBSink")
    .getOrCreate())
input_path = "file:///user/mapr/tweets_raw"
# Schema of the input messages
schema = StructType([
    StructField("id", LongType(), True),
    StructField("lang", StringType(), True),
    StructField("text", StringType(), True),
    StructField("source", StringType(), True),
    StructField("created_at", StringType(), True)
])
# File source: JSON input data
raw = spark.readStream.format("json").schema(schema).load(input_path)
# Transformation
# Cast the numeric id to a string and rename it to _id, the MapR-DB document key
transformed = (raw  # .orderBy("id")
    .withColumn("id", expr("cast(id as string)"))
    .withColumnRenamed("id", "_id"))
checkpoint_path = "/user/mapr/checkpoint"
maprdb_table = "/tables/tweets"
# Send the stream to MaprDB sink
(transformed
    .writeStream
    .outputMode("append")
    .format("com.mapr.db.spark.streaming")
    .option("checkpointLocation", checkpoint_path)
    .option("tablePath", maprdb_table)
    #.option("idFieldPath", "_id")
    #.option("createTable", True)
    .option("bulkMode", True)
    .option("sampleSize", 1000)
).start()
# Block until any streaming query terminates, then exit the program
spark.streams.awaitAnyTermination()
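The transformation step casts the numeric tweet id to a string and renames it to _id, which MapR-DB uses as the document key. The plain-Python sketch below shows the same reshaping on a single record without Spark. The function name to_maprdb_document is hypothetical, and the sample record is reconstructed from the first document in the verification output further down, with id as a number as the schema's LongType implies.

```python
import json


def to_maprdb_document(tweet):
    """Return a copy of the tweet with `id` replaced by a string `_id`.

    Hypothetical helper: mirrors the withColumn/withColumnRenamed steps
    of the Spark job on a single Python dict.
    """
    doc = dict(tweet)                 # do not mutate the caller's record
    doc["_id"] = str(doc.pop("id"))   # cast to string, then rename
    return doc


# Sample input line, shaped like one line of the JSON source file.
raw_line = (
    '{"id": 843439457042862080, "lang": "en", '
    '"text": "#Blockchain: A new hope, or just hype? https://t.co/LgXl3xvzY9"}'
)
doc = to_maprdb_document(json.loads(raw_line))
print(json.dumps(doc, sort_keys=True))
```

Keeping the id as a string also sidesteps precision loss in JSON consumers that parse large integers as floating-point numbers.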
Run the Spark streaming application
Create the input directory and run the script
$ mkdir -p /user/mapr/tweets_raw
$ /opt/mapr/spark/spark-2.2.1/bin/spark-submit --verbose ~/structured_streaming_file_to_maprdb.py
The screen usually stays silent while the stream is running. If you terminate the shell, the streaming job is also terminated. Any errors will show up on the screen.
Put sample new files in the streaming source
Open another terminal to drop a new file. You may run the command a few times.
$ shuf -n $(($RANDOM % 10)) ~/tweets.small.json > /user/mapr/tweets_raw/$(date +%s).json
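Shell redirection creates the file in the watched directory and then fills it, so a streaming job could in principle see it half written. The Spark Structured Streaming guide asks for files to be placed in the source directory atomically, which is usually done with a move. A minimal sketch of that pattern follows, assuming a staging directory on the same filesystem as the watched one; the name publish_atomically and the staging directory are hypothetical, and the demo uses temporary directories instead of /user/mapr/tweets_raw.

```python
import os
import tempfile
import time
from pathlib import Path


def publish_atomically(lines, watched_dir, staging_dir):
    """Write lines to staging_dir, then move the finished file into watched_dir.

    Hypothetical helper, for illustration only.
    """
    watched_dir, staging_dir = Path(watched_dir), Path(staging_dir)
    name = f"{time.time_ns()}.json"   # nanosecond timestamp keeps rapid drops distinct
    staged = staging_dir / name
    staged.write_text("".join(line + "\n" for line in lines), encoding="utf-8")
    final = watched_dir / name
    # os.replace is an atomic rename when both paths are on the same filesystem
    os.replace(staged, final)
    return final


# Demo: drop three small files, as you would by rerunning the shell command.
with tempfile.TemporaryDirectory() as tmp:
    watched = Path(tmp) / "tweets_raw"
    staging = Path(tmp) / "tweets_staging"
    watched.mkdir()
    staging.mkdir()
    for batch in range(3):
        publish_atomically(['{"id": %d}' % batch], watched, staging)
    print(sorted(p.name for p in watched.iterdir()))
```

For the small files used in this walkthrough the plain redirect generally works; the move-based variant matters more as files get larger.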
Verify data in MapR-DB
In the MapR-DB shell, verify that you see some data.
maprdb mapr:> find /tables/tweets --limit 10
{"_id":"843439457042862080","created_at":"Sun Mar 19 12:30:11 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","text":"#Blockchain: A new hope, or just hype? https://t.co/LgXl3xvzY9"}
{"_id":"843439586340564992","created_at":"Sun Mar 19 12:30:42 +0000 2017","lang":"en","source":"<a href=\"http://www.allformz.com/\" rel=\"nofollow\">AllforrmZ</a>","text":"New post: Proof of Work vs Proof of Stake: Basic Mining Guide https://t.co/7lY2NKfpSY #blockchain https://t.co/pU7WFNyyGu"}
{"_id":"843439663930986496","created_at":"Sun Mar 19 12:31:00 +0000 2017","lang":"en","source":"<a href=\"http://www.ajaymatharu.com/\" rel=\"nofollow\">Tweet Old Post</a>","text":"EgyptAir crash: Explosives traces found https://t.co/NPuMxtWHXQ Happening Now #PresidentElectTrump #MAGA #privacy"}
{"_id":"843439903417548800","created_at":"Sun Mar 19 12:31:57 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@Ayaan @richyh5712 I don't have any guilt in fact I'm very embarrassed by weak white liberals...especially the soppy cunts with beards"}
{"_id":"843439975710494720","created_at":"Sun Mar 19 12:32:15 +0000 2017","lang":"en","source":"<a href=\"https://ifttt.com\" rel=\"nofollow\">IFTTT</a>","text":"Machines replacing workers: fed budget to look at historic economic challenge https://t.co/39ymHGbYBn #robots #ai #ml #ar #robot #drone #r…"}
{"_id":"843440003002728449","created_at":"Sun Mar 19 12:32:21 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@realDonaldTrump another con job by this despicable mother fucker #PutinsPuppet https://t.co/M9Plz5W76A"}
{"_id":"843440034124652545","created_at":"Sun Mar 19 12:32:29 +0000 2017","lang":"en","source":"<a href=\"http://paper.li\" rel=\"nofollow\">Paper.li</a>","text":"The latest The Strategic Leadership Daily! https://t.co/NmCj5BQWA5 Thanks to @garybcohen #leadership #ai"}
{"_id":"843440235333718017","created_at":"Sun Mar 19 12:33:17 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@nytimes Trump and nepotism? You shock me! This is so out of character...no wait, scratch that"}
{"_id":"843440266879156224","created_at":"Sun Mar 19 12:33:24 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@realDonaldTrump you clearly have no idea how NATO funding is structured, you're ruining our countries place in the world."}
{"_id":"843440300227936260","created_at":"Sun Mar 19 12:33:32 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>","text":"@nytimes I don't care if this sounds shallow...but, quite possibly the ugliest power couple in a while. Put the pics in a spoiler FFS"}
10 document(s) found.
maprdb mapr:>
The latest code is maintained here:
https://github.com/abulbasar/pyspark-examples/blob/master/structured_streaming_file_to_maprdb.py