ML Using SparkR

Load Spark package

```{r}
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/usr/lib/spark-2.1.1-bin-hadoop2.7")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.driver.memory = "2g",
                                  spark.sql.shuffle.partitions = "5"))
```

Load data

```{r}
stocks = read.df("stocks", "csv", header="true", inferSchema = "true")
createOrReplaceTempView(stocks, "stocks")
str(stocks)
```

Class of stocks

```{r}
class(stocks)
```
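Note that `class(stocks)` returns `SparkDataFrame`, not a base R `data.frame`: the rows stay on the Spark side. To materialize rows in the local R session you can `collect()` them; a sketch (limiting rows first, since collecting a large table can exhaust driver memory — the cap of 100 rows is illustrative):

```{r}
# Bring a small sample of rows to the driver as a local R data.frame.
# limit() caps how many rows are collected; adjust as needed.
local.stocks = collect(limit(stocks, 100))
class(local.stocks)
```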

Cache table stocks

```{r}
cacheTable("stocks")
```

Check the size of the stocks dataframe in memory. As you can see, the memory footprint of the stocks object in the R session is negligibly small, because the data itself lives in Spark, not in R.

```{r}
object.size(stocks)
```

Dimension of the stocks dataframe

```{r}
dim(stocks)
```
```{r}
head(stocks)
```

Find frequency per stock symbol. We are going to perform this in three different ways.

Way 1: using dplyr-like functions from SparkR

```{r}
stocks2016 = filter(stocks, year(stocks$date) == 2016)
head(summarize(groupBy(stocks2016, stocks2016$symbol), avg.volume = mean(stocks2016$volume)))
```

Way 2: using a SQL statement

```{r}
head(sql("select symbol, avg(volume) from stocks where year(date) = 2016 group by symbol"))
```

Way 3: using SparkR's special gapplyCollect function. Note that this takes about 10 minutes to finish if you are running it on a laptop.

```{r}
# gapplyCollect applies the function to each group and collects the
# combined result to the driver as a local R data.frame
gapplyCollect(stocks, "symbol", function(k, x) {
  y = data.frame(k, mean(x$volume))
  colnames(y) = c("symbol", "avg_vol")
  y
})
```

Loading dataset for Machine Learning

Load the power plant dataset

```{r}
pp = read.df("powerplant", "csv", inferSchema = "true", header = "true")
str(pp)
```

Split the data into training and test sets.

```{r}
# randomSplit produces disjoint subsets; drawing two independent samples
# with sample() would let the same rows land in both sets
splits = randomSplit(pp, weights = c(0.7, 0.3), seed = 42)
pp.train = splits[[1]]
pp.test = splits[[2]]
cat("dim(pp.train): ", dim(pp.train))
cat("\ndim(pp.test): ", dim(pp.test))
cache(pp)
```

Train a model using glm

```{r}
fit1 = spark.glm(pp.train, PE ~ ., family = "gaussian", regParam = 0.1)
summary(fit1)
```
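If you want the coefficient table programmatically (for example, to export it), the list returned by `summary()` exposes it; a minimal sketch:

```{r}
# summary() on a spark.glm model returns a list; $coefficients holds the table
fit1.summary = summary(fit1)
fit1.summary$coefficients
```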

Make predictions on the test set

```{r}
pred = predict(fit1, pp.test)
showDF(pred, 10)
```

Create a temp view on the pred dataset to calculate the RMSE. Note that `predict` returns the input columns plus a `prediction` column, so the label is still named `PE`.

```{r}
createOrReplaceTempView(pred, "pred")
showDF(sql("select sqrt(avg(pow((PE - prediction), 2))) as rmse from pred"))
```
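The same RMSE can also be computed without SQL, using SparkR column expressions directly on `pred` (this assumes, as above, that the retained label column is `PE`); a sketch:

```{r}
# agg() aggregates over the whole SparkDataFrame; the column arithmetic
# is evaluated lazily inside Spark, not in R
err = pred$PE - pred$prediction
head(agg(pred, rmse = sqrt(avg(err * err))))
```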

Now train the model using Gradient Boosting.

```{r}
fit2 = spark.gbt(pp.train, PE ~ ., type = "regression")
pred = predict(fit2, pp.test)
showDF(pred, 10)
```

Compute the RMSE in the same way and compare the result with that of the glm model.

```{r}
createOrReplaceTempView(pred, "pred")
showDF(sql("select sqrt(avg(pow((PE - prediction), 2))) as rmse from pred"))
```
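
Finally, you may want to persist the better-performing model and shut the session down cleanly. A sketch (the output path `powerplant-gbt-model` is illustrative):

```{r}
# write.ml saves a SparkR ML model to a directory; overwrite guards reruns
write.ml(fit2, "powerplant-gbt-model", overwrite = TRUE)
sparkR.session.stop()
```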