MapR Streaming
Example of MapR Streams with a Java consumer and producer.
[Following example has been carried out in MapR Sandbox version 6.0.1]
Create a stream. A stream is a collection of topics in MapR.
$ maprcli stream create -path /user/mapr/streams
Help for the stream create command:
$ maprcli stream create
stream create
-path Stream Path
[ -ttl Time to live in seconds. default:604800 ]
[ -autocreate Auto create topics. default:true ]
[ -defaultpartitions Default partitions per topic. default:1 ]
[ -compression off|lz4|lzf|zlib. default:inherit from parent directory ]
[ -produceperm Producer access control expression. default u:creator ]
[ -consumeperm Consumer access control expression. default u:creator ]
[ -topicperm Topic CRUD access control expression. default u:creator ]
[ -copyperm Stream copy access control expression. default u:creator ]
[ -adminperm Stream administration access control expression. default u:creator ]
[ -copymetafrom Stream to copy attributes from. default:none ]
[ -ischangelog Stream to store changelog. default:false ]
[ -defaulttimestamptype timestamp type: createtime | logappendtime. default: createtime ]
Set permissions on the stream, opening them up to all users (p = public).
$ maprcli stream edit -path /user/mapr/streams -produceperm p -consumeperm p -topicperm p
Create two topics - one for the raw stream and one for the enriched stream.
raw - stores the raw messages
enriched - stores the summary of the streaming data
$ maprcli stream topic create -path /user/mapr/streams -topic raw
$ maprcli stream topic create -path /user/mapr/streams -topic enriched
Help for the stream topic create command:
$ maprcli stream topic create
-path Stream Path
-topic Topic Name
[ -partitions Number of partitions. default: attribute defaultpartitions on the stream ]
[ -timestamptype Timestamp type: createtime | logappendtime default: createtime ]
Verify that the topics were created.
$ maprcli stream topic list -path /user/mapr/streams/
partitions  maxlag  logicalsize  topic     consumers  physicalsize
1           0       0            raw       0          0
1           0       0            enriched  0          0
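Most maprcli commands also accept a `-json` flag that returns machine-readable output (the `stream topic info -json` call later in this walkthrough shows the general shape). A minimal sketch of verifying the topics from a script; the `sample` payload below is hand-written to mirror the table columns above, so treat the exact field layout as an assumption rather than the documented format:

```python
import json

# Hand-written sample mirroring what `maprcli stream topic list -json` might
# return; the field names follow the table columns above, but the exact JSON
# layout is an assumption based on the `stream topic info -json` output.
sample = """
{
  "status": "OK",
  "total": 2,
  "data": [
    {"topic": "raw", "partitions": 1, "consumers": 0, "maxlag": 0,
     "logicalsize": 0, "physicalsize": 0},
    {"topic": "enriched", "partitions": 1, "consumers": 0, "maxlag": 0,
     "logicalsize": 0, "physicalsize": 0}
  ]
}
"""

def topic_names(raw_json):
    """Return the sorted topic names from a maprcli -json response."""
    doc = json.loads(raw_json)
    if doc.get("status") != "OK":
        raise RuntimeError("maprcli reported failure")
    return sorted(t["topic"] for t in doc["data"])

print(topic_names(sample))  # ['enriched', 'raw']
```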
If you do not have Maven to compile the Java code, run the following commands to install it.
$ sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
$ sudo yum install -y apache-maven
$ sudo yum install -y git
$ export JAVA_HOME=/usr/lib/jvm/java-1.8.0/
$ mvn -version
Download the Java application from GitHub. Start the consumer and producer in two terminals and watch the messages.
$ mkdir ~/workspaces
$ cd ~/workspaces
$ git clone https://github.com/abulbasar/MaprStreamingExample.git
$ cd MaprStreamingExample
$ mvn clean package
Start the consumer
$ cd ~/workspaces/MaprStreamingExample
$ java -cp $(mapr classpath):target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run consumer
Open another terminal and start the producer
$ cd ~/workspaces/MaprStreamingExample
$ java -cp $(mapr classpath):target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run producer
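MapR Streams clients use the standard Kafka producer/consumer API; the main difference is that the topic name embeds the stream path (e.g. "/user/mapr/streams:raw") and the MapR client library, not a broker address, decides where messages go. A rough Python sketch of the producer side, assuming the kafka-python package (`make_record` and `produce` are hypothetical helper names, and the message shape is inferred from the dbshell output later in this walkthrough):

```python
import json
import time

def make_record(k, t0):
    # Shape of the messages the sample producer emits, inferred from the
    # dbshell output below ("_id", "k", "t", "type" fields).
    return {"_id": str(k), "k": k, "t": round(time.time() - t0, 3), "type": "test"}

def produce(n=1000):
    # Not invoked here: this needs the MapR client libraries on the classpath
    # (or a plain Kafka broker if you use stock kafka-python). With MapR
    # Streams the topic name embeds the stream path.
    from kafka import KafkaProducer  # assumes the kafka-python package
    producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode())
    t0 = time.time()
    for k in range(n):
        producer.send("/user/mapr/streams:raw", make_record(k, t0))
    producer.flush()
```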
Create a MapR-DB JSON table.
$ maprcli table create -path /tables/raw -tabletype json
Start the DB consumer.
$ java -cp $(mapr classpath):target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run dbconsumer
Verify the data in MapR-DB.
$ mapr dbshell
maprdb mapr:> list /tables
/tables/raw
1 table(s) found.
maprdb mapr:> find /tables/raw --limit 10
{"_id":"0","k":{"$numberLong":0},"t":53669.809,"type":"test"}
{"_id":"1","k":{"$numberLong":1},"t":53672.055,"type":"test"}
{"_id":"10","k":{"$numberLong":10},"t":53672.056,"type":"test"}
{"_id":"100","k":{"$numberLong":100},"t":53672.07,"type":"test"}
{"_id":"1000","k":{"$numberLong":1000},"t":53672.122,"type":"test"}
{"_id":"10000","k":{"$numberLong":10000},"t":53672.567,"type":"test"}
{"_id":"100000","k":{"$numberLong":100000},"t":53674.397,"type":"test"}
{"_id":"100001","k":{"$numberLong":100001},"t":53674.408,"type":"test"}
{"_id":"100002","k":{"$numberLong":100002},"t":53674.408,"type":"test"}
{"_id":"100003","k":{"$numberLong":100003},"t":53674.408,"type":"test"}
10 document(s) found.
maprdb mapr:>
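The `$numberLong` wrappers in the dbshell output are OJAI extended JSON, which MapR-DB uses to preserve the exact numeric type of each field. A small sketch of collapsing them back to plain Python values (the `unwrap` helper is ours, not part of any MapR library):

```python
import json

# One document as printed by dbshell; long integers appear in OJAI
# extended-JSON form, e.g. {"$numberLong": 10}.
doc_text = '{"_id":"10","k":{"$numberLong":10},"t":53672.056,"type":"test"}'

def unwrap(value):
    """Collapse single-key {"$type": v} wrappers to their plain value."""
    if isinstance(value, dict):
        if len(value) == 1 and next(iter(value)).startswith("$"):
            return next(iter(value.values()))
        return {k: unwrap(v) for k, v in value.items()}
    return value

doc = unwrap(json.loads(doc_text))
print(doc)  # {'_id': '10', 'k': 10, 't': 53672.056, 'type': 'test'}
```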
Get detailed info on a topic in the stream.
$ maprcli stream topic info -path /user/mapr/streams -topic raw -json
{
"timestamp":1530231866316,
"timeofday":"2018-06-28 05:24:26.316 GMT-0700 PM",
"status":"OK",
"total":1,
"data":[
{
"partitionid":0,
"physicalsize":82575360,
"logicalsize":374898688,
"maxoffset":6758753,
"minoffsetacrossconsumers":1529285,
"mintimestamp":"2018-06-28T04:35:31.258-0700 PM",
"maxtimestamp":"2018-06-28T05:02:38.130-0700 PM",
"mintimestampacrossconsumers":"2018-06-28T04:40:53.770-0700 PM",
"fid":"2306.32.131194",
"master":"maprdemo:5660",
"servers":"maprdemo:5660",
"timestamptype":"CreateTime"
}
]
}
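From the fields above you can estimate the backlog of the slowest consumer: `maxoffset` is the newest message in the partition and `minoffsetacrossconsumers` is the oldest consumer cursor. A quick sketch using the numbers from this output (treating the difference as an approximate message count):

```python
# Per-partition fields copied from the `stream topic info -json` output above.
partition = {"maxoffset": 6758753, "minoffsetacrossconsumers": 1529285}

# Approximate number of messages the slowest consumer still has to read.
backlog = partition["maxoffset"] - partition["minoffsetacrossconsumers"]
print(backlog)  # 5229468
```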
Query the data in MapR-DB using Drill
$ export DRILL_HOME=/opt/mapr/drill/drill-1.11.0
$ cd $DRILL_HOME/jars/3rdparty
$ wget https://repository.mapr.com/nexus/content/groups/mapr-public/org/ojai/ojai/2.0.1-mapr-1804/ojai-2.0.1-mapr-1804.jar --no-check-certificate
$ $DRILL_HOME/bin/drill-embedded
0: jdbc:drill:zk=local> select * from dfs.root.`/tables/raw` limit 10;
+---------+---------+----------+-------+
| _id | k | t | type |
+---------+---------+----------+-------+
| 0 | 0 | 518.308 | test |
| 1 | 1 | 520.653 | test |
| 10 | 10 | 520.654 | test |
| 100 | 100 | 520.662 | test |
| 1000 | 1000 | 520.737 | test |
| 10000 | 10000 | 521.237 | test |
| 100000 | 100000 | 523.458 | test |
| 100001 | 100001 | 523.469 | test |
| 100002 | 100002 | 523.469 | test |
| 100003 | 100003 | 523.469 | test |
+---------+---------+----------+-------+
Process the stream using PySpark.
Save the following Spark code as structured_streaming_kafka.py.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
"""
Launch spark application
$ /usr/lib/spark-2.2.0-bin-hadoop2.7/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.xerial.snappy:snappy-java:1.1.4 \
structured_streaming_kafka.py
"""
spark = (SparkSession
         .builder
         .appName("StructuredStreamingWithKafka")
         .getOrCreate())

topics = "/user/mapr/streams:raw"

# Source: subscribe to one topic
raw = (spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", topics)
       .option("startingOffsets", "earliest")
       .option("maxOffsetsPerTrigger", "100")
       .load())

raw.printSchema()

# Sink: write the records to the console
query = (raw
         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)",
                     "topic", "partition", "offset", "timestamp")
         .writeStream
         .format("console")
         .option("truncate", False)
         .option("numRows", 10)
         .start())
spark.streams.awaitAnyTermination()
Submit the job
$ /opt/mapr/spark/spark-*/bin/spark-submit --verbose --master local[2] structured_streaming_kafka.py
To store the output of the Spark streaming job in MapR-DB, follow this article:
https://blog.einext.com/mapr-xd/maprstream-to-maprdb
Clean up
Delete the stream and the MapR-DB table.
$ maprcli stream delete -path /user/mapr/streams
$ maprcli table delete -path /tables/raw