MapR Stream to MapR-DB



Goal: Use Spark Structured Streaming to save data from a file streaming source to a MapR-DB table.

Environment: MapR Sandbox 6.0.1; programming language: Python


Create a source

Here, we will use a JSON file dump as a source to simulate a Twitter stream. If you want a sample that streams directly from Twitter using the streaming API, please refer to this blog.

Use the following command to read a random number of lines from tweets.small.json ($RANDOM % 10 yields a number from 0 to 9, so each file holds at most 9 lines) and save them in a new file under /user/mapr/tweets_raw/, which is used as the file streaming source. The Spark streaming application picks up each new file as it appears.

$ shuf -n $(($RANDOM % 10)) ~/tweets.small.json > /user/mapr/tweets_raw/$(date +%s).json
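The shell command writes directly into the watched directory, and two runs within the same second produce the same file name. The Spark Structured Streaming guide notes that files must be placed in the source directory atomically (for example by a move). A minimal Python sketch of the same step under those constraints follows; the helper name drop_sample_file, the staging location (assumed to be the parent of the watched directory, on the same filesystem) and the choice of 1 to 9 lines per file are assumptions for illustration, not part of the original setup.

```python
import os
import random
import tempfile
import time
import uuid


def drop_sample_file(source_path, target_dir, max_lines=9, staging_dir=None):
    """Hypothetical helper: copy 1..max_lines random lines of source_path
    into a new, uniquely named file in target_dir."""
    with open(source_path, encoding="utf-8") as f:
        # Make sure every line ends with a newline (the last one may not).
        lines = [line if line.endswith("\n") else line + "\n" for line in f]
    if not lines:
        raise ValueError(f"{source_path} is empty")

    count = random.randint(1, min(max_lines, len(lines)))
    sample = random.sample(lines, count)  # like `shuf -n`: no line repeated

    # Assumption: the parent of target_dir is on the same filesystem,
    # so the final rename is atomic.
    if staging_dir is None:
        staging_dir = os.path.dirname(os.path.abspath(target_dir))

    # Timestamp plus a random suffix, so drops in the same second do not collide.
    name = f"{int(time.time())}_{uuid.uuid4().hex}.json"

    # Write outside the watched directory first...
    with tempfile.NamedTemporaryFile("w", dir=staging_dir, suffix=".tmp",
                                     delete=False, encoding="utf-8") as tmp:
        tmp.writelines(sample)
        tmp_path = tmp.name

    # ...then move the finished file into place in one step.
    final_path = os.path.join(target_dir, name)
    os.replace(tmp_path, final_path)
    return final_path, count
```

For example, drop_sample_file(os.path.expanduser("~/tweets.small.json"), "/user/mapr/tweets_raw") would drop one new file per call.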

Create a table in MapR-DB

$ mapr dbshell

maprdb mapr:> create /tables/tweets


Build the Spark streaming job

Below is the Spark streaming script; save it as ~/structured_streaming_file_to_maprdb.py.

"""
Start the file source:

$ shuf -n $(($RANDOM % 10)) ~/tweets.small.json > /user/mapr/tweets_raw/$(date +%s).json

The command above randomly selects up to 9 lines from ~/tweets.small.json
and saves them in a new file under /user/mapr/tweets_raw/.

The tweets.small.json file can be found at
https://raw.githubusercontent.com/abulbasar/data/master/tweets.small.json
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Create the Spark session
spark = (SparkSession
         .builder
         .appName("StructuredStreamingWithFileSourceMaprDBSink")
         .getOrCreate())

input_path = "file:///user/mapr/tweets_raw"

# Schema of the input messages (streaming file sources need an explicit schema)
schema = StructType([
    StructField("id", LongType(), True),
    StructField("lang", StringType(), True),
    StructField("text", StringType(), True),
    StructField("source", StringType(), True),
    StructField("created_at", StringType(), True)
])

# File source: JSON input data
raw = spark.readStream.format("json").schema(schema).load(input_path)

# Transformation: MapR-DB uses the _id field as the document key,
# so cast the numeric tweet id to a string and rename it to _id
transformed = (raw
               .withColumn("id", expr("cast(id as string)"))
               .withColumnRenamed("id", "_id"))

checkpoint_path = "/user/mapr/checkpoint"
maprdb_table = "/tables/tweets"

# Send the stream to the MapR-DB sink
(transformed
 .writeStream
 .outputMode("append")
 .format("com.mapr.db.spark.streaming")
 .option("checkpointLocation", checkpoint_path)
 .option("tablePath", maprdb_table)
 # .option("idFieldPath", "_id")
 # .option("createTable", True)
 .option("bulkMode", True)
 .option("sampleSize", 1000)
 ).start()

# Block until any of the active streaming queries terminates
spark.streams.awaitAnyTermination()
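Because the file source is given an explicit schema, fields outside it (for example nested user objects in a raw tweet) are dropped and missing fields come through as null; the transformation then casts id to a string and renames it to _id. The plain-Python sketch below mimics that effect on a single JSON line without Spark. The function name to_maprdb_document is hypothetical, and the sketch ignores Spark's handling of malformed records and type mismatches.

```python
import json

# Field names taken from the StructType schema in the script above.
SCHEMA_FIELDS = ["id", "lang", "text", "source", "created_at"]


def to_maprdb_document(line):
    """Hypothetical helper: apply the script's schema and transformation
    to one JSON line, returning the document that would reach the sink."""
    record = json.loads(line)
    # Explicit schema: unknown fields are dropped, missing ones become None (null).
    projected = {field: record.get(field) for field in SCHEMA_FIELDS}
    tweet_id = projected.pop("id")
    # Equivalent of cast(id as string) followed by withColumnRenamed("id", "_id").
    projected["_id"] = None if tweet_id is None else str(tweet_id)
    return projected


doc = to_maprdb_document(
    '{"id": 843439457042862080, "lang": "en", "text": "hi", "user": {"name": "x"}}')
print(doc["_id"])  # prints 843439457042862080
```

Here the extra "user" field disappears, while "source" and "created_at" are present with a None value.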


Run the Spark streaming application

Create the input directory and run the script:

$ mkdir -p /user/mapr/tweets_raw

$ /opt/mapr/spark/spark-2.2.1/bin/spark-submit --verbose ~/structured_streaming_file_to_maprdb.py

While the stream is being processed, the console usually stays quiet; any errors are printed here. Closing the terminal also terminates the streaming job.


Put new sample files in the streaming source

Open another terminal and drop a new file into the source directory. You may repeat this a few times.

$ shuf -n $(($RANDOM % 10)) ~/tweets.small.json > /user/mapr/tweets_raw/$(date +%s).json

Verify data in MapR-DB

In the MapR-DB shell, verify that the table contains data:

maprdb mapr:> find /tables/tweets --limit 10

{"_id":"843439457042862080","created_at":"Sun Mar 19 12:30:11 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","text":"#Blockchain: A new hope, or just hype? https://t.co/LgXl3xvzY9"}

{"_id":"843439586340564992","created_at":"Sun Mar 19 12:30:42 +0000 2017","lang":"en","source":"<a href=\"http://www.allformz.com/\" rel=\"nofollow\">AllforrmZ</a>","text":"New post: Proof of Work vs Proof of Stake: Basic Mining Guide https://t.co/7lY2NKfpSY #blockchain https://t.co/pU7WFNyyGu"}

{"_id":"843439663930986496","created_at":"Sun Mar 19 12:31:00 +0000 2017","lang":"en","source":"<a href=\"http://www.ajaymatharu.com/\" rel=\"nofollow\">Tweet Old Post</a>","text":"EgyptAir crash: Explosives traces found https://t.co/NPuMxtWHXQ Happening Now #PresidentElectTrump #MAGA #privacy"}

{"_id":"843439903417548800","created_at":"Sun Mar 19 12:31:57 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@Ayaan @richyh5712 I don't have any guilt in fact I'm very embarrassed by weak white liberals...especially the soppy cunts with beards"}

{"_id":"843439975710494720","created_at":"Sun Mar 19 12:32:15 +0000 2017","lang":"en","source":"<a href=\"https://ifttt.com\" rel=\"nofollow\">IFTTT</a>","text":"Machines replacing workers: fed budget to look at historic economic challenge https://t.co/39ymHGbYBn #robots #ai #ml #ar #robot #drone #r…"}

{"_id":"843440003002728449","created_at":"Sun Mar 19 12:32:21 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@realDonaldTrump another con job by this despicable mother fucker #PutinsPuppet https://t.co/M9Plz5W76A"}

{"_id":"843440034124652545","created_at":"Sun Mar 19 12:32:29 +0000 2017","lang":"en","source":"<a href=\"http://paper.li\" rel=\"nofollow\">Paper.li</a>","text":"The latest The Strategic Leadership Daily! https://t.co/NmCj5BQWA5 Thanks to @garybcohen #leadership #ai"}

{"_id":"843440235333718017","created_at":"Sun Mar 19 12:33:17 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@nytimes Trump and nepotism? You shock me! This is so out of character...no wait, scratch that"}

{"_id":"843440266879156224","created_at":"Sun Mar 19 12:33:24 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@realDonaldTrump you clearly have no idea how NATO funding is structured, you're ruining our countries place in the world."}

{"_id":"843440300227936260","created_at":"Sun Mar 19 12:33:32 +0000 2017","lang":"en","source":"<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>","text":"@nytimes I don't care if this sounds shallow...but, quite possibly the ugliest power couple in a while. Put the pics in a spoiler FFS"}

10 document(s) found.

maprdb mapr:>


The latest version of the code is maintained here:

https://github.com/abulbasar/pyspark-examples/blob/master/structured_streaming_file_to_maprdb.py