ML Using SparkR
Load the SparkR package and start a Spark session
```{r}
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/usr/lib/spark-2.1.1-bin-hadoop2.7")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g", spark.sql.shuffle.partitions = "5"))
```
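Optionally, verify which Spark version the session is running against:
```{r}
sparkR.version()
```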
Load data
```{r}
stocks = read.df("stocks", "csv", header="true", inferSchema = "true")
createOrReplaceTempView(stocks, "stocks")
str(stocks)
```
Class of stocks
```{r}
class(stocks)
```
Cache table stocks
```{r}
cacheTable("stocks")
```
Check the size of the stocks dataframe in memory. As you can see, in the R session the memory footprint of the stocks object is negligibly small, because the object is only a reference to data managed by Spark, not a local copy.
```{r}
object.size(stocks)
```
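For contrast, here is a quick sketch: collect() materializes rows into a local R data.frame, so even a slice of the data has a visible footprint in the R session.
```{r}
# collect a slice of the SparkDataFrame into local R memory and measure it
local_stocks = collect(limit(stocks, 1000))
object.size(local_stocks)
```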
Dimensions of the stocks dataframe
```{r}
dim(stocks)
```
```{r}
head(stocks)
```
Find the average trading volume per stock symbol. We are going to perform this in three different ways.
Way 1: using dplyr-like functions from SparkR
```{r}
stocks2016 = filter(stocks, year(stocks$date) == 2016)
head(summarize(groupBy(stocks2016, stocks2016$symbol), avg.volume = mean(stocks2016$volume)))
```
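If you want the symbols ordered by average volume, the same aggregation can be sorted with arrange. A small sketch (the avg_volume column name is my own):
```{r}
# aggregate per symbol, then sort by the aggregated column, highest first
avgBySym = summarize(groupBy(stocks2016, stocks2016$symbol), avg_volume = mean(stocks2016$volume))
head(arrange(avgBySym, desc(avgBySym$avg_volume)))
```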
Way 2: using a SQL statement
```{r}
head(sql("select symbol, avg(volume) from stocks where year(date) = 2016 group by symbol"))
```
Way 3: using SparkR's gapplyCollect function. Note that it can take about 10 minutes to finish if you are running it on your laptop.
```{r}
gapplyCollect(stocks, "symbol", function(k, x) {
  y = data.frame(k, mean(x$volume))
  colnames(y) = c("symbol", "avg_vol")
  y
})
```
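If you prefer to keep the result distributed rather than collecting it to the driver, gapply is the counterpart that returns a SparkDataFrame; it requires the output schema up front. A sketch:
```{r}
# declare the schema of the grouped-apply output
avgVolSchema = structType(structField("symbol", "string"),
                          structField("avg_vol", "double"))
avgVol = gapply(stocks, "symbol", function(k, x) {
  y = data.frame(k, mean(x$volume))
  colnames(y) = c("symbol", "avg_vol")
  y
}, avgVolSchema)
head(avgVol)
```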
Loading the dataset for Machine Learning
Load the power plant dataset
```{r}
pp = read.df("powerplant", "csv", inferSchema = "true", header = "true")
str(pp)
```
Split the data into training and test sets.
```{r}
pp.train = sample(pp, withReplacement = FALSE, fraction = 0.7)
pp.test = sample(pp, withReplacement = FALSE, fraction = 0.3)
cat("dim(pp.train): ", dim(pp.train))
cat("\ndim(pp.test): ", dim(pp.test))
cache(pp)
```
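Note that two independent sample() calls are not guaranteed to produce disjoint sets, so some rows may end up in both. For a proper partition you could use randomSplit instead; a sketch with names of my own:
```{r}
# split pp into two disjoint SparkDataFrames with a 70/30 weighting
splits = randomSplit(pp, weights = c(0.7, 0.3), seed = 42)
pp.train2 = splits[[1]]
pp.test2 = splits[[2]]
cat("dim(pp.train2): ", dim(pp.train2))
```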
Train a model using glm
```{r}
fit1 = spark.glm(pp.train, PE ~ ., family = "gaussian", regParam = 0.1)
summary(fit1)
```
Make prediction
```{r}
pred = predict(fit1, pp.test)
showDF(pred, 10)
```
Create a temp view on the pred dataset to calculate the RMSE.
```{r}
createOrReplaceTempView(pred, "pred")
showDF(sql("select sqrt(avg(pow((label - prediction), 2))) as rmse from pred"))
```
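Equivalently, the RMSE can be computed with SparkR's column functions instead of SQL; a sketch assuming the same label and prediction columns as above:
```{r}
# squared error per row, then average and square root over the whole DataFrame
err = pred$label - pred$prediction
head(agg(pred, rmse = sqrt(avg(err * err))))
```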
Now train a model using Gradient-Boosted Trees.
```{r}
fit2 = spark.gbt(pp.train, PE ~ ., type = "regression")
pred = predict(fit2, pp.test)
showDF(pred, 10)
```
Compute the RMSE in the same way and compare the result with that of the glm model.
```{r}
createOrReplaceTempView(pred, "pred")
showDF(sql("select sqrt(avg(pow((label - prediction), 2))) as rmse from pred"))
```
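Finally, release the cached data and stop the Spark session when you are done.
```{r}
uncacheTable("stocks")
unpersist(pp)
sparkR.session.stop()
```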