Hive Join Example

Goal: show an example of a join operation in Hive on CSV files.

Download the MovieLens dataset from http://grouplens.org/datasets/movielens/.

Unzip the files and put each file in its own directory. Hive requires a directory, not a file, as the table location.

Create a table on the movies data using the delimited text format.

hive> create external table movies (
    movieId int, 
    title string, 
    genres string)
row format delimited 
fields terminated by ',' 
lines terminated by '\n' 
location '/user/cloudera/movielens/movies'
tblproperties ("skip.header.line.count"="1");

Query the table and look at the rows. Do you see the bad data? Some titles contain commas, so splitting each line on ',' breaks the column structure. How can this be fixed? The Hive CSV serde (OpenCSVSerde) comes to the rescue.
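One way to spot the affected rows is to look for titles that start with a double quote, since the MovieLens file quotes any title that contains a comma. This is only a sketch and assumes the delimited movies table created above.

hive> select movieId, title, genres from movies where title like '"%' limit 5;

For these rows the title is cut at the first comma, and the rest of the title lands in the genres column.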

Drop the table if it already exists, then create it again using the Hive CSV serde.

hive> CREATE EXTERNAL TABLE movies(
    movieId string, 
    title string, 
    genres string)
ROW FORMAT SERDE
    'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar' = '"',
    'escapeChar' = '\\')
STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
    '/user/cloudera/movielens/movies'
TBLPROPERTIES (
    'serialization.null.format' = '',
    'skip.header.line.count' = '1');

Note: OpenCSVSerde reads every column as STRING. Even if you declare a column as int or another type, the table still reports it as STRING. To convert to another data type, use the cast function.
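As a minimal sketch of the cast approach, the query below reads movieId as an integer from the serde-based movies table. The alias movie_id is only an illustrative name.

hive> select cast(movieId as int) as movie_id, title from movies limit 5;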

In the same way, create a table on the ratings data.

hive> CREATE EXTERNAL TABLE ratings(
    userId string,
    movieId string,
    rating double,
    `timestamp` bigint)
ROW FORMAT SERDE
    'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar' = '"',
    'escapeChar' = '\\')
STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
    '/user/cloudera/movielens/ratings'
TBLPROPERTIES (
    'serialization.null.format' = '',
    'skip.header.line.count' = '1');

Note: timestamp is a reserved word in some distributions, so it is enclosed in backticks.


Now join the two tables.

hive> SELECT movies.movieid, title, avg(rating) from movies left join ratings on movies.movieid = ratings.movieid group by movies.movieid, title;
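Because OpenCSVSerde returns rating as a string, it can be safer to cast it explicitly before aggregating. The variant below is a sketch under that assumption. It also counts the ratings per movie and lists the ten highest averages. The aliases m, r, avg_rating and num_ratings are illustrative names, not part of the dataset.

hive> select m.movieid, m.title,
    avg(cast(r.rating as double)) as avg_rating,
    count(r.rating) as num_ratings
from movies m
left join ratings r on m.movieid = r.movieid
group by m.movieid, m.title
order by avg_rating desc
limit 10;

Movies without any ratings stay in the result because of the left join, with a NULL average and a count of 0.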

Find the day of the week on which most ratings are posted.

hive> select from_unixtime(cast(`timestamp` as bigint),'E'), count(*) from ratings group by from_unixtime(cast(`timestamp` as bigint),'E');

See the Java SimpleDateFormat documentation for the available date format patterns.

Movie Recommendation Based on Genres

Create an exploded view of movie id and genre, with one row per movie and genre.

hive> create view movie_by_genre as select movieid, genre from (select movieid, split(genres, '\\|') genres from movies) t lateral view explode(genres) g as genre;
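To check that the view holds one row per movie and genre, a count per genre can help. This is a quick sanity check rather than part of the recommendation flow. The alias num_movies is an illustrative name.

hive> select genre, count(*) as num_movies from movie_by_genre group by genre;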

For each user, find the rank of each genre. Ideally you would compute a weighted score. As a simpler first step, keep only the ratings of 4 or higher.

hive> create temporary table movie_by_user_genre as 
select t1.*, t2.rating,t2.userid from movie_by_genre t1 
left join ratings t2 on t1.movieid = t2.movieid where t2.rating >= 4;

Create a temporary table with each user's total rating by genre.

hive> create temporary table user_by_genre_totalrating as select userid, genre, sum(rating) total_rating from movie_by_user_genre group by userid, genre;

Find the top 3 genres for each user and create a temporary table for them.

hive> create temporary table user_top3_genres as select * from 
(select userid, genre, row_number() over (partition by userid order by total_rating desc) row_num from user_by_genre_totalrating) t where t.row_num <= 3;

Create a temporary table that contains the average rating for each movie and genre combination.

hive> create temporary table genre_movie_avgrating as
select t1.movieid, t1.genre, avg(t2.rating) as avg_rating from movie_by_genre t1 left join ratings t2 on t1.movieid = t2.movieid group by t1.movieid, t1.genre;

Create a temporary table of the top 10 movies for each genre.

hive> create temporary table top10movies_by_genre as
select * from (select genre, movieid, avg_rating, row_number() over (partition by genre order by avg_rating desc) as row_num from genre_movie_avgrating) t1 where t1.row_num <= 10;

Find 10 movie recommendations for each user.

hive> create temporary table user_genre_avgrating as 
select t1.userid, t1.genre, t2.movieid, t2.avg_rating from user_top3_genres t1 left join top10movies_by_genre t2 on t1.genre = t2.genre;
hive> create temporary table user_movie_reco as 
select t1.*, t2.title, t2.genres from (select userid, movieid, genre, avg_rating, row_number() over (partition by userid order by avg_rating desc) row_num from user_genre_avgrating) t1 join movies t2 on t1.movieid = t2.movieid where t1.row_num <= 10;

View the recommendations.

hive> select userid, title, genre, avg_rating, genres from user_movie_reco;
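The full result covers every user, so it is often easier to inspect one user at a time. The sketch below assumes a user id of '1' purely for illustration. The value is quoted because OpenCSVSerde stores userid as a string.

hive> select userid, title, genre, avg_rating, genres
from user_movie_reco
where userid = '1'
order by avg_rating desc;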

CTAS (CREATE TABLE AS SELECT) has the following restrictions on the target table:

-- cannot be a partitioned table

-- cannot be an external table

-- cannot be a list-bucketing table


Save the output of a query to an HDFS directory.

hive> INSERT OVERWRITE DIRECTORY '/user/cloudera/movies_action'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
NULL DEFINED AS ''
SELECT * FROM movies WHERE genres like '%Action%';

View the files created in HDFS.

$ hadoop fs -ls /user/cloudera/movies_action
Found 3 items
drwxr-xr-x   - cloudera cloudera          1 2018-08-28 03:11 /user/cloudera/movies_action/.hive-staging_hive_2018-08-28_03-11-47_260_3940892981476728227-1
-rwxr-xr-x   1 cloudera cloudera      76897 2018-08-28 03:11 /user/cloudera/movies_action/000000_0
drwx------   - cloudera cloudera          1 2018-08-28 03:11 /user/cloudera/movies_action/_scratchdir_hive_2018-08-28_03-11-47_260_3940892981476728227-13


View the content of the output.

$ hadoop fs -cat /user/cloudera/movies_action/0* | head -n 5
6,Heat (1995),Action|Crime|Thriller
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller
15,Cutthroat Island (1995),Action|Adventure|Romance
20,Money Train (1995),Action|Comedy|Crime|Drama|Thriller