Spark Kinesis
build.sbt (note: sbt only picks up *.sbt files; the conventional name is lowercase)
name := "KinesisStreamSpark"

version := "0.1"

scalaVersion := "2.11.12"

// Scala-built artifacts use %% so sbt appends the Scala binary suffix (_2.11)
// automatically, keeping it in sync with scalaVersion above instead of
// hard-coding it in the artifact name.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.3.0",
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  // aws-java-sdk is a plain Java artifact (no Scala cross-build), so single % is correct.
  "com.amazonaws" % "aws-java-sdk" % "1.11.330"
)
Kinesis App
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object KinesisApp {

  /** Entry point: attaches a receiver to a Kinesis stream and prints each
    * record as a UTF-8 string in 5-second micro-batches until terminated.
    */
  def main(args: Array[String]): Unit = {
    // Default to a local master only when none was provided externally
    // (e.g. by spark-submit), so cluster submission still works unchanged.
    val sparkConf = new SparkConf()
      .setAppName(getClass.getName)
      .setIfMissing("spark.master", "local[*]")

    val streamingContext =
      new StreamingContext(new SparkContext(sparkConf), Durations.seconds(5))

    // Kinesis receiver configuration. LATEST means: start from new records only,
    // ignoring anything already in the stream. checkpointAppName names the KCL
    // application used for lease/progress tracking (presumably a DynamoDB table
    // of the same name — NOTE(review): confirm against KCL docs).
    val rawRecords = KinesisInputDStream.builder
      .streamingContext(streamingContext)
      .endpointUrl("kinesis.us-west-2.amazonaws.com") //https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
      .regionName("us-west-2")
      .streamName("credit_card_transactions")
      .initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(InitialPositionInStream.LATEST))
      .checkpointAppName("MyKinesisApp")
      .checkpointInterval(Durations.seconds(10))
      .storageLevel(StorageLevel.MEMORY_AND_DISK)
      .build

    // Each Kinesis record arrives as an Array[Byte]; decode and dump to stdout.
    val decoded = rawRecords.map(bytes => new String(bytes, StandardCharsets.UTF_8))
    decoded.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}