Hive - Optimize Joins

Environment:

  • VM on macOS, 8 GB RAM, Cloudera QuickStart VM 5.8
  • Hive version 1.1.0-cdh5.8.0 (/usr/lib/hive/lib/hive-hwi-1.1.0-cdh5.8.0.jar)

Download the MovieLens dataset (full) and upload the movies and ratings data files to the following HDFS directories

  • /user/cloudera/movielens/movies - movies.csv
  • /user/cloudera/movielens/ratings - ratings.csv

Launch beeline

$ beeline -u jdbc:hive2://localhost:10000/default 

Turn off logging

-- Disable HiveServer2 operation logging so query results are not
-- interleaved with log output in the beeline console.
set hive.server2.logging.operation.enabled=false;

Create movies table

-- External table over the raw movies CSV; OpenCSVSerde handles the
-- quoted, comma-separated fields (titles contain embedded commas).
-- STORED AS TEXTFILE expands to the same TextInputFormat /
-- HiveIgnoreKeyTextOutputFormat pair Hive would otherwise be given explicitly.
CREATE EXTERNAL TABLE movies(
    movieId string,
    title string,
    genres string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar' = '"',
    'escapeChar' = '\\')
STORED AS TEXTFILE
LOCATION '/user/cloudera/movielens/movies'
TBLPROPERTIES (
    'serialization.null.format' = '',
    'skip.header.line.count' = '1');

Create ratings table

-- External table over the raw ratings CSV.
-- FIX: `timestamp` is a reserved word in Hive (the TIMESTAMP data type) and
-- must be backquoted when used as a column name.
-- NOTE(review): OpenCSVSerde exposes every column as string regardless of the
-- declared type, so rating/timestamp values arrive as strings until cast --
-- confirm downstream queries tolerate the implicit cast (avg() does).
CREATE EXTERNAL TABLE ratings(
    userId string,
    movieId string,
    rating double,
    `timestamp` bigint)
ROW FORMAT SERDE
    'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar' = '"',
    'escapeChar' = '\\')
STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
    '/user/cloudera/movielens/ratings'
TBLPROPERTIES (
    'serialization.null.format' = '',
    'skip.header.line.count' = '1');
-- Baseline: average rating per movie, joining the CSV-backed tables directly.
SELECT
    m.movieid,
    m.title,
    avg(r.rating)
FROM ratings r
LEFT JOIN movies m
    ON r.movieid = m.movieid
GROUP BY m.movieid, m.title;

40,110 rows selected (331.265 seconds)

-- Row count of the (small) movies table.
select count(*) from movies;

40110

1 row selected (24.879 seconds)

-- Row count of the (large) ratings fact table.
select count(*) from ratings;

24404096

1 row selected (107.387 seconds)

-- Second run of the same join, timed after the tables have been read once.
SELECT
    mv.movieid,
    mv.title,
    avg(rt.rating)
FROM ratings rt
LEFT JOIN movies mv
    ON rt.movieid = mv.movieid
GROUP BY mv.movieid, mv.title;

39,443 rows selected (173.936 seconds)

Columnar Format and Predicate Pushdown (PPD)

-- FIX: `set hive.optimize.ppd;` without an assignment only PRINTS the current
-- value -- it never enables predicate pushdown. Assign it explicitly.
set hive.optimize.ppd=true;
-- Compress Parquet data with Snappy.
set parquet.compression=SNAPPY;
-- Rebuild the movies table in Parquet (columnar) format.
drop table if exists movies_parquet;
create table movies_parquet like movies stored as parquet;
insert into movies_parquet select * from movies;

No rows affected (21.101 seconds)

-- Rebuild the ratings fact table in Parquet (columnar) format.
drop table if exists ratings_parquet;
create table ratings_parquet like ratings stored as parquet;
insert into ratings_parquet select * from ratings;

No rows affected (181.23 seconds)

-- Verify the Parquet copy holds the same row count as the CSV table.
select count(*) from movies_parquet;

40110

1 row selected (30.167 seconds)

-- Verify the Parquet copy holds the same row count as the CSV table.
select count(*) from ratings_parquet;

24404096

1 row selected (31.289 seconds)

-- Same aggregation as the baseline, now over the Parquet-backed tables.
SELECT
    m.movieid,
    m.title,
    avg(r.rating)
FROM ratings_parquet r
LEFT JOIN movies_parquet m
    ON r.movieid = m.movieid
GROUP BY m.movieid, m.title;

39,443 rows selected (79.771 seconds)

Vectorized Computation (compare disabled vs. enabled)

-- Control run: vectorization explicitly DISABLED, to establish a timing
-- baseline before the enabled run below.
set hive.vectorized.execution.enabled = false;
set hive.vectorized.execution.reduce.enabled = false;
SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_parquet t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (79.771 seconds)

-- Same query with vectorized execution enabled on both map and reduce sides.
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_parquet t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (89.253 seconds)

Cost Based Optimization

-- Turn on the cost-based optimizer and let it use table/column statistics.
set hive.cbo.enable=true;
-- Answer metadata-only queries (e.g. count(*)) straight from stats.
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
-- Gather table-level, then column-level, statistics for the fact table.
analyze table ratings_parquet compute statistics;
analyze table ratings_parquet compute statistics for columns;
SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_parquet t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (78.255 seconds)

-- Gather statistics for the dimension table as well, then re-run the join.
analyze table movies_parquet compute statistics;
analyze table movies_parquet compute statistics for columns;
SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_parquet t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (83.978 seconds)

Bucketing and Map Join

-- Let Hive convert joins to map-joins when the small side fits in memory.
set hive.auto.convert.join=true;
-- FIX: this statement was missing its terminating semicolon, so beeline
-- would fold the following `set` line into it instead of executing both.
set hive.mapjoin.smalltable.filesize=25000000;
-- Must be on BEFORE the INSERT so rows are actually hashed into buckets.
set hive.enforce.bucketing = true;
set hive.optimize.bucketmapjoin = true;
drop table if exists ratings_bucketed;
-- FIX: `timestamp` is a reserved word in Hive; backquote it as a column name.
-- NOTE(review): for a sort-merge bucket join the SORTED BY column should be
-- the join key (movieid), not userid -- kept as-is to preserve the original
-- layout; confirm intent before changing.
CREATE TABLE ratings_bucketed(
    userid string,
    movieid string,
    rating double,
    `timestamp` bigint)
CLUSTERED BY (movieid)
SORTED BY (userid ASC)
INTO 96 BUCKETS
stored as parquet;
insert into ratings_bucketed select * from ratings_parquet;
-- MAPJOIN hint keeps the small movies table in memory on the map side.
SELECT /*+ MAPJOIN(t2) */ t2.movieid, t2.title, avg(t1.rating)
FROM ratings_bucketed t1
LEFT JOIN movies_parquet t2 ON t1.movieid = t2.movieid
GROUP BY t2.movieid, t2.title;

39,443 rows selected (79.505 seconds)

-- Gather statistics on the bucketed table, then time the join once more
-- (no MAPJOIN hint: auto-conversion is left to the optimizer).
analyze table ratings_bucketed compute statistics;
analyze table ratings_bucketed compute statistics for columns;
SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_bucketed t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (74.526 seconds)