MapR Stream to MaprDB

Goal: use Spark Streaming to save data from a file streaming source to a MapR-DB table.

Environment: MapR Sandbox 6.0.1. Programming language: Python.

Create a source

Here we use a JSON file dump as the source to simulate a Twitter stream. If you want a sample that streams directly from Twitter using the streaming API, please refer to this blog.

Use the following command to read a random number of lines (0 to 9, since $RANDOM % 10 never reaches 10) from ~/tweets.small.json and save them in a new file under /user/mapr/tweets_raw/, which serves as the file streaming source. The Spark streaming application picks up each new file.

$ shuf -n $(($RANDOM % 10)) ~/tweets.small.json > /user/mapr/tweets_raw/$(date +%s).json
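If you would rather generate the input files from Python, the same idea can be sketched as follows. This is a minimal sketch and not part of the original setup: the function name drop_sample and its parameters are hypothetical, and it assumes the source file is small enough to read into memory.

```python
import os
import random
import time


def drop_sample(source_file, target_dir, max_lines=9, rng=random):
    """Write up to max_lines randomly chosen lines to a new timestamped file."""
    with open(source_file, encoding="utf-8") as f:
        # Normalise line endings so sampled lines never run together
        lines = [line.rstrip("\n") + "\n" for line in f if line.strip()]
    # Like $RANDOM % 10, the count can be anything from 0 to max_lines
    count = min(rng.randint(0, max_lines), len(lines))
    # Sample without replacement, as shuf -n does
    sample = rng.sample(lines, count)
    os.makedirs(target_dir, exist_ok=True)
    # Name the file after the current Unix time, like $(date +%s).json
    target = os.path.join(target_dir, "%d.json" % int(time.time()))
    with open(target, "w", encoding="utf-8") as f:
        f.writelines(sample)
    return target
```

A call such as drop_sample(os.path.expanduser("~/tweets.small.json"), "/user/mapr/tweets_raw") would then play the role of the shell command. As with the shell version, two calls within the same second write to the same file name.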

Create a table in Mapr DB

$ mapr dbshell
maprdb mapr:> create /tables/tweets

Build the spark streaming Job

Below is the Spark streaming script. Save it as ~/

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Start the file source:
# $ shuf -n $(($RANDOM % 10)) ~/tweets.small.json > /user/mapr/tweets_raw/$(date +%s).json
#
# The command above randomly selects up to 9 lines from ~/tweets.small.json and saves them
# in a new file under /user/mapr/tweets_raw/
#
# The tweets.small.json can be found in


# Create spark session
spark = SparkSession.builder.getOrCreate()

input_path = "file:///user/mapr/tweets_raw"

# Schema of the input messages
schema = StructType([
    StructField("id", LongType(), True),
    StructField("lang", StringType(), True),
    StructField("text", StringType(), True),
    StructField("source", StringType(), True),
    StructField("created_at", StringType(), True)
])

#File Source: json input data
raw = spark.readStream.format("json").schema(schema).load(input_path)

# Transformation
transformed = (raw  # .orderBy("id")
          .withColumn("id", expr("cast(id as string)"))
          .withColumnRenamed("id", "_id"))

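checkpoint_path = "/user/mapr/checkpoint"
maprdb_table = "/tables/tweets"
# Send the stream to the MapR-DB sink.
# The sink format name is assumed to be the MapR-DB connector's package name;
# verify it against your MapR release.
query = (transformed
  .writeStream
  .format("com.mapr.db.spark.streaming")
  .option("checkpointLocation", checkpoint_path)
  .option("tablePath", maprdb_table)
  #.option("idFieldPath", "_id")
  #.option("createTable", True)
  .option("bulkMode", True)
  .option("sampleSize", 1000)
  .start())

# Block until any streaming query terminates, then end the program
spark.streams.awaitAnyTermination()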
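
To make the schema and the transformation step concrete, here is a plain-Python sketch of what they do to a single input line, without Spark. The helper name to_maprdb_document and the sample record are hypothetical and exist only for illustration; the field names come from the schema in the script.

```python
import json

# Fields kept by the schema in the streaming script
SCHEMA_FIELDS = ["id", "lang", "text", "source", "created_at"]


def to_maprdb_document(json_line):
    """Mimic the schema projection and the id -> _id step for one record."""
    raw = json.loads(json_line)
    # Reading with an explicit schema keeps only the listed fields;
    # fields absent from the input come back as None (all are nullable)
    doc = {name: raw.get(name) for name in SCHEMA_FIELDS}
    # cast(id as string), then rename the column to _id
    tweet_id = doc.pop("id")
    doc["_id"] = None if tweet_id is None else str(tweet_id)
    return doc


# Made-up input line with one field that is not in the schema
sample = '{"id": 843439457042862080, "lang": "en", "text": "hello", "retweet_count": 3}'
print(to_maprdb_document(sample))
```

The extra retweet_count field is dropped, and the numeric id becomes a string _id, which matches the shape of the documents shown in the verification step.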

Run the spark streaming application

Create the input directory and run the script

$ mkdir -p /user/mapr/tweets_raw
$ /opt/mapr/spark/spark-2.2.1/bin/spark-submit --verbose ~/

The console usually stays quiet while the job is streaming; any errors are printed there. Closing the shell also terminates the streaming job.

Put sample new files in the streaming source

Open another terminal to drop a new file. You may run the command a few times.

$ shuf -n $(($RANDOM % 10)) ~/tweets.small.json > /user/mapr/tweets_raw/$(date +%s).json

Verify data in Mapr DB

In the MapR-DB shell, verify that you see some data.

maprdb mapr:> find /tables/tweets --limit 10
{"_id":"843439457042862080","created_at":"Sun Mar 19 12:30:11 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\">Twitter for iPhone</a>","text":"#Blockchain: A new hope, or just hype?"}
{"_id":"843439586340564992","created_at":"Sun Mar 19 12:30:42 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\">AllforrmZ</a>","text":"New post: Proof of Work vs Proof of Stake: Basic Mining Guide #blockchain"}
{"_id":"843439663930986496","created_at":"Sun Mar 19 12:31:00 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\">Tweet Old Post</a>","text":"EgyptAir crash: Explosives traces found Happening Now #PresidentElectTrump #MAGA #privacy"}
{"_id":"843439903417548800","created_at":"Sun Mar 19 12:31:57 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@Ayaan @richyh5712 I don't have any guilt in fact I'm very embarrassed by weak white liberals...especially the soppy cunts with beards"}
{"_id":"843439975710494720","created_at":"Sun Mar 19 12:32:15 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\">IFTTT</a>","text":"Machines replacing workers: fed budget to look at historic economic challenge #robots #ai #ml #ar #robot #drone #r…"}
{"_id":"843440003002728449","created_at":"Sun Mar 19 12:32:21 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@realDonaldTrump another con job by this despicable mother fucker #PutinsPuppet"}
{"_id":"843440034124652545","created_at":"Sun Mar 19 12:32:29 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\"></a>","text":"The latest The Strategic Leadership Daily! Thanks to @garybcohen #leadership #ai"}
{"_id":"843440235333718017","created_at":"Sun Mar 19 12:33:17 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@nytimes Trump and nepotism? You shock me! This is so out of wait, scratch that"}
{"_id":"843440266879156224","created_at":"Sun Mar 19 12:33:24 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\">Twitter for iPhone</a>","text":"@realDonaldTrump you clearly have no idea how NATO funding is structured, you're ruining our countries place in the world."}
{"_id":"843440300227936260","created_at":"Sun Mar 19 12:33:32 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\">Twitter for Android</a>","text":"@nytimes I don't care if this sounds shallow...but, quite possibly the ugliest power couple in a while. Put the pics in a spoiler FFS"}
10 document(s) found.
maprdb mapr:>
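
If you copy the documents printed by find into a file, a few lines of Python can confirm that each one has the expected shape. This is only a sketch: the helper name check_document is hypothetical, and it assumes every tweet carries all five fields, which may not hold if some values were null in the input.

```python
import json

# Fields written by the streaming job, with id renamed to _id
EXPECTED_FIELDS = {"_id", "created_at", "lang", "source", "text"}


def check_document(line):
    """Parse one JSON document and verify its fields and the type of _id."""
    doc = json.loads(line)
    missing = EXPECTED_FIELDS - set(doc)
    if missing:
        raise ValueError("missing fields: %s" % sorted(missing))
    if not isinstance(doc["_id"], str):
        raise ValueError("_id should be a string, got %r" % type(doc["_id"]))
    return doc


# One document taken from the find output in this section
line = r'{"_id":"843439457042862080","created_at":"Sun Mar 19 12:30:11 +0000 2017","lang":"en","source":"<a href=\"\" rel=\"nofollow\">Twitter for iPhone</a>","text":"#Blockchain: A new hope, or just hype?"}'
print(check_document(line)["_id"])
```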

Latest code is maintained here