RDD Operations (Scala)

Launch Spark

$ $SPARK_HOME/bin/spark-shell

Working with textual data

Create the HDFS directory /user/cloudera/Shakespeare.

Download the complete works of Shakespeare from gutenberg.org and upload the file to that directory.

val shakespeare = sc.textFile("/user/cloudera/Shakespeare")
shakespeare.count

Load stop words

import scala.io.Source
val stopWords = Source.fromURL("http://www.textfixer.com/tutorials/common-english-words.txt").mkString.split(",")
stopWords: Array[String] = Array(a, able, about, across, after, all, almost, also, am, among, an, and, any, are, as, at, be, because, been, but, by, can, cannot, could, dear, did, do, does, either, else, ever, every, for, from, get, got, had, has, have, he, her, hers, him, his, how, however, i, if, in, into, is, it, its, just, least, let, like, likely, may, me, might, most, must, my, neither, no, nor, not, of, off, often, on, only, or, other, our, own, rather, said, say, says, she, should, since, so, some, than, that, the, their, them, then, there, these, they, this, tis, to, too, twas, us, wants, was, we, were, what, when, where, which, while, who, whom, why, will, with, would, yet, you, your)

Show the 20 most frequently used words. Ignore case, and remove stop words and single-letter words.

shakespeare
.flatMap{_.toLowerCase.split("\\W+").filter(word => word.length > 1 && !stopWords.contains(word))}
.map(word => (word, 1))
.reduceByKey(_+_)
.sortBy(p => p._2, false)
.take(20)
.foreach(println)
(thou,5549)
(thy,4034)
(shall,3600)
(thee,3181)
(lord,3094)
(king,3041)
(good,2834)
(now,2792)
(sir,2764)
(come,2519)
(ll,2409)
(enter,2357)
(here,2331)
(more,2292)
(well,2241)
(love,2198)
(man,2034)
(hath,1942)
(one,1806)
(upon,1759)
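
The filter above scans the stopWords array once per token. A minimal sketch of the same filtering logic as a standalone function that takes a Set for constant-time lookups; the names WordFilterSketch and tokenize are illustrative and not part of the original session.

```scala
object WordFilterSketch {
  // Lowercases a line, splits it on runs of non-word characters, and keeps
  // tokens that are longer than one character and not in the stop-word set.
  def tokenize(line: String, stopWords: Set[String]): Array[String] =
    line.toLowerCase
      .split("\\W+")
      .filter(word => word.length > 1 && !stopWords.contains(word))
}
```

In spark-shell this could be used as shakespeare.flatMap(line => WordFilterSketch.tokenize(line, stopSet.value)), assuming stopSet was created with sc.broadcast(stopWords.toSet).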

Stocks Datasets

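Download the stocks dataset, unzip it, and load it into the HDFS directory /user/cloudera/stocks. Judging by the sample rows, each line holds the date, open, high, low, close, volume, adjusted close, and ticker symbol.

Find the 10 most traded stocks in 2016, ranked by average daily volume.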

val stocks = sc.textFile("/user/cloudera/stocks")
stocks.sample(false, 0.00001, 100).take(10).foreach(println)
2001-05-04,27.120001,29.15,26.549999,28.73,3101900.0,28.73,CTXS
2001-06-07,70.029999,70.25,69.230003,69.739998,650700.0,44.888058,PNC
2003-07-29,32.57,32.689999,31.5,31.559999,1.69492E7,15.78,YHOO
2004-02-11,20.25,20.46999,19.95,20.45001,607200.0,6.10041,FLS
2007-01-29,29.15,29.26,28.709999,28.76,1.70013E7,25.521385,HAL
2007-02-05,139.339996,140.0,138.399994,139.660004,3366400.0,29.400928,CMI
2008-08-25,67.449997,67.489998,66.279999,66.449997,3202200.0,54.625587,COST
2008-11-28,15.05,15.87,14.78,15.46,235000.0,15.46,REGN
2009-01-28,41.540001,41.790001,40.790001,41.18,1971000.0,29.345599,ED
2009-02-13,7.74,7.81,7.52,7.58,5169000.0,6.4993,NWL
stocks
.filter(_.startsWith("2016"))
.map(_.split(","))
.map(tokens => (tokens(7), tokens(5).toDouble))
.groupByKey()
.mapValues(values => values.sum/values.size)
.sortBy(_._2, false)
.take(10)
.foreach(println)
(BAC,1.0995368974358974E8)
(FCX,4.7979558333333336E7)
(CHK,4.1622735256410256E7)
(AAPL,4.0944183974358976E7)
(GE,3.7751663461538464E7)
(F,3.743219743589743E7)
(PFE,3.5777183974358976E7)
(MSFT,3.419444807692308E7)
(FB,2.8902566025641024E7)
(MU,2.7260807692307692E7)
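
groupByKey ships every volume value for a symbol to one place before averaging. A common alternative is to carry a running (sum, count) pair per key and divide at the end. The sketch below shows only the accumulator functions; AverageSketch and its member names are illustrative assumptions.

```scala
object AverageSketch {
  // Running (sum, count) pair for one key.
  type Acc = (Double, Long)

  val zero: Acc = (0.0, 0L)

  // Folds one value into an accumulator.
  def add(acc: Acc, value: Double): Acc = (acc._1 + value, acc._2 + 1)

  // Combines two partial accumulators, e.g. from different partitions.
  def merge(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2)

  // Final average; the caller must ensure the count is non-zero.
  def mean(acc: Acc): Double = acc._1 / acc._2
}
```

With these, the groupByKey and mapValues steps could be replaced by .aggregateByKey(AverageSketch.zero)(AverageSketch.add, AverageSketch.merge).mapValues(AverageSketch.mean), which should give the same averages up to floating-point rounding.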

Which 10 stocks gave the highest one-year return as of July 1, 2016? The return is the change in adjusted close between July 1, 2015 and July 1, 2016, divided by the 2015 value.

val stocks2016 = stocks
.filter(_.startsWith("2016-07-01"))
.map{line =>
    val tokens = line.split(",")
    (tokens(7), tokens(6).toDouble)
}
val stocks2015 = stocks
.filter(_.startsWith("2015-07-01"))
.map{line =>
    val tokens = line.split(",")
    (tokens(7), tokens(6).toDouble)
}
val stocksJoined = stocks2015.join(stocks2016)
stocksJoined
.map(t => (t._1, (t._2._2 - t._2._1) / t._2._1))
.sortBy(_._2, false)
.take(10)
.foreach(println)
(NVDA,1.3200093397032653)
(NEM,0.7519792392631666)
(AWK,0.7496908832327756)
(DLR,0.6655013759638937)
(AMZN,0.659114218691069)
(ATVI,0.6147325322613237)
(O,0.6123244112060771)
(EQIX,0.6102184013923561)
(ULTA,0.5682666847029557)
(TSN,0.552133357084752)
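
The return calculation can be checked on plain Scala collections. This is a small sketch with made-up prices; ReturnSketch and its method names are illustrative. Like the RDD join, it keeps only symbols that have a price on both dates.

```scala
object ReturnSketch {
  // Simple return: relative change from the start price to the end price.
  def simpleReturn(start: Double, end: Double): Double = (end - start) / start

  // Inner join on symbol: symbols missing from either map are dropped.
  def joinReturns(start: Map[String, Double],
                  end: Map[String, Double]): Map[String, Double] =
    start.collect {
      case (symbol, p0) if end.contains(symbol) =>
        symbol -> simpleReturn(p0, end(symbol))
    }
}
```

For example, a start price of 100.0 and an end price of 150.0 give a return of 0.5, and a symbol that appears only in the start map does not appear in the result.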

MovieLens Dataset

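Download the dataset from https://grouplens.org/datasets/movielens/ and upload ratings.csv and movies.csv to the HDFS directories /user/cloudera/movie-lens/ratings and /user/cloudera/movie-lens/movies.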

Find the top 10 movies by average rating. Consider only movies that have received 100 or more ratings, and include the movie title in the output.

  • Load movies.csv as the movies RDD and ratings.csv as the ratings RDD.
  • Extract the title field from the movies RDD. Note that some titles contain commas; you can use the univocity library to parse them.
  • Find the average rating and the rating count for each movieId in the ratings RDD.
  • Filter out all movies that have fewer than 100 ratings.
  • Sort the data by average rating in descending order and take the top 10 records.
  • Join the result with movies to include the title.
  • Finally, save the output to HDFS. Include movieId, title, and average rating.


val ratings = sc
.textFile("/user/cloudera/movie-lens/ratings")
.filter(!_.startsWith("userId"))
.map(_.split(","))
.map(tokens => (tokens(1), tokens(2).toFloat))
ratings.take(10).foreach(println)
(16,4.0)
(24,1.5)
(32,4.0)
(47,4.0)
(50,4.0)
(110,4.0)
(150,3.0)
(161,4.0)
(165,3.0)
(204,0.5)
val ratingAvg = ratings 
.groupByKey()
.mapValues(values => (values.sum / values.size, values.size))
.filter(_._2._2 >= 100)
.map(p => (p._1, p._2._1))

ratingAvg.take(10).foreach(println)
(1968,3.7584746)
(1580,3.627907)
(293,4.1284723)
(4886,3.8793104)
(161,3.82243)
(2716,3.8741007)
(1214,4.0641026)
(8961,3.9264705)
(750,4.116)
(1193,4.2727275)

val movies = sc
.textFile("/user/cloudera/movie-lens/movies")
.filter(!_.startsWith("movieId"))
.map(_.split(",")) // naive split: truncates quoted titles that contain commas
.map(tokens => (tokens(0), tokens(1))) 
movies.take(10).foreach(println)
(1,Toy Story (1995))
(2,Jumanji (1995))
(3,Grumpier Old Men (1995))
(4,Waiting to Exhale (1995))
(5,Father of the Bride Part II (1995))
(6,Heat (1995))
(7,Sabrina (1995))
(8,Tom and Huck (1995))
(9,Sudden Death (1995))
(10,GoldenEye (1995))
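
Splitting on every comma cuts quoted titles short at their first comma. Where the univocity library is not available, a hand-rolled splitter that ignores commas inside double quotes is one possible substitute. The sketch below is an assumption-laden stand-in rather than the univocity API: CsvLineSketch and splitCsvLine are illustrative names, and it handles only single-line records.

```scala
object CsvLineSketch {
  // Splits one CSV line on commas that are outside double quotes.
  // A doubled quote ("") inside a quoted field is read as a literal quote.
  def splitCsvLine(line: String): Vector[String] = {
    val fields = Vector.newBuilder[String]
    val current = new StringBuilder
    var inQuotes = false
    var i = 0
    while (i < line.length) {
      val c = line.charAt(i)
      if (c == '"') {
        if (inQuotes && i + 1 < line.length && line.charAt(i + 1) == '"') {
          current.append('"') // escaped quote inside a quoted field
          i += 1
        } else {
          inQuotes = !inQuotes // opening or closing quote
        }
      } else if (c == ',' && !inQuotes) {
        fields += current.toString // field boundary
        current.clear()
      } else {
        current.append(c)
      }
      i += 1
    }
    fields += current.toString // last field
    fields.result()
  }
}
```

In the movies pipeline, .map(_.split(",")) could then be swapped for .map(CsvLineSketch.splitCsvLine), so that tokens(1) holds the whole title without the surrounding quotes.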

ratingAvg.join(movies).take(10).foreach(println)
(1968,(3.7584746,"Breakfast Club))
(1580,(3.627907,Men in Black (a.k.a. MIB) (1997)))

ratingAvg.join(movies).sortBy(_._2._1, false).take(10).foreach(println)
(318,(4.4545455,"Shawshank Redemption))
(858,(4.392857,"Godfather))
(50,(4.3289475,"Usual Suspects))
(1136,(4.301948,Monty Python and the Holy Grail (1975)))
(527,(4.296371,Schindler's List (1993)))
(1193,(4.2727275,One Flew Over the Cuckoo's Nest (1975)))
(608,(4.2711444,Fargo (1996)))
(2571,(4.2643676,"Matrix))
(1221,(4.260714,"Godfather: Part II))
(1213,(4.259259,Goodfellas (1990)))
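
The last step in the list, saving movieId, title, and average rating to HDFS, is not shown in the session above. A minimal sketch of formatting each record as a CSV line, with the title quoted when it contains a comma or a quote; OutputSketch and its method names are illustrative assumptions.

```scala
object OutputSketch {
  // Wraps a field in double quotes if it contains a comma or a quote,
  // doubling any embedded quotes.
  def quote(field: String): String =
    if (field.exists(c => c == ',' || c == '"'))
      "\"" + field.replace("\"", "\"\"") + "\""
    else field

  // Builds one output line: movieId,title,averageRating.
  def toCsvLine(movieId: String, title: String, avgRating: Float): String =
    Seq(movieId, quote(title), avgRating.toString).mkString(",")
}
```

Assuming top10 holds the array returned by the take(10) call above, the lines could be written with sc.parallelize(top10).map { case (id, (avg, title)) => OutputSketch.toCsvLine(id, title, avg) }.saveAsTextFile("/user/cloudera/movie-lens/top10"), where the output path is a hypothetical choice.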