ML Using SparkR

Load the SparkR package and start a Spark session

```{r}

# Point SPARK_HOME at the local Spark installation if it is not already set
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/usr/lib/spark-2.1.1-bin-hadoop2.7")
}

# Load SparkR from the Spark installation and start a local session
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g", spark.sql.shuffle.partitions = "5"))

```
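
To confirm the session is up and connected to the expected Spark build, you can optionally check the version (a small sketch; sparkR.version() is available in SparkR 2.x):

```{r}

# Optional: report the Spark version this session is running against
sparkR.version()

```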

Load the stocks data and register it as a temporary view

```{r}

# Read the CSV data into a SparkDataFrame, inferring the column types
stocks = read.df("stocks", "csv", header = "true", inferSchema = "true")

# Register it as a temporary view so it can also be queried with SQL
createOrReplaceTempView(stocks, "stocks")

str(stocks)

```

Class of stocks. Note that it is a SparkDataFrame, not a local R data.frame.

```{r}

class(stocks)

```

Cache the stocks table in memory

```{r}

cacheTable("stocks")

```

Check the size of the stocks DataFrame in memory. As you can see, in the R session the memory footprint of the stocks object is negligibly small: the SparkDataFrame is only a reference to data that lives in Spark, not a local copy.

```{r}

object.size(stocks)

```
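
For contrast, a sketch of what happens when data is actually pulled into R: collecting even a small slice into a local data.frame creates a real in-memory copy, which is why the SparkDataFrame handle above looks so small. (limit and collect are standard SparkR calls; the 1000-row cutoff here is arbitrary.)

```{r}

# Pull only a small slice to the driver and compare its footprint with the handle above
local.slice = collect(limit(stocks, 1000))

object.size(local.slice)

```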

Dimensions of the stocks DataFrame

```{r}

dim(stocks)

```

```{r}

head(stocks)

```

Find the average trading volume per stock symbol. We are going to perform this in three different ways.

Way 1: using dplyr-like functions from SparkR

```{r}

# Keep only 2016 rows, then average the volume per symbol
stocks2016 = filter(stocks, year(stocks$date) == 2016)

head(summarize(groupBy(stocks2016, stocks2016$symbol), avg.volume = mean(stocks2016$volume)))

```

Way 2: using a SQL statement against the temp view

```{r}

head(sql("select symbol, avg(volume) from stocks where year(date) = 2016 group by symbol"))

```

Way 3: using SparkR's gapplyCollect function. Note that it can take about 10 minutes to finish if you are running it on a laptop.

```{r}

# Apply an R function to each group (one group per symbol) and collect the
# results back to the driver as a local data.frame
gapplyCollect(stocks, "symbol", function(k, x) {
  y = data.frame(k, mean(x$volume))
  colnames(y) = c("symbol", "avg_vol")
  y
})

```
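
If you prefer to keep the result distributed instead of collecting it to the driver, gapply() performs the same per-group computation but returns a SparkDataFrame; it requires an explicit output schema. A sketch under the same column assumptions as above:

```{r}

# Same per-symbol aggregation, but the result stays in Spark until head() pulls a few rows
schema = structType(structField("symbol", "string"), structField("avg_vol", "double"))

avg.vol = gapply(stocks, "symbol", function(k, x) {
  data.frame(k, mean(x$volume))
}, schema)

head(avg.vol)

```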

Loading the dataset for machine learning

Load the power plant dataset

```{r}

pp = read.df("powerplant", "csv", inferSchema = "true", header = "true")

str(pp)

```
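
Optionally, take a quick look at basic statistics for each column before modeling; describe() returns a small SparkDataFrame, so it is safe to print:

```{r}

# Summary statistics (count, mean, stddev, min, max) per column
showDF(describe(pp))

```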

Split the data into training and test sets.

```{r}

pp.train = sample(pp, withReplacement = FALSE, fraction = 0.7)

pp.test = sample(pp, withReplacement = FALSE, fraction = 0.3)

cat("dim(pp.train): ", dim(pp.train))

cat("\ndim(pp.test): ", dim(pp.test))

# Cache the source DataFrame; the train and test samples are derived from it lazily
cache(pp)

```
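
Note that the two sample() calls above are independent, so a given row can end up in both pp.train and pp.test (or in neither). If you want a true non-overlapping split, randomSplit() is the usual alternative; a minimal sketch (the seed and the pp.train2/pp.test2 names are just illustrative):

```{r}

# Non-overlapping 70/30 split of the same data
splits = randomSplit(pp, c(0.7, 0.3), seed = 42)

pp.train2 = splits[[1]]

pp.test2 = splits[[2]]

cat("dim(pp.train2): ", dim(pp.train2))

```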

Train a model using glm

```{r}

fit1 = spark.glm(pp.train, PE ~ ., family = "gaussian", regParam = 0.1)

summary(fit1)

```

Make predictions on the test set

```{r}

pred = predict(fit1, pp.test)

showDF(pred, 10)

```

Create a temp view on the pred DataFrame to calculate the RMSE.

```{r}

createOrReplaceTempView(pred, "pred")

showDF(sql("select sqrt(avg(pow((label - prediction), 2))) as rmse from pred"))

```
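
The same RMSE can also be computed without SQL, using SparkR's column functions directly; a sketch (the sq_err column name is just illustrative):

```{r}

# Square the per-row error, then aggregate to a single RMSE value
err = withColumn(pred, "sq_err", (pred$label - pred$prediction)^2)

head(agg(err, alias(sqrt(avg(err$sq_err)), "rmse")))

```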

Now train a model using gradient-boosted trees.

```{r}

fit2 = spark.gbt(pp.train, PE ~ ., type = "regression")

pred = predict(fit2, pp.test)

showDF(pred, 10)

```
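
As with the glm model, summary() on the fitted GBT model reports details such as the number of trees and the feature importances, if you want to inspect it:

```{r}

summary(fit2)

```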

Compute the RMSE in the same way and compare the result with that of the glm model.

```{r}

createOrReplaceTempView(pred, "pred")

showDF(sql("select sqrt(avg(pow((label - prediction), 2))) as rmse from pred"))

```