PIO Client For EventServer of PowerPlant

Create a project directory

$ mkdir PowerPlanEventServerClient

$ cd PowerPlanEventServerClient

Create build.sbt with the following contents

name := "PowerPlanEventServerClient"
version := "1.0"
scalaVersion := "2.10.6"
mainClass in Compile := Some("PowerEventIngester")
libraryDependencies ++= Seq(
    "io.prediction"                     % "client"              % "0.9.5",
    "joda-time"                         % "joda-time"           % "2.9.6",
    "org.joda"                          % "joda-convert"        % "1.7" 
)

Create the directory for the Scala source

$ mkdir -p src/main/scala

Create the Scala source file src/main/scala/PowerEventIngester.scala

import java.util.{HashMap => JHashMap, LinkedList}

import io.prediction.{Event, EventClient}
import org.joda.time.DateTime

import scala.io.Source

object PowerEventIngester {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Usage: PowerEventIngester <pio app accesskey> <input file of power plant data>")
      sys.exit(1) // non-zero exit status signals incorrect usage
    }
    val accessKey = args(0)
    val fileName = args(1)
    val client = new EventClient(accessKey)
    val source = Source.fromFile(fileName)
    try {
      // Build one "$set" event per CSV line; the line index is used as the entity ID.
      val eventObjects = source.getLines().zipWithIndex.map { case (line, i) =>
        val tokens = line.split(",")
        // Column order in the data file: AT, V, AP, RH, PE (values are stored as strings).
        val properties = new JHashMap[String, Object]()
        properties.put("AT", tokens(0))
        properties.put("V", tokens(1))
        properties.put("AP", tokens(2))
        properties.put("RH", tokens(3))
        properties.put("PE", tokens(4))
        new Event()
          .event("$set")
          .entityType("iot")
          .entityId(i.toString)
          .properties(properties)
          .eventTime(new DateTime())
      }.toList
      // Send the events to the Event Server in batches of 50.
      for ((group, index) <- eventObjects.grouped(50).zipWithIndex) {
        val events = new LinkedList[Event]()
        group.foreach(e => events.add(e))
        client.createEvents(events)
        println(s"Batch: $index, Size of batch: ${group.size}")
      }
    } finally {
      // Release the file handle and the client connection even if a batch fails.
      source.close()
      client.close()
    }
  }
}
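
The ingester sends events in groups of 50 and prints one "Batch:" line per group, so the number of lines to expect can be estimated from the row count of the data file. The following is a rough sketch of that arithmetic, not part of the client itself. The sample file, its 120 placeholder rows, and the variable names are made up for illustration, and the calculation assumes a non-empty file.

```shell
# Hypothetical sample file with 120 placeholder rows (not real plant data).
workdir=$(mktemp -d)
data="$workdir/sample.csv"
i=0
while [ "$i" -lt 120 ]; do
  echo "1,2,3,4,5"
  i=$((i + 1))
done > "$data"

batch_size=50
# Wrapping wc in $(( )) strips the padding some wc implementations add.
rows=$(( $(wc -l < "$data") ))
# Ceiling division: a partial final group still counts as one batch.
batches=$(( (rows + batch_size - 1) / batch_size ))
# Size of the final batch (assumes rows > 0).
last_batch=$(( rows - (batches - 1) * batch_size ))

echo "rows=$rows batches=$batches last_batch=$last_batch"
```

For the 120-row sample this gives 3 batches, the last holding 20 events. Pointing `data` at the real input file gives the count to compare against the ingester's output.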

List the pio applications and set the ACCESS_KEY shell variable to the access key of the application that should receive the events.

$ pio app list
$ ACCESS_KEY=<access key for the pio app>

Compile and run. Make sure the header row has been removed from the data file first.

$ sbt "run $ACCESS_KEY /wd/data/Combined_Cycle_Power_Plant/Folds5x2_pp_1.csv"
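
The ingester treats every line of the input file as a data row, so a header line would be sent as an event too. The following is a minimal sketch of stripping the header before the run, assuming the header is exactly the first line of the file. The file names and the sample rows are illustrative placeholders, not the real dataset paths or a guaranteed copy of its contents.

```shell
# Hypothetical paths; substitute the real data file.
workdir=$(mktemp -d)
input="$workdir/with_header.csv"
output="$workdir/no_header.csv"

# Illustrative sample: one header line followed by two data rows.
printf 'AT,V,AP,RH,PE\n14.96,41.76,1024.07,73.17,463.26\n25.18,62.96,1020.04,59.08,444.37\n' > "$input"

# tail -n +2 prints everything from the second line onward, dropping the header.
tail -n +2 "$input" > "$output"

# Sanity check: every remaining row should have exactly 5 comma-separated fields.
if awk -F',' 'NF != 5 { exit 1 }' "$output"; then
  echo "all rows have 5 fields"
else
  echo "found a row without 5 fields" >&2
fi
```

The resulting file (here `$output`) is what would be passed as the second argument to the run command.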