PredictionIO (PIO) Client for the Power Plant EventServer
Create a project directory
$ mkdir PowerPlanEventServerClient
$ cd PowerPlanEventServerClient
Create build.sbt with the following contents:
// sbt build definition for the event ingestion client.
name := "PowerPlanEventServerClient"
version := "1.0"
scalaVersion := "2.10.6"
// Entry point used by `sbt run`; must match the object defined in
// src/main/scala/PowerEventIngester.scala.
mainClass in Compile := Some("PowerEventIngester")
libraryDependencies ++= Seq(
// Presumably the PredictionIO Java SDK that supplies io.prediction.EventClient
// and io.prediction.Event used by the ingester -- confirm against the SDK docs.
"io.prediction" % "client" % "0.9.5",
// Joda-Time supplies org.joda.time.DateTime, used for event timestamps.
"joda-time" % "joda-time" % "2.9.6",
"org.joda" % "joda-convert" % "1.7"
)
Create the directory for the Scala source:
$ mkdir -p src/main/scala
Create the Scala file src/main/scala/PowerEventIngester.scala with the following contents:
import java.util.LinkedList;
import io.prediction.EventClient;
import org.joda.time.DateTime;
import collection.JavaConversions._
import io.prediction.Event;
import scala.io.Source
/**
 * Command-line tool that loads power plant readings from a CSV file into a
 * PredictionIO EventServer.
 *
 * Every non-blank input line must contain at least five comma-separated
 * values. They are stored, in order, as the properties AT, V, AP, RH and PE
 * of a `$set` event on an entity of type "iot". The entity id is the
 * zero-based index of the record among the non-blank lines. Values are sent
 * as the strings read from the file; no numeric conversion is performed.
 *
 * Usage: PowerEventIngester <pio app accesskey> <input file of power plant data>
 */
object PowerEventIngester {

  /** Number of events sent per `createEvents` call. */
  private val BatchSize = 50

  /** Property names, in the order their values appear on each input line. */
  private val Fields = Seq("AT", "V", "AP", "RH", "PE")

  /**
   * Reads the input file, converts each record to an event and sends the
   * events to the EventServer in batches.
   *
   * @param args exactly two elements: the PIO application access key and the
   *             path of the CSV data file (without a header row)
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      Console.err.println("Usage: PowerEventIngester <pio app accesskey> <input file of power plant data>")
      // Non-zero status so that shells and scripts can detect the misuse.
      sys.exit(1)
    }
    val accessKey = args(0)
    val fileName = args(1)

    // Read the file before opening the client so that a missing or unreadable
    // file does not leave a client behind.
    val lines = readLines(fileName)

    val client = new EventClient(accessKey)
    try {
      val events = lines.zipWithIndex.map { case (line, id) => toEvent(line, id) }
      for ((group, index) <- events.grouped(BatchSize).zipWithIndex) {
        val batch = new LinkedList[Event]()
        group.foreach(e => batch.add(e))
        client.createEvents(batch)
        println(s"Batch: $index, Size of batch: ${group.size}")
      }
    } finally {
      // Release the client even when a batch fails.
      client.close()
    }
  }

  /**
   * Returns all non-blank lines of the file, closing the file afterwards.
   *
   * @param fileName path of the file to read
   */
  private def readLines(fileName: String): List[String] = {
    val source = Source.fromFile(fileName)
    try {
      // toList forces the lazy iterator while the file is still open.
      source.getLines().filter(_.trim.nonEmpty).toList
    } finally {
      source.close()
    }
  }

  /**
   * Builds the `$set` event for one input record.
   *
   * @param line one comma-separated record
   * @param id   zero-based record index, used as the entity id
   * @throws IllegalArgumentException if the record has fewer values than
   *                                  there are property names
   */
  private def toEvent(line: String, id: Int): Event = {
    val tokens = line.split(",")
    require(
      tokens.length >= Fields.size,
      s"Record $id has ${tokens.length} value(s), expected at least ${Fields.size}: $line"
    )
    val properties = new java.util.HashMap[String, Object]()
    for ((field, value) <- Fields.zip(tokens)) {
      properties.put(field, value)
    }
    new Event()
      .event("$set")
      .entityType("iot")
      .entityId(id.toString)
      .properties(properties)
      .eventTime(new DateTime())
  }
}
List the PIO applications and set the ACCESS_KEY shell variable to the access key of the target application.
$ pio app list
$ ACCESS_KEY=<access key for the pio app>
Compile and run. Make sure the header row has been removed from the data file first.
$ sbt "run $ACCESS_KEY /wd/data/Combined_Cycle_Power_Plant/Folds5x2_pp_1.csv"