MapR Streaming


Example of MapR streaming with a Java consumer and producer



[The following example was run on MapR Sandbox version 6.0.1.]

Create a stream. In MapR, a stream is a collection of topics.

$ maprcli stream create -path /user/mapr/streams 

Help output for the stream create command

$ maprcli stream create
stream create
         -path Stream Path
        [ -ttl Time to live in seconds. default:604800 ]
        [ -autocreate Auto create topics. default:true ]
        [ -defaultpartitions Default partitions per topic. default:1 ]
        [ -compression off|lz4|lzf|zlib. default:inherit from parent directory ]
        [ -produceperm Producer access control expression. default u:creator ]
        [ -consumeperm Consumer access control expression. default u:creator ]
        [ -topicperm Topic CRUD access control expression. default u:creator ]
        [ -copyperm Stream copy access control expression. default u:creator ]
        [ -adminperm Stream administration access control expression. default u:creator ]
        [ -copymetafrom Stream to copy attributes from. default:none ]
        [ -ischangelog Stream to store changelog. default:false ]
        [ -defaulttimestamptype timestamp type: createtime | logappendtime. default: createtime ]
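The -ttl value is given in seconds, so the default of 604800 corresponds to 7 days. As a minimal sketch, the helper below assembles a stream create command from a few of the options listed in the help output above. The function name and its parameters are hypothetical and only illustrate how the flags combine; it prints the command and does not run it.

```python
import shlex

SECONDS_PER_DAY = 86400


def stream_create_command(path, ttl_days=None, default_partitions=None, autocreate=None):
    """Build the argument list for `maprcli stream create` (hypothetical helper).

    Only flags that appear in the help output are used; options left as None
    are omitted so that maprcli applies its own defaults.
    """
    args = ["maprcli", "stream", "create", "-path", path]
    if ttl_days is not None:
        # -ttl expects seconds
        args += ["-ttl", str(int(ttl_days * SECONDS_PER_DAY))]
    if default_partitions is not None:
        args += ["-defaultpartitions", str(default_partitions)]
    if autocreate is not None:
        args += ["-autocreate", "true" if autocreate else "false"]
    return args


cmd = stream_create_command("/user/mapr/streams", ttl_days=7, default_partitions=3)
print(" ".join(shlex.quote(a) for a in cmd))
```

The argument list can be passed to subprocess.run on a node where maprcli is installed.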


Set permissions on the stream. The access control expression p (public) opens produce, consume, and topic permissions to all users.

$ maprcli stream edit -path /user/mapr/streams -produceperm p -consumeperm p -topicperm p
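Opening the stream to everyone is convenient on a sandbox, but an access control expression can also name specific users or groups. The sketch below assumes the usual MapR expression syntax, in which u:<user> and g:<group> terms are combined with | (or). The helper name and the users alice and bob are hypothetical.

```python
import shlex


def ace_any_of(users=(), groups=()):
    """Return an access control expression matching any listed user or group.

    Assumes the MapR ACE syntax: u:<user>, g:<group>, joined with | for "or".
    """
    terms = [f"u:{u}" for u in users] + [f"g:{g}" for g in groups]
    if not terms:
        raise ValueError("at least one user or group is required")
    return "|".join(terms)


# Hypothetical users; replace with real account names.
producers = ace_any_of(users=["alice", "bob"])
cmd = ["maprcli", "stream", "edit", "-path", "/user/mapr/streams",
       "-produceperm", producers]

# shlex.quote protects the | character from the shell.
print(" ".join(shlex.quote(a) for a in cmd))
```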


Create two topics, one for the raw stream and the other for the enriched stream.

  • raw - stores the raw messages
  • enriched - stores the summary of the streaming data

$ maprcli stream topic create -path /user/mapr/streams -topic raw
$ maprcli stream topic create -path /user/mapr/streams -topic enriched


Help output for the topic create command

$ maprcli stream topic create
         -path Stream Path
         -topic Topic Name
        [ -partitions Number of partitions. default: attribute defaultpartitions on the stream ]
        [ -timestamptype Timestamp type: createtime | logappendtime default: createtime ]


Verify that the topics were created

$ maprcli stream topic list -path /user/mapr/streams/
partitions  maxlag  logicalsize  topic     consumers  physicalsize
1           0       0            raw       0          0
1           0       0            enriched  0          0
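To check the listing from a script rather than by eye, the tabular output can be split into one record per topic. This is a minimal sketch that assumes the whitespace-separated layout shown above, with no spaces inside any value; the function name is hypothetical. Where JSON output is available (the -json flag used with topic info later in this example), parsing that is likely more robust.

```python
SAMPLE = """\
partitions  maxlag  logicalsize  topic     consumers  physicalsize
1           0       0            raw       0          0
1           0       0            enriched  0          0
"""


def parse_maprcli_table(text):
    """Turn whitespace-aligned maprcli output into a list of dicts keyed by header."""
    lines = [ln for ln in text.splitlines() if ln.strip()]
    header = lines[0].split()
    return [dict(zip(header, ln.split())) for ln in lines[1:]]


topics = parse_maprcli_table(SAMPLE)
names = {row["topic"] for row in topics}

# Fail loudly if either expected topic is missing.
missing = {"raw", "enriched"} - names
print("missing topics:", sorted(missing))
```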


If Maven is not installed, run the following commands to install it. Maven is needed to compile the Java code.

$ sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
$ sudo yum install -y apache-maven
$ export JAVA_HOME=/usr/lib/jvm/java-1.8.0/
$ mvn -version


Download the Java application from GitHub and build it. Then start the consumer and the producer in two terminals to see the messages.

$ mkdir ~/workspaces
$ cd ~/workspaces
$ git clone https://github.com/abulbasar/MaprStreamingExample.git
$ cd MaprStreamingExample
$ mvn clean package



Start the consumer

$ cd ~/workspaces/MaprStreamingExample
$ java -cp $(mapr classpath):target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run consumer

Open another terminal and start the producer

$ cd ~/workspaces/MaprStreamingExample
$ java -cp $(mapr classpath):target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run producer


Create a MapR-DB JSON table

$ maprcli table create -path /tables/raw -tabletype json


Start the DB consumer

$ java -cp $(mapr classpath):target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run dbconsumer


Verify the data in MapR-DB

$ mapr dbshell
maprdb mapr:> list /tables
/tables/raw
1 table(s) found.
maprdb mapr:> find /tables/raw --limit 10
{"_id":"0","k":{"$numberLong":0},"t":53669.809,"type":"test"}
{"_id":"1","k":{"$numberLong":1},"t":53672.055,"type":"test"}
{"_id":"10","k":{"$numberLong":10},"t":53672.056,"type":"test"}
{"_id":"100","k":{"$numberLong":100},"t":53672.07,"type":"test"}
{"_id":"1000","k":{"$numberLong":1000},"t":53672.122,"type":"test"}
{"_id":"10000","k":{"$numberLong":10000},"t":53672.567,"type":"test"}
{"_id":"100000","k":{"$numberLong":100000},"t":53674.397,"type":"test"}
{"_id":"100001","k":{"$numberLong":100001},"t":53674.408,"type":"test"}
{"_id":"100002","k":{"$numberLong":100002},"t":53674.408,"type":"test"}
{"_id":"100003","k":{"$numberLong":100003},"t":53674.408,"type":"test"}
10 document(s) found.
maprdb mapr:>
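The documents come back as 0, 1, 10, 100, ... rather than 0, 1, 2, 3, ... because _id is stored as a string, and the rows appear to be returned in the byte order of that string rather than in numeric order. The sketch below reproduces the same ordering with plain Python string sorting. The count of 150000 ids is an arbitrary assumption, chosen only to be larger than 100003.

```python
# Generate string ids the way the documents above are keyed ("0", "1", "2", ...).
ids = [str(k) for k in range(150000)]

# Sorting strings compares character by character, so "10" sorts before "2".
first_ten = sorted(ids)[:10]
print(first_ten)
# ['0', '1', '10', '100', '1000', '10000', '100000', '100001', '100002', '100003']

# Zero-padding the key to a fixed width makes string order match numeric order.
padded = sorted(f"{k:06d}" for k in range(150000))[:4]
print(padded)
# ['000000', '000001', '000002', '000003']
```

If numeric scan order matters, zero-padding the key when writing the documents is one option.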


Find info on streams

$ maprcli stream topic info -path /user/mapr/streams -topic raw -json
{
        "timestamp":1530231866316,
        "timeofday":"2018-06-28 05:24:26.316 GMT-0700 PM",
        "status":"OK",
        "total":1,
        "data":[
                {
                        "partitionid":0,
                        "physicalsize":82575360,
                        "logicalsize":374898688,
                        "maxoffset":6758753,
                        "minoffsetacrossconsumers":1529285,
                        "mintimestamp":"2018-06-28T04:35:31.258-0700 PM",
                        "maxtimestamp":"2018-06-28T05:02:38.130-0700 PM",
                        "mintimestampacrossconsumers":"2018-06-28T04:40:53.770-0700 PM",
                        "fid":"2306.32.131194",
                        "master":"maprdemo:5660",
                        "servers":"maprdemo:5660",
                        "timestamptype":"CreateTime"
                }
        ]
}
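The JSON form is easy to post-process. As a rough sketch, the snippet below derives two figures per partition from the fields shown above: the gap between maxoffset and minoffsetacrossconsumers, and the ratio of logicalsize to physicalsize. Reading the first as the backlog of the slowest consumer and the second as a compression ratio is an interpretation of the field names, not something the command output states. The sample is trimmed to the fields used.

```python
import json

# Trimmed copy of the topic info output above (only the fields used here).
INFO_JSON = """
{
  "status": "OK",
  "total": 1,
  "data": [
    {
      "partitionid": 0,
      "physicalsize": 82575360,
      "logicalsize": 374898688,
      "maxoffset": 6758753,
      "minoffsetacrossconsumers": 1529285
    }
  ]
}
"""


def summarize_partitions(info):
    """Return per-partition backlog and size ratio (hypothetical helper)."""
    rows = []
    for p in info["data"]:
        rows.append({
            "partition": p["partitionid"],
            # Offsets not yet passed by the slowest consumer (interpretation).
            "backlog": p["maxoffset"] - p["minoffsetacrossconsumers"],
            # Logical bytes per physical byte on disk.
            "size_ratio": round(p["logicalsize"] / p["physicalsize"], 2),
        })
    return rows


summary = summarize_partitions(json.loads(INFO_JSON))
print(summary)
# [{'partition': 0, 'backlog': 5229468, 'size_ratio': 4.54}]
```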


Query the data in MapR-DB using Drill

$ export DRILL_HOME=/opt/mapr/drill/drill-1.11.0
$ cd $DRILL_HOME/jars/3rdparty
$ wget https://repository.mapr.com/nexus/content/groups/mapr-public/org/ojai/ojai/2.0.1-mapr-1804/ojai-2.0.1-mapr-1804.jar --no-check-certificate
$ $DRILL_HOME/bin/drill-embedded

0: jdbc:drill:zk=local> select * from dfs.root.`/tables/raw` limit 10;
+---------+---------+----------+-------+
|   _id   |    k    |    t     | type  |
+---------+---------+----------+-------+
| 0       | 0       | 518.308  | test  |
| 1       | 1       | 520.653  | test  |
| 10      | 10      | 520.654  | test  |
| 100     | 100     | 520.662  | test  |
| 1000    | 1000    | 520.737  | test  |
| 10000   | 10000   | 521.237  | test  |
| 100000  | 100000  | 523.458  | test  |
| 100001  | 100001  | 523.469  | test  |
| 100002  | 100002  | 523.469  | test  |
| 100003  | 100003  | 523.469  | test  |
+---------+---------+----------+-------+


Process the stream using PySpark

Save the following Spark code as structured_streaming_kafka.py

"""
Read a MapR stream topic with Spark Structured Streaming and print it to the console.

Launch on a stock Apache Spark install (on the MapR sandbox, use the
spark-submit command given under "Submit the job"):
$ /usr/lib/spark-2.2.0-bin-hadoop2.7/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.xerial.snappy:snappy-java:1.1.4 \
structured_streaming_kafka.py
"""
from pyspark.sql import SparkSession

spark = (SparkSession
    .builder
    .appName("StructuredStreamingWithKafka")
    .getOrCreate())

topics = "/user/mapr/streams:raw"

# Source: subscribe to one topic, named as <stream path>:<topic>
raw = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topics)
  .load())

# Sink: print each micro-batch to the console, key and value cast to strings
query = (raw
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("console")
    .option("truncate", False)
    .option("numRows", 1000)
    .start())

spark.streams.awaitAnyTermination()
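The topics value in the script joins the stream path and the topic name with a colon. A small sketch of building and splitting such names is shown below. Both helper names are hypothetical, and the code assumes that topic names contain no colon.

```python
def full_topic_name(stream_path, topic):
    """Join a stream path and a topic name as <stream path>:<topic>."""
    if not stream_path.startswith("/"):
        raise ValueError("stream path must be absolute")
    if not topic or ":" in topic:
        raise ValueError("topic must be non-empty and contain no colon")
    return f"{stream_path}:{topic}"


def split_topic_name(full_name):
    """Split <stream path>:<topic> at the last colon."""
    stream_path, sep, topic = full_name.rpartition(":")
    if not sep or not stream_path.startswith("/") or not topic:
        raise ValueError(f"not a stream topic name: {full_name!r}")
    return stream_path, topic


# A comma-separated list subscribes the Kafka source to several topics.
topics = ",".join(full_topic_name("/user/mapr/streams", t) for t in ["raw", "enriched"])
print(topics)
# /user/mapr/streams:raw,/user/mapr/streams:enriched
```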


Submit the job

$ /opt/mapr/spark/spark-2.2.1/bin/spark-submit --verbose  structured_streaming_kafka.py

To store the output of the Spark streaming job in MapR-DB, follow this article:

https://blog.einext.com/mapr-xd/maprstream-to-maprdb


Clean up

Delete the stream and the MapR-DB table

$ maprcli stream delete -path /user/mapr/streams
$ maprcli table delete -path /tables/raw