Spark Kinesis

build.sbt

name := "KinesisStreamSpark"
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.11" % "2.3.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.0",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.11" % "2.3.0",
  "com.amazonaws" % "aws-java-sdk" % "1.11.330"
)
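
If the job is later submitted to an existing Spark cluster instead of being run with a local master, the Spark artifacts are typically marked "provided" so they are not bundled into the assembly jar; spark-streaming-kinesis-asl is not shipped with the Spark distribution, so it stays at compile scope. A sketch of that variant of the dependency list:

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.11" % "2.3.0" % "provided",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.0" % "provided",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.11" % "2.3.0",
  "com.amazonaws" % "aws-java-sdk" % "1.11.330"
)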



Kinesis App

import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object KinesisApp {

  def main(args: Array[String]): Unit = {

    // Use a local master only when one is not supplied (e.g. via spark-submit)
    val conf: SparkConf = new SparkConf()
      .setAppName(getClass.getName)
      .setIfMissing("spark.master", "local[*]")

    val sc: SparkContext = new SparkContext(conf)
    // Micro-batches are generated every 5 seconds
    val ssc: StreamingContext = new StreamingContext(sc, Durations.seconds(5))

    // AWS credentials are resolved through the default provider chain
    // (environment variables, system properties, credential profiles, or instance roles)
    val kinesisStream = KinesisInputDStream
      .builder
      .streamingContext(ssc)
      .endpointUrl("kinesis.us-west-2.amazonaws.com") // see https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
      .regionName("us-west-2")
      .streamName("credit_card_transactions")
      // Start from the most recent records rather than the beginning of the stream
      .initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(InitialPositionInStream.LATEST))
      // The Kinesis Client Library checkpoints progress to a DynamoDB table with this name
      .checkpointAppName("MyKinesisApp")
      .checkpointInterval(Durations.seconds(10))
      .storageLevel(StorageLevel.MEMORY_AND_DISK)
      .build

    // Each Kinesis record arrives as Array[Byte]; decode it as UTF-8 text and
    // print the first 10 records of every batch to the console
    kinesisStream
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }

}
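

Kinesis Test Producer

The consumer above expects records on the credit_card_transactions stream. Below is a minimal test producer sketch built on the aws-java-sdk dependency already declared in build.sbt; the object name, JSON payloads, and partition keys are illustrative assumptions, and credentials come from the default provider chain just like the consumer.

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordRequest

object KinesisTestProducer {

  def main(args: Array[String]): Unit = {

    // Target the same region as the Spark consumer
    val kinesis = AmazonKinesisClientBuilder.standard()
      .withRegion("us-west-2")
      .build()

    // Put a handful of illustrative JSON records onto the stream
    (1 to 10).foreach { i =>
      val payload = s"""{"transaction_id":$i,"amount":${i * 10.0}}"""
      val request = new PutRecordRequest()
        .withStreamName("credit_card_transactions")
        .withPartitionKey(i.toString)
        .withData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))
      kinesis.putRecord(request)
    }
  }
}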