Simple Dataframe Operations

Load movies.csv as a data frame

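val movies = spark
.read
.format("csv") // Indicates the format is of csv type
.option("header", "true") // The first line of the file is a header
.load("movies.csv") // Path is an assumption; point it at your copy of movies.csv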

View a few sample values
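
The cell for this step is missing, so here is a minimal sketch of what it might look like. It assumes `show` is the intended call; the two rows are made-up stand-ins for the contents of movies.csv, and the `SparkSession` builder line is only there so the sketch runs on its own (a notebook already provides `spark`).

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("movies-sample").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the dataframe loaded from movies.csv.
val movies = Seq(("1", "Movie A"), ("2", "Movie B")).toDF("movieId", "title")

// Print up to 5 rows without truncating long titles.
movies.show(5, truncate = false)
```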

Print the schema of movies dataframe
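
A minimal sketch of this step, assuming `printSchema` is the intended call. The rows are hypothetical stand-ins for movies.csv. Because the movies file is read without `inferSchema`, every column is a string, and the sample mirrors that.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("movies-schema").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the dataframe loaded from movies.csv (all string columns).
val movies = Seq(("1", "Movie A"), ("2", "Movie B")).toDF("movieId", "title")

// Print the column names and types as a tree.
movies.printSchema()
```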


Load ratings.csv as a dataframe

val ratings = spark.read.format("csv").options(Map(
  "header" -> "true",  // Indicates the file contains a header
  "inferSchema" -> "true", // Infer the field types. If not specified, all fields will be of string type
  "sep" -> "," // Specify the separator. Default is comma (,)
)).load("ratings.csv") // Path is an assumption; point it at your copy of ratings.csv

Print schema of ratings dataframe
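
A minimal sketch of this step, assuming `printSchema` is the intended call. The rows are hypothetical stand-ins for ratings.csv, and only the `movieId` and `rating` columns used later in the SQL query are modelled. With `inferSchema` enabled, numeric columns get numeric types rather than string, which the typed sample imitates.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("ratings-schema").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the dataframe loaded from ratings.csv.
val ratings = Seq((1, 4.0), (1, 5.0), (2, 3.0)).toDF("movieId", "rating")

// movieId shows up as integer and rating as double.
ratings.printSchema()
```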


View a few sample values
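
A minimal sketch of this step for the ratings dataframe, assuming `show` is the intended call. The rows are hypothetical stand-ins for ratings.csv.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("ratings-sample").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the dataframe loaded from ratings.csv.
val ratings = Seq((1, 4.0), (1, 5.0), (2, 3.0)).toDF("movieId", "rating")

// Print up to 5 rows in tabular form.
ratings.show(5)
```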

Get access to the underlying RDD. Notice that the RDD contains objects of type Row.
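
A minimal sketch of this step, assuming the ratings dataframe is the one meant here. The rows are hypothetical stand-ins for ratings.csv. The `rdd` method exposes the dataframe as an `RDD[Row]`, and individual fields can be read from a `Row` by name with `getAs`.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

// getOrCreate() reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("ratings-rdd").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the dataframe loaded from ratings.csv.
val ratings = Seq((1, 4.0), (1, 5.0), (2, 3.0)).toDF("movieId", "rating")

// The underlying RDD holds generic Row objects, not typed tuples.
val ratingsRdd: RDD[Row] = ratings.rdd
ratingsRdd.take(2).foreach(println)

// Read a field from a Row by column name.
val ratingValues: Array[Double] = ratingsRdd.map(_.getAs[Double]("rating")).collect()
```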


Register the movies dataframe as a temporary table. Temporary tables are accessible only for the duration of your session.
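
A minimal sketch of this step, assuming `createOrReplaceTempView` is used; the original cell may have used the older, deprecated `registerTempTable` instead. The rows are hypothetical stand-ins for movies.csv.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("movies-view").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the dataframe loaded from movies.csv.
val movies = Seq(("1", "Movie A"), ("2", "Movie B")).toDF("movieId", "title")

// Register the dataframe under the name "movies" for this session only.
movies.createOrReplaceTempView("movies")

// The view can now be queried with SQL.
spark.sql("select title from movies").show()
```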


Register the ratings dataframe as the ratings temporary table.
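
A minimal sketch of this step, again assuming `createOrReplaceTempView`. The rows are hypothetical stand-ins for ratings.csv.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("ratings-view").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the dataframe loaded from ratings.csv.
val ratings = Seq((1, 4.0), (1, 5.0), (2, 3.0)).toDF("movieId", "rating")

// Register the dataframe under the name "ratings" for this session only.
ratings.createOrReplaceTempView("ratings")

// Quick check that the view is queryable.
spark.sql("select movieId, rating from ratings where rating >= 4").show()
```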


View the available tables. The temporary tables show up, and the "isTemporary" column should be true for each of them.

sql("show tables").show

Now you can write SQL queries against the registered temporary tables. Here, we find the average rating of each movie, grouped by movieId and title.

val ratingsAvg = sql("""select t1.movieid, t1.title, avg(t2.rating) avg_rating from movies t1
join ratings t2 on t1.movieid = t2.movieid group by t1.movieId, t1.title""")

View some values from the dataframe generated out of the query.
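
A minimal end-to-end sketch of this step, assuming `show` is the intended call. The movies and ratings rows are hypothetical stand-ins for the CSV files, with `movieId` typed as an integer in both for simplicity. The query is the same as the one above.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the notebook's session if one already exists.
val spark = SparkSession.builder().master("local[*]").appName("ratings-avg").getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for the two CSV-backed dataframes.
val movies = Seq((1, "Movie A"), (2, "Movie B")).toDF("movieId", "title")
val ratings = Seq((1, 4.0), (1, 5.0), (2, 3.0)).toDF("movieId", "rating")
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")

// Average rating per movie, joined on movieId.
val ratingsAvg = spark.sql("""select t1.movieid, t1.title, avg(t2.rating) avg_rating from movies t1
join ratings t2 on t1.movieid = t2.movieid group by t1.movieId, t1.title""")

// Print up to 5 rows of the result.
ratingsAvg.show(5)
```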

Let's save the dataframe to a persistent file system.

ratingsAvg
.coalesce(1) // Reducing to one partition ensures a single output file
.write
.format("parquet") // Format of the output file. You can specify csv, json etc.
.mode("overwrite") // If files already exist in the target dir, Spark will overwrite them
.save("/user/cloudera/ratingsAvg") // Target dir where the files will be saved

Read the saved files back to verify the output.

sql("select * from parquet.`/user/cloudera/ratingsAvg`").show(5)