Spark SQL over REST API

Objectives:

  • The primary goal is to submit a SQL query to a Spark cluster and get the result back as a collection of JSON objects with low latency (single-digit seconds).
  • The service is exposed over REST, so it can be consumed by any BI tool.
  • Spark over the Thrift service is a cleaner solution, but due to security concerns around impersonating queries, the Thrift service may not be a viable solution in a security-enabled Hive ecosystem.
  • This solution can be launched on any Spark cluster (on YARN or standalone).
  • Further work is required to make this REST service secure (authentication and authorization).
  • Alternative solutions:
    • Thrift service
    • http://sparkjava.com/
    • http://arturmkrtchyan.com/apache-spark-hidden-rest-api

Create a Scala project with your favourite IDE.

One notable dependency is Finagle, an open-source, high-performance RPC library for the JVM that was developed at Twitter and is used at twitter.com. Its HTTP module (finagle-http) is used to create the REST service.

name := "DataService"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies ++= Seq(
  "com.twitter" %% "finagle-http" % "6.35.0",
  "org.apache.spark" %% "spark-core" % "1.6.1",
  "org.apache.spark" %% "spark-sql" % "1.6.1",
  "log4j" % "log4j" % "1.2.14"
)

Create a Scala object:

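import com.twitter.finagle.{Http, Service}
import com.twitter.finagle.http.{Request, Response}
import com.twitter.util.{Await, Future}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.log4j.Logger

object SparkService {
  lazy val log = Logger.getLogger(SparkService.getClass.getName)

  // The master URL is supplied by spark-submit, so only the app name is set here.
  val conf = new SparkConf()
    .setAppName("Reporting Service")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // Runs the statement and collects the result on the driver, one JSON string per row.
  def sql(statement: String): Array[String] = {
    val rs = sqlContext.sql(statement).toJSON.collect()
    log.info("Length of the record set: %s".format(rs.length))
    rs
  }

  def initHTTPServer(): Unit = {
    val service = new Service[Request, Response] {
      def apply(request: Request): Future[Response] = {
        // The SQL text arrives in the "query" URL parameter.
        val query = request.getParam("query")
        log.info("Incoming query: %s".format(query))
        val response = Response()
        // The body contains one JSON object per line.
        response.setContentString(sql(query).mkString("\n"))
        Future.value(response)
      }
    }
    val server = Http.serve(":8080", service)
    // Block so that the driver keeps running and serving requests.
    Await.ready(server)
  }

  // Entry point for spark-submit (--class SparkService). The server is started
  // here rather than in the object body, so that request threads never wait on
  // an object initializer that is still blocked in Await.ready.
  def main(args: Array[String]): Unit = {
    initHTTPServer()
  }
}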
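
The service above passes the "query" parameter straight to Spark. As far as I know, getParam returns null when the parameter is absent, so a request without it would fail inside sqlContext.sql. A minimal sketch of a guard is shown below; the QueryParam object, its validate method, and the error message are hypothetical names introduced for illustration, not part of the original service.

```scala
object QueryParam {
  // Returns Right(statement) for a usable query and Left(message) otherwise.
  // `raw` may be null, which is assumed to be what getParam yields for a
  // missing parameter.
  def validate(raw: String): Either[String, String] =
    Option(raw).map(_.trim).filter(_.nonEmpty) match {
      case Some(statement) => Right(statement)
      case None            => Left("Missing or empty 'query' parameter")
    }
}
```

Inside the service's apply method, the result could be pattern matched: run sql(statement) for a Right, and answer with an error response carrying the message for a Left. This only rejects missing or blank input; it is not a substitute for the authentication and authorization work mentioned in the objectives.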

Export the project as a .jar. When creating the jar, do not bundle the dependencies; they are supplied at submit time.

Now submit the jar to the cluster. Submit in "client" mode so that the driver runs on the submitting machine and receives updates from the running jobs. You do not have to include packages for Spark and log4j, because those are already available on the classpath of the cluster; only the Finagle package is passed with --packages. In the example below, the Spark code is submitted to a standalone cluster.

$ bin/spark-submit --verbose --master spark://server01:7077 --deploy-mode client --class SparkService --packages com.twitter:finagle-http-compat_2.10:6.35.0  /Volumes/SONY/workspace/IntelliJ/DataService/out/artifacts/DataService/DataService.jar

Now you are ready to send SQL queries using REST calls. The REST API will be available on port 8080 of the machine from which the Spark application was submitted. Using Postman or curl, submit the following GET requests and check the output. The SQL text must be URL-encoded in the "query" parameter.

http://localhost:8080/?query=CREATE+TEMPORARY+TABLE+people+USING+json+OPTIONS+%28path+%22file%3A%2F%2F%2FVolumes%2FSONY%2FData%2Fpeople.json%22%29

http://localhost:8080/?query=CACHE+TABLE+people

http://localhost:8080/?query=SELECT+gender%2C+COUNT%28%2A%29+FROM+people+GROUP+BY+gender
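
The request URLs above can also be built and sent programmatically. The following is a rough sketch of a client using only the JVM standard library; the QueryClient object, its method names, and the default base URL are assumptions made for illustration. It assumes the service is reachable on port 8080 and returns one JSON object per line, as the service code does.

```scala
import java.net.{URL, URLEncoder}
import scala.io.Source

object QueryClient {
  // Hypothetical default; matches the port the service listens on.
  val DefaultBase = "http://localhost:8080/"

  // Form-encodes the SQL text and attaches it as the "query" parameter.
  def buildUrl(sql: String, base: String = DefaultBase): String =
    base + "?query=" + URLEncoder.encode(sql, "UTF-8")

  // Sends the GET request and returns one JSON string per result row.
  // Requires the service to be running; no timeouts or error handling here.
  def run(sql: String, base: String = DefaultBase): Seq[String] = {
    val source = Source.fromURL(new URL(buildUrl(sql, base)), "UTF-8")
    try source.getLines().filter(_.nonEmpty).toList
    finally source.close()
  }
}
```

For example, QueryClient.buildUrl("CACHE TABLE people") yields the second URL listed above. Note that URLEncoder leaves the "*" character unencoded, so the URL generated for the COUNT(*) query differs slightly from the one shown above (which uses %2A), although both decode to the same SQL text.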