Spark Kinesis

build.sbt

name := "KinesisStreamSpark"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.3.0",
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  "com.amazonaws" % "aws-java-sdk" % "1.11.330"
)
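
The aws-java-sdk dependency above is also enough to push a few test records into the stream. The sketch below is one way to do that, using the same stream name and region as the consumer app further down; the object name, payload, and partition key are placeholders.

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordRequest

object TestProducer {

  def main(args: Array[String]): Unit = {
    // Credentials come from the default provider chain
    // (environment variables, ~/.aws/credentials, instance profile, ...)
    val kinesis = AmazonKinesisClientBuilder.standard()
      .withRegion("us-west-2")
      .build()

    // Placeholder payload; any UTF-8 string works
    val payload = """{"card":"4111-xxxx-xxxx-1111","amount":42.50}"""

    val request = new PutRecordRequest()
      .withStreamName("credit_card_transactions")
      .withPartitionKey("card-4111") // partition key determines the target shard
      .withData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))

    kinesis.putRecord(request)
  }
}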



Kinesis App

import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object KinesisApp {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setAppName(getClass.getName)
      .setIfMissing("spark.master", "local[*]")

    val sc: SparkContext = new SparkContext(conf)
    // 5-second micro-batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Durations.seconds(5))

    val kinesisStream = KinesisInputDStream
      .builder
      .streamingContext(ssc)
      // Regional endpoints: https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
      .endpointUrl("kinesis.us-west-2.amazonaws.com")
      .regionName("us-west-2")
      .streamName("credit_card_transactions")
      // Start from the tip of the stream, i.e. only records produced after startup
      .initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(InitialPositionInStream.LATEST))
      // The Kinesis Client Library checkpoints progress in a DynamoDB table with this name
      .checkpointAppName("MyKinesisApp")
      .checkpointInterval(Durations.seconds(10))
      .storageLevel(StorageLevel.MEMORY_AND_DISK)
      .build

    // Records arrive as raw bytes; decode them as UTF-8 and print a sample of each batch
    kinesisStream
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
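
Because spark-sql is already on the classpath, the decoded records can also be handed to Spark SQL. The fragment below is a sketch of how that could look in place of the .print() call above, assuming each record is a JSON document; it additionally needs an import of org.apache.spark.sql.SparkSession, and the temp view name is arbitrary.

    // Sketch: parse each micro-batch as JSON with Spark SQL (replaces the .print() call)
    kinesisStream
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .foreachRDD { rdd =>
        // Reuse (or lazily create) a SparkSession bound to the existing SparkContext
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        // Assumes each record is a JSON transaction document
        val transactions = spark.read.json(spark.createDataset(rdd))
        transactions.createOrReplaceTempView("transactions")
        spark.sql("SELECT * FROM transactions").show()
      }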