ML Using SparkR
Load Spark package
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/usr/lib/spark-2.1.1-bin-hadoop2.7")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g", spark.sql.shuffle.partitions = "5"))
Load data
stocks = read.df("stocks", "csv", header="true", inferSchema = "true")
createOrReplaceTempView(stocks, "stocks")
Class of stocks
Cache table stocks
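Check the size of the stocks dataframe in memory. As you can see, the memory footprint of the stocks object in the R session is negligibly small, because the object is only a reference to a distributed Spark DataFrame and the data itself stays in Spark.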
Dimension of the stocks dataframe
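The four inspection steps above (class, caching, memory footprint, dimension) could look roughly like the following. This is a minimal sketch that assumes the SparkR session and the `stocks` temp view created earlier; the original cells are not shown, so the exact calls used there may differ.

```r
# Assumes an active SparkR session and the `stocks` temp view created above.
class(stocks)                             # a SparkDataFrame handle, not a local data.frame
cacheTable("stocks")                      # mark the temp view for caching in Spark
print(object.size(stocks), units = "Kb")  # size of the R-side handle only
dim(stocks)                               # row and column counts; this triggers a Spark job
```

`object.size` measures only the R object, which is why the reported size stays small regardless of how many rows the table holds.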
Find the average trading volume per stock symbol. We are going to do this in three different ways.
Way 1: using dplyr-like functions from SparkR
stocks2016 = filter(stocks, year(stocks$date) == 2016)
head(summarize(groupBy(stocks2016, stocks2016$symbol), avg.volume = mean(stocks2016$volume)))
Way 2: using a SQL statement
head(sql("select symbol, avg(volume) from stocks where year(date) = 2016 group by symbol"))
Way 3: using SparkR's gapplyCollect function. Note that this takes about 10 minutes to finish if you are running it on a laptop.
gapplyCollect(stocks, "symbol", function(k, x) {
  y = data.frame(k, mean(x$volume))
  colnames(y) = c("symbol", "avg_vol")
  y  # return the data.frame for this group
})
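To see what the per-group function computes without waiting for a Spark job, the same logic can be tried on a small local data frame in base R. This is only an illustrative analog: the toy data and the helper name `avg_volume` are hypothetical, and in SparkR the key `k` arrives as a list of grouping values rather than a plain string.

```r
# Hypothetical toy data standing in for the stocks table.
toy = data.frame(
  symbol = c("AAA", "AAA", "BBB", "BBB", "BBB"),
  volume = c(100, 300, 10, 20, 30),
  stringsAsFactors = FALSE
)

# Same shape as the function passed to gapplyCollect: k is the group key,
# x is the data.frame holding that group's rows.
avg_volume = function(k, x) {
  y = data.frame(k, mean(x$volume), stringsAsFactors = FALSE)
  colnames(y) = c("symbol", "avg_vol")
  y  # return the data.frame, not the value of the colnames assignment
}

# Split by key, apply the function to each group, and bind the results.
groups = split(toy, toy$symbol)
result = do.call(rbind, lapply(names(groups), function(k) avg_volume(k, groups[[k]])))
print(result)
```

The explicit `y` on the last line of the function matters: an assignment such as `colnames(y) = ...` evaluates to the assigned names, so without it the function would not return the data frame.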
Loading dataset for Machine Learning
Load the power plant dataset
pp = read.df("powerplant", "csv", inferSchema = "true", header = "true")
Split the data into training and test set.
# randomSplit yields disjoint subsets; two independent sample() calls could overlap.
splits = randomSplit(pp, c(0.7, 0.3))
pp.train = splits[[1]]
pp.test = splits[[2]]
cat("dim(pp.train): ", dim(pp.train))
cat("\ndim(pp.test): ", dim(pp.test))
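When splitting data it is worth checking that the training and test sets share no rows. Two independent random draws can select the same row twice, whereas partitioning a single draw cannot. The base R sketch below illustrates the difference on hypothetical row ids; it does not touch Spark, and the seed and sizes are arbitrary choices.

```r
set.seed(1)
ids = 1:1000  # hypothetical row ids

# Two independent draws: each row is picked for a and for b separately.
a = ids[runif(length(ids)) < 0.7]
b = ids[runif(length(ids)) < 0.3]
overlap = length(intersect(a, b))

# One draw, partitioned: every row lands in exactly one subset.
u = runif(length(ids))
train = ids[u < 0.7]
test = ids[u >= 0.7]

cat("rows in both independent samples:", overlap, "\n")
cat("rows in both partitioned subsets:", length(intersect(train, test)), "\n")
```

Rows that appear in both sets leak training data into the evaluation and make the test error look better than it is.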
Train a model using glm
fit1 = spark.glm(pp.train, PE ~ ., family = "gaussian", regParam = 0.1)
Make prediction
pred = predict(fit1, pp.test)
showDF(pred, 10)
Create a temp view on the pred dataset to calculate the RMSE.
createOrReplaceTempView(pred, "pred")
showDF(sql("select sqrt(avg(pow((label - prediction), 2))) as rmse from pred"))
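The SQL expression above is the usual root mean squared error, the square root of the mean of the squared differences between label and prediction. As a quick check of the formula, here is the same calculation in base R on a few hypothetical values (not taken from the power plant data).

```r
# Hypothetical labels and predictions, for illustration only.
label = c(3, 5, 7)
prediction = c(2, 5, 9)

# Root mean squared error: sqrt(avg(pow(label - prediction, 2))) in the SQL above.
rmse = sqrt(mean((label - prediction)^2))
print(rmse)
```

The residuals here are 1, 0 and -2, so the mean squared error is 5/3 and the RMSE is its square root.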
Now train the model using Gradient Boosting.
fit2 = spark.gbt(pp.train, PE ~ ., type = "regression")
pred = predict(fit2, pp.test)
showDF(pred, 10)
Compute the RMSE in the same way and then compare the result with that of the glm model.
createOrReplaceTempView(pred, "pred")
showDF(sql("select sqrt(avg(pow((label - prediction), 2))) as rmse from pred"))