MapR Streaming


Example of MapR streaming with a Java consumer and producer



[The following example was carried out on MapR Sandbox version 6.0.1]

Create a stream. A stream is a collection of topics in MapR.

$ maprcli stream create -path /user/mapr/streams

Help output for the stream create command

$ maprcli stream create

stream create

-path Stream Path

[ -ttl Time to live in seconds. default:604800 ]

[ -autocreate Auto create topics. default:true ]

[ -defaultpartitions Default partitions per topic. default:1 ]

[ -compression off|lz4|lzf|zlib. default:inherit from parent directory ]

[ -produceperm Producer access control expression. default u:creator ]

[ -consumeperm Consumer access control expression. default u:creator ]

[ -topicperm Topic CRUD access control expression. default u:creator ]

[ -copyperm Stream copy access control expression. default u:creator ]

[ -adminperm Stream administration access control expression. default u:creator ]

[ -copymetafrom Stream to copy attributes from. default:none ]

[ -ischangelog Stream to store changelog. default:false ]

[ -defaulttimestamptype timestamp type: createtime | logappendtime. default: createtime ]


Set permissions on the stream, opening them up to all users (p denotes public in a MapR access control expression).

$ maprcli stream edit -path /user/mapr/streams -produceperm p -consumeperm p -topicperm p
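Access control expressions can also combine specific users and groups rather than opening the stream to everyone. For example, to restrict producing to the mapr user plus a producers group (the group name here is hypothetical):

$ maprcli stream edit -path /user/mapr/streams -produceperm "u:mapr | g:producers"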


Create two topics - one for the raw stream and the other for the enriched stream.

  • raw - this will store the raw messages

  • enriched - this will store the summary of the streaming data

$ maprcli stream topic create -path /user/mapr/streams -topic raw

$ maprcli stream topic create -path /user/mapr/streams -topic enriched


Help output for the stream topic create command

$ maprcli stream topic create

-path Stream Path

-topic Topic Name

[ -partitions Number of partitions. default: attribute defaultpartitions on the stream ]

[ -timestamptype Timestamp type: createtime | logappendtime default: createtime ]
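For example, to create a topic with three partitions instead of the stream default of one (events is a hypothetical topic name):

$ maprcli stream topic create -path /user/mapr/streams -topic events -partitions 3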



Verify that the topics were created

$ maprcli stream topic list -path /user/mapr/streams/

partitions  maxlag  logicalsize  topic     consumers  physicalsize
1           0       0            raw       0          0
1           0       0            enriched  0          0


If you do not have Maven to compile the Java code, run the following commands to install Maven and Git.

$ sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo

$ sudo yum install -y apache-maven

$ sudo yum install -y git

$ export JAVA_HOME=/usr/lib/jvm/java-1.8.0/

$ mvn -version


Download the Java application from GitHub. Start the consumer and the producer in two terminals and watch the messages flow.

$ mkdir ~/workspaces

$ cd ~/workspaces

$ git clone https://github.com/abulbasar/MaprStreamingExample.git

$ cd MaprStreamingExample

$ mvn clean package



Start the consumer

$ cd ~/workspaces/MaprStreamingExample

$ java -cp $(mapr classpath):target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run consumer
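For reference, the consumer is a standard Kafka-API consumer pointed at a MapR stream topic. A minimal sketch of what such a consumer looks like (illustrative, not the repository's exact code; the class and group names are arbitrary):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "example-group");  // arbitrary consumer group name
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // MapR stream topics are addressed as <stream path>:<topic name>
        consumer.subscribe(Collections.singletonList("/user/mapr/streams:raw"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}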

Open another terminal and start the producer

$ cd ~/workspaces/MaprStreamingExample

$ java -cp $(mapr classpath):target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run producer
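The producer is the mirror image: a Kafka-API producer writing to the same stream topic. A rough sketch under the same assumptions (the JSON payload merely mimics the test records shown later; with the MapR Kafka client, bootstrap.servers is not needed for stream-path topics):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // MapR stream topics are addressed as <stream path>:<topic name>
        String topic = "/user/mapr/streams:raw";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                // Illustrative JSON payload, similar in shape to the sample app's test records
                producer.send(new ProducerRecord<>(topic,
                        Integer.toString(i), "{\"k\":" + i + ",\"type\":\"test\"}"));
            }
            producer.flush();
        }
    }
}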


Create a MapR-DB JSON table

$ maprcli table create -path /tables/raw -tabletype json


Start the DB consumer, which reads messages from the stream and writes them into the MapR-DB table

$ java -cp $(mapr classpath):target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run dbconsumer
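Conceptually, the DB consumer combines the consumer loop above with the MapR-DB JSON client. A rough sketch, assuming each message value is a JSON document and using the message key as the row _id (illustrative, not the repository's exact code):

import java.util.Collections;
import java.util.Properties;
import com.mapr.db.MapRDB;
import com.mapr.db.Table;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleDbConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "db-consumer-group");  // arbitrary consumer group name
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Table table = MapRDB.getTable("/tables/raw");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("/user/mapr/streams:raw"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                // Parse the JSON payload and upsert it, using the message key as _id
                table.insertOrReplace(record.key(), MapRDB.newDocument(record.value()));
            }
            table.flush();
        }
    }
}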


Verify the data in MapR-DB

$ mapr dbshell

maprdb mapr:> list /tables

/tables/raw

1 table(s) found.

maprdb mapr:> find /tables/raw --limit 10

{"_id":"0","k":{"$numberLong":0},"t":53669.809,"type":"test"}

{"_id":"1","k":{"$numberLong":1},"t":53672.055,"type":"test"}

{"_id":"10","k":{"$numberLong":10},"t":53672.056,"type":"test"}

{"_id":"100","k":{"$numberLong":100},"t":53672.07,"type":"test"}

{"_id":"1000","k":{"$numberLong":1000},"t":53672.122,"type":"test"}

{"_id":"10000","k":{"$numberLong":10000},"t":53672.567,"type":"test"}

{"_id":"100000","k":{"$numberLong":100000},"t":53674.397,"type":"test"}

{"_id":"100001","k":{"$numberLong":100001},"t":53674.408,"type":"test"}

{"_id":"100002","k":{"$numberLong":100002},"t":53674.408,"type":"test"}

{"_id":"100003","k":{"$numberLong":100003},"t":53674.408,"type":"test"}

10 document(s) found.

maprdb mapr:>


Get info on the stream topic

$ maprcli stream topic info -path /user/mapr/streams -topic raw -json

{
  "timestamp":1530231866316,
  "timeofday":"2018-06-28 05:24:26.316 GMT-0700 PM",
  "status":"OK",
  "total":1,
  "data":[
    {
      "partitionid":0,
      "physicalsize":82575360,
      "logicalsize":374898688,
      "maxoffset":6758753,
      "minoffsetacrossconsumers":1529285,
      "mintimestamp":"2018-06-28T04:35:31.258-0700 PM",
      "maxtimestamp":"2018-06-28T05:02:38.130-0700 PM",
      "mintimestampacrossconsumers":"2018-06-28T04:40:53.770-0700 PM",
      "fid":"2306.32.131194",
      "master":"maprdemo:5660",
      "servers":"maprdemo:5660",
      "timestamptype":"CreateTime"
    }
  ]
}
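The gap between maxoffset and minoffsetacrossconsumers gives a rough upper bound on consumer lag for the partition: here 6758753 - 1529285 = 5229468 messages have been produced but not yet consumed by the slowest consumer.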


Query the data in MapR-DB using Drill

$ export DRILL_HOME=/opt/mapr/drill/drill-1.11.0

$ cd $DRILL_HOME/jars/3rdparty

$ wget https://repository.mapr.com/nexus/content/groups/mapr-public/org/ojai/ojai/2.0.1-mapr-1804/ojai-2.0.1-mapr-1804.jar --no-check-certificate

$ $DRILL_HOME/bin/drill-embedded


0: jdbc:drill:zk=local> select * from dfs.root.`/tables/raw` limit 10;

+---------+---------+----------+-------+
|   _id   |    k    |    t     | type  |
+---------+---------+----------+-------+
| 0       | 0       | 518.308  | test  |
| 1       | 1       | 520.653  | test  |
| 10      | 10      | 520.654  | test  |
| 100     | 100     | 520.662  | test  |
| 1000    | 1000    | 520.737  | test  |
| 10000   | 10000   | 521.237  | test  |
| 100000  | 100000  | 523.458  | test  |
| 100001  | 100001  | 523.469  | test  |
| 100002  | 100002  | 523.469  | test  |
| 100003  | 100003  | 523.469  | test  |
+---------+---------+----------+-------+
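Any SQL that Drill supports works here as well; for example, a quick count of rows by type:

0: jdbc:drill:zk=local> select type, count(*) as cnt from dfs.root.`/tables/raw` group by type;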


Process the stream using PySpark

Save the following Spark code as structured_streaming_kafka.py

from pyspark.sql import SparkSession

"""
Launch spark application
$ /usr/lib/spark-2.2.0-bin-hadoop2.7/bin/spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.xerial.snappy:snappy-java:1.1.4 \
    structured_streaming_kafka.py
"""

spark = (SparkSession
         .builder
         .appName("StructuredStreamingWithKafka")
         .getOrCreate())

# MapR stream topic in the form <stream path>:<topic name>
topics = "/user/mapr/streams:raw"

# Source: subscribe to 1 topic
raw = (spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")  # required option for the Kafka source
       .option("subscribe", topics)
       .option("startingOffsets", "earliest")
       .option("maxOffsetsPerTrigger", "100")
       .load())

raw.printSchema()

# Sink: write the decoded records to the console
query = (raw
         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)",
                     "topic", "partition", "offset", "timestamp")
         .writeStream
         .format("console")
         .option("truncate", False)
         .option("numRows", 10)
         .start())

spark.streams.awaitAnyTermination()



Submit the job

$ /opt/mapr/spark/spark-*/bin/spark-submit --verbose --master local[2] structured_streaming_kafka.py

For storing the output of the Spark streaming job to MapR-DB, follow this article:

https://blog.einext.com/mapr-xd/maprstream-to-maprdb


Clean up

Drop the stream and the MapR-DB table

$ maprcli stream delete -path /user/mapr/streams

$ maprcli table delete -path /tables/raw