RDD Operations (Scala)
Launch Spark
$ $SPARK_HOME/bin/spark-shell
Working with textual data
Create the HDFS directory /user/cloudera/Shakespeare.
Download the complete works of Shakespeare from gutenberg.org and upload them to that directory.
val shakespeare = sc.textFile("/user/cloudera/Shakespeare")
shakespeare.count
Load stop words
import scala.io.Source
val stopWords = Source.fromURL("http://www.textfixer.com/tutorials/common-english-words.txt").mkString.split(",")
stopWords: Array[String] = Array(a, able, about, across, after, all, almost, also, am, among, an, and, any, are, as, at, be, because, been, but, by, can, cannot, could, dear, did, do, does, either, else, ever, every, for, from, get, got, had, has, have, he, her, hers, him, his, how, however, i, if, in, into, is, it, its, just, least, let, like, likely, may, me, might, most, must, my, neither, no, nor, not, of, off, often, on, only, or, other, our, own, rather, said, say, says, she, should, since, so, some, than, that, the, their, them, then, there, these, they, this, tis, to, too, twas, us, wants, was, we, were, what, when, where, which, while, who, whom, why, will, with, would, yet, you, your)
Show the 20 most frequently used words. Ignore case, and remove stop words and single-letter words.
shakespeare
.flatMap{_.toLowerCase.split("\\W+").filter(word => word.length > 1 && !stopWords.contains(word))}
.map(word => (word, 1))
.reduceByKey(_+_)
.sortBy(p => p._2, false)
.take(20)
.foreach(println)
(thou,5549)
(thy,4034)
(shall,3600)
(thee,3181)
(lord,3094)
(king,3041)
(good,2834)
(now,2792)
(sir,2764)
(come,2519)
(ll,2409)
(enter,2357)
(here,2331)
(more,2292)
(well,2241)
(love,2198)
(man,2034)
(hath,1942)
(one,1806)
(upon,1759)
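The tokenize, filter and count steps can be checked on a couple of lines before running them on the full corpus. The following is a minimal sketch using local Scala collections rather than an RDD; sampleLines, sampleStopWords and tokenize are hypothetical names introduced only for illustration. It also keeps the stop words in a Set, which makes each membership check cheaper than scanning an Array.

```scala
// Hypothetical sample lines standing in for the Shakespeare RDD.
val sampleLines = Seq(
  "To be, or not to be: that is the question.",
  "The question is a good question."
)

// Illustrative stop-word list only; a Set gives fast membership checks.
val sampleStopWords = Set("to", "be", "or", "not", "that", "is", "the", "a")

// Lower-case the line, split on non-word characters,
// then drop single-letter tokens and stop words.
def tokenize(line: String): Seq[String] = {
  line.toLowerCase
    .split("\\W+")
    .filter(word => word.length > 1 && !sampleStopWords.contains(word))
    .toSeq
}

// Count occurrences per word and sort by count, highest first.
val wordCounts: Seq[(String, Int)] = {
  sampleLines
    .flatMap(tokenize)
    .groupBy(identity)
    .map { case (word, occurrences) => (word, occurrences.size) }
    .toSeq
    .sortBy { case (_, count) => -count }
}

wordCounts.foreach(println)
```

The same tokenize function could be passed to flatMap on the RDD, assuming the stop-word Set is small enough to ship with the closure.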
Stocks Datasets
Download the dataset from here, unzip it, and load it into the HDFS location /user/cloudera/stocks.
Find the top 10 most traded stocks in 2016, ranked by average daily volume.
val stocks = sc.textFile("/user/cloudera/stocks")
stocks.sample(false, 0.00001, 100).take(10).foreach(println)
2001-05-04,27.120001,29.15,26.549999,28.73,3101900.0,28.73,CTXS
2001-06-07,70.029999,70.25,69.230003,69.739998,650700.0,44.888058,PNC
2003-07-29,32.57,32.689999,31.5,31.559999,1.69492E7,15.78,YHOO
2004-02-11,20.25,20.46999,19.95,20.45001,607200.0,6.10041,FLS
2007-01-29,29.15,29.26,28.709999,28.76,1.70013E7,25.521385,HAL
2007-02-05,139.339996,140.0,138.399994,139.660004,3366400.0,29.400928,CMI
2008-08-25,67.449997,67.489998,66.279999,66.449997,3202200.0,54.625587,COST
2008-11-28,15.05,15.87,14.78,15.46,235000.0,15.46,REGN
2009-01-28,41.540001,41.790001,40.790001,41.18,1971000.0,29.345599,ED
2009-02-13,7.74,7.81,7.52,7.58,5169000.0,6.4993,NWL
stocks
.filter(_.startsWith("2016"))
.map(_.split(","))
.map(tokens => (tokens(7), tokens(5).toDouble))
.groupByKey()
.mapValues(values => values.sum/values.size)
.sortBy(_._2, false)
.take(10)
.foreach(println)
(BAC,1.0995368974358974E8)
(FCX,4.7979558333333336E7)
(CHK,4.1622735256410256E7)
(AAPL,4.0944183974358976E7)
(GE,3.7751663461538464E7)
(F,3.743219743589743E7)
(PFE,3.5777183974358976E7)
(MSFT,3.419444807692308E7)
(FB,2.8902566025641024E7)
(MU,2.7260807692307692E7)
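The average above is computed by grouping every volume for a symbol and then dividing the sum by the size. An alternative is to carry a running (sum, count) pair per key, which is the shape of function pair that the RDD method aggregateByKey expects. The sketch below illustrates the idea on local collections split into two pretend partitions; the data and the names addValue, mergeAccs and foldPartition are assumptions made for this example.

```scala
// Hypothetical (symbol, volume) pairs, split into two pretend partitions.
val partitions = Seq(
  Seq(("AAA", 100.0), ("BBB", 10.0), ("AAA", 300.0)),
  Seq(("BBB", 30.0), ("AAA", 200.0))
)

// Accumulator is a running (sum, count) pair.
val zero = (0.0, 0L)

// Fold one value into an accumulator (within a partition).
def addValue(acc: (Double, Long), v: Double): (Double, Long) =
  (acc._1 + v, acc._2 + 1)

// Merge two accumulators (across partitions).
def mergeAccs(a: (Double, Long), b: (Double, Long)): (Double, Long) =
  (a._1 + b._1, a._2 + b._2)

// Build per-symbol accumulators for a single partition.
def foldPartition(rows: Seq[(String, Double)]): Map[String, (Double, Long)] = {
  rows.foldLeft(Map.empty[String, (Double, Long)]) { case (acc, (symbol, volume)) =>
    acc.updated(symbol, addValue(acc.getOrElse(symbol, zero), volume))
  }
}

// Combine the per-partition results key by key.
val merged: Map[String, (Double, Long)] = {
  partitions.map(foldPartition).reduce { (left, right) =>
    right.foldLeft(left) { case (acc, (symbol, partial)) =>
      acc.updated(symbol, mergeAccs(acc.getOrElse(symbol, zero), partial))
    }
  }
}

// Average = sum / count, sorted highest first.
val avgVolumes: Seq[(String, Double)] = {
  merged.toSeq
    .map { case (symbol, (sum, count)) => (symbol, sum / count) }
    .sortBy { case (_, avg) => -avg }
}

avgVolumes.foreach(println)
```

On the RDD this would correspond to something like aggregateByKey(zero)(addValue, mergeAccs) followed by mapValues, which avoids holding all values for a key in memory at once.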
Which 10 stocks gave the highest one-year return as of July 1, 2016? The return is computed from the price in field 6 on 2015-07-01 and on 2016-07-01.
val stocks2016 = stocks
.filter(_.startsWith("2016-07-01"))
.map{line =>
val tokens = line.split(",")
(tokens(7), tokens(6).toDouble)
}
val stocks2015 = stocks
.filter(_.startsWith("2015-07-01"))
.map{line =>
val tokens = line.split(",")
(tokens(7), tokens(6).toDouble)
}
val stocksJoined = stocks2015.join(stocks2016)
stocksJoined
.map(t => (t._1, (t._2._2 - t._2._1) / t._2._1))
.sortBy(_._2, false)
.take(10)
.foreach(println)
(NVDA,1.3200093397032653)
(NEM,0.7519792392631666)
(AWK,0.7496908832327756)
(DLR,0.6655013759638937)
(AMZN,0.659114218691069)
(ATVI,0.6147325322613237)
(O,0.6123244112060771)
(EQIX,0.6102184013923561)
(ULTA,0.5682666847029557)
(TSN,0.552133357084752)
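The return calculation is an inner join of the two price snapshots followed by (end - start) / start. Here is a small sketch of that logic on local Maps, with made-up symbols and prices, to show that symbols missing on either date drop out of the result just as they do with join on pair RDDs.

```scala
// Hypothetical (symbol -> price) snapshots for the two dates.
val prices2015 = Map("AAA" -> 50.0, "BBB" -> 20.0, "CCC" -> 10.0)
val prices2016 = Map("AAA" -> 75.0, "BBB" -> 15.0, "DDD" -> 8.0)

// Inner join: keep only symbols present in both snapshots.
val joined: Seq[(String, (Double, Double))] = {
  prices2015.toSeq.flatMap { case (symbol, start) =>
    prices2016.get(symbol).map(end => (symbol, (start, end)))
  }
}

// Fractional return over the period, sorted highest first.
val returns: Seq[(String, Double)] = {
  joined
    .map { case (symbol, (start, end)) => (symbol, (end - start) / start) }
    .sortBy { case (_, r) => -r }
}

returns.foreach(println)
```

CCC and DDD each appear on only one date, so neither is ranked. A return of 0.5 means the price rose by 50 percent.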
MovieLens Dataset
Download the dataset from https://grouplens.org/datasets/movielens/
Find top 10 movies based on average rating. Consider only those movies that have received 100 or more ratings. Include movie title in the output.
Load movies.csv as the movies RDD and ratings.csv as the ratings RDD.
Extract the title field from the movies RDD. Note that some titles contain commas, so a plain split on "," truncates them; a CSV parser such as the univocity library handles this.
Find the average rating and the rating count for each movieId in the ratings RDD.
Filter out all movies that have fewer than 100 ratings.
Sort the data by average rating in descending order and take the top 10 records.
Join the result with movies to include the title.
Finally, save the output to HDFS. Include movieId, title, and average rating.
val ratings = sc
.textFile("/user/cloudera/movie-lens/ratings")
.filter(!_.startsWith("userId"))
.map(_.split(","))
.map(tokens => (tokens(1), tokens(2).toFloat))
ratings.take(10).foreach(println)
(16,4.0)
(24,1.5)
(32,4.0)
(47,4.0)
(50,4.0)
(110,4.0)
(150,3.0)
(161,4.0)
(165,3.0)
(204,0.5)
val ratingAvg = ratings
.groupByKey()
.mapValues(values => (values.sum / values.size, values.size))
.filter(_._2._2 >= 100)
.map(p => (p._1, p._2._1))
ratingAvg.take(10).foreach(println)
(1968,3.7584746)
(1580,3.627907)
(293,4.1284723)
(4886,3.8793104)
(161,3.82243)
(2716,3.8741007)
(1214,4.0641026)
(8961,3.9264705)
(750,4.116)
(1193,4.2727275)
val movies = sc
.textFile("/user/cloudera/movie-lens/movies")
.filter(!_.startsWith("movieId"))
.map(_.split(","))
.map(tokens => (tokens(0), tokens(1)))
movies.take(10).foreach(println)
(1,Toy Story (1995))
(2,Jumanji (1995))
(3,Grumpier Old Men (1995))
(4,Waiting to Exhale (1995))
(5,Father of the Bride Part II (1995))
(6,Heat (1995))
(7,Sabrina (1995))
(8,Tom and Huck (1995))
(9,Sudden Death (1995))
(10,GoldenEye (1995))
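Splitting on every comma breaks titles that are quoted because they contain a comma. As a lightweight stand-in for a full CSV parser, the sketch below splits only on commas that sit outside double quotes and then strips the surrounding quotes. The function name splitCsvLine and the sample line are assumptions for illustration; the regex does not unescape doubled quotes inside a field, so a real parser remains the safer choice.

```scala
// Split a CSV line on commas that are outside double quotes.
// The lookahead requires an even number of quotes between the comma and end of line.
def splitCsvLine(line: String): Array[String] = {
  line
    .split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
    .map(_.trim.stripPrefix("\"").stripSuffix("\""))
}

// Hypothetical line in the shape movieId,title,genres with a quoted title.
val sampleLine = "11,\"American President, The (1995)\",Comedy|Drama|Romance"
val parsed = splitCsvLine(sampleLine)

// The title keeps its embedded comma and loses the surrounding quotes.
println((parsed(0), parsed(1)))
```

Used in place of _.split(",") when building the movies RDD, this would keep each quoted title in a single field.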
ratingAvg.join(movies).take(10).foreach(println)
(1968,(3.7584746,"Breakfast Club))
(1580,(3.627907,Men in Black (a.k.a. MIB) (1997)))
ratingAvg.join(movies).sortBy(_._2._1, false).take(10).foreach(println)
(318,(4.4545455,"Shawshank Redemption))
(858,(4.392857,"Godfather))
(50,(4.3289475,"Usual Suspects))
(1136,(4.301948,Monty Python and the Holy Grail (1975)))
(527,(4.296371,Schindler's List (1993)))
(1193,(4.2727275,One Flew Over the Cuckoo's Nest (1975)))
(608,(4.2711444,Fargo (1996)))
(2571,(4.2643676,"Matrix))
(1221,(4.260714,"Godfather: Part II))
(1213,(4.259259,Goodfellas (1990)))
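The last step, saving movieId, title and average rating, needs each record turned into one line of text. A possible sketch of that formatting follows; the helper toCsvLine and the sample records are hypothetical, and the title is quoted again whenever it contains a comma so the saved file stays parseable.

```scala
import java.util.Locale

// Format one record as movieId,title,averageRating.
def toCsvLine(movieId: String, title: String, avgRating: Float): String = {
  // Quote the title if it contains a comma or a quote, doubling embedded quotes.
  val needsQuoting = title.contains(",") || title.contains("\"")
  val safeTitle =
    if (needsQuoting) "\"" + title.replace("\"", "\"\"") + "\"" else title
  // Locale.ROOT keeps the decimal separator a dot regardless of system settings.
  val rating = "%.2f".formatLocal(Locale.ROOT, avgRating)
  s"$movieId,$safeTitle,$rating"
}

// Hypothetical records shaped like the join output: (movieId, (average, title)).
val topMovies = Seq(
  ("318", (4.4545455f, "Shawshank Redemption, The (1994)")),
  ("527", (4.296371f, "Schindler's List (1993)"))
)

val outputLines: Seq[String] =
  topMovies.map { case (movieId, (avg, title)) => toCsvLine(movieId, title, avg) }

outputLines.foreach(println)
```

On an RDD of the same shape, mapping records through toCsvLine and then calling saveAsTextFile with an HDFS output directory of your choice would write the lines out.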