Hive - Optimize Joins

Environment:

  • VM on Mac OS, 8 GB RAM, cloudera VM 5.8

  • Hive version 1.1.0-cdh5.8.0 (/usr/lib/hive/lib/hive-hwi-1.1.0-cdh5.8.0.jar)

Download movielens dataset (full) and upload movies and ratings data files to following directories

  • /user/cloudera/movielens/movies - movies.csv

  • /user/cloudera/movielens/ratings - ratings.csv

Launch beeline

$ beeline -u jdbc:hive2://localhost:10000/default

Turn off logging

set hive.server2.logging.operation.enabled=false;

Create movies table

CREATE EXTERNAL TABLE movies(

movieId string,

title string,

genres string)

ROW FORMAT SERDE

'org.apache.hadoop.hive.serde2.OpenCSVSerde'

WITH SERDEPROPERTIES (

'separatorChar' = ',',

'quoteChar' = '"',

'escapeChar' = '\\')

STORED AS INPUTFORMAT

'org.apache.hadoop.mapred.TextInputFormat'

OUTPUTFORMAT

'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

LOCATION

'/user/cloudera/movielens/movies'

TBLPROPERTIES (

'serialization.null.format' = '',

'skip.header.line.count' = '1');

Create ratings table

CREATE EXTERNAL TABLE ratings(

userId string,

movieId string,

rating double,

timestamp bigint)

ROW FORMAT SERDE

'org.apache.hadoop.hive.serde2.OpenCSVSerde'

WITH SERDEPROPERTIES (

'separatorChar' = ',',

'quoteChar' = '"',

'escapeChar' = '\\')

STORED AS INPUTFORMAT

'org.apache.hadoop.mapred.TextInputFormat'

OUTPUTFORMAT

'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

LOCATION

'/user/cloudera/movielens/ratings'

TBLPROPERTIES (

'serialization.null.format' = '',

'skip.header.line.count' = '1');

SELECT t2.movieid, t2.title, avg(t1.rating) from ratings t1 left join movies t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

40,110 rows selected (331.265 seconds)

select count(*) from movies;

40110

1 row selected (24.879 seconds)

select count(*) from ratings;

24404096

1 row selected (107.387 seconds)



set hive.auto.convert.join=false;


SELECT t2.movieid, t2.title, avg(t1.rating) from ratings t1 left join movies t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (173.936 seconds)

Columnar Format and PPD

set hive.optimize.ppd;

set parquet.compression=SNAPPY;

drop table if exists movies_parquet;

create table movies_parquet like movies stored as parquet;

insert into movies_parquet select * from movies;

No rows affected (21.101 seconds)

drop table if exists ratings_parquet;

create table ratings_parquet like ratings stored as parquet;

insert into ratings_parquet select * from ratings;

No rows affected (181.23 seconds)

select count(*) from movies_parquet;

40110

1 row selected (30.167 seconds)

select count(*) from ratings_parquet;

24404096

1 row selected (31.289 seconds)

SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_parquet t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (79.771 seconds)

Enable Vectorized Computation

set hive.vectorized.execution.enabled = false;

set hive.vectorized.execution.reduce.enabled = false;

SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_parquet t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (79.771 seconds)

set hive.vectorized.execution.enabled = true;

set hive.vectorized.execution.reduce.enabled = true;

SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_parquet t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (89.253 seconds)

Cost Based Optimization


Hive’s Cost-Based Optimizer (CBO) is a core component in Hive’s query processing engine. Powered by Apache Calcite, the CBO optimizes and calculates the cost of various plans for a query.

The main goal of a CBO is to generate efficient execution plans by examining the tables and conditions specified in the query, ultimately cutting down on query execution time and reducing resource utilization. After parsing, a query gets converted to a logical tree (Abstract Syntax Tree) that represents the operations that the query must perform, such as reading a particular table or performing an inner JOIN.

Calcite applies various optimizations such as query rewrite, JOIN reordering, and deriving implied predicates and JOIN elimination to produce logically equivalent plans. The current model prefers bushy plans for maximum parallelism. Each logical plan is assigned a cost based in number of distinct value based heuristics.

Calcite has an efficient plan pruner that can select the cheapest query plan. The chosen logical plan is then converted by Hive to a physical operator tree, optimized and converted to Tez jobs, and then executed on the Hadoop cluster.

(https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.6.4/bk_hive-performance-tuning/content/ch_cost-based-optimizer.html)


Learn about table statistics

https://cwiki.apache.org/confluence/display/Hive/StatsDev

Learn about column statistics

https://cwiki.apache.org/confluence/display/Hive/Column+Statistics+in+Hive



set hive.cbo.enable=true;

set hive.compute.query.using.stats=true;

set hive.stats.fetch.column.stats=true;

set hive.stats.fetch.partition.stats=true;

analyze table ratings_parquet compute statistics;

analyze table ratings_parquet compute statistics for columns;

SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_parquet t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (78.255 seconds)

analyze table movies_parquet compute statistics;

analyze table movies_parquet compute statistics for columns;

SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_parquet t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (83.978 seconds)

Bucketing and Map Join

set hive.auto.convert.join=true;

set hive.mapjoin.smalltable.filesize=25000000

set hive.enforce.bucketing = true;

set hive.optimize.bucketmapjoin = true;

drop table if exists ratings_bucketed;

CREATE TABLE ratings_bucketed(userid string, movieid string, rating double, timestamp bigint) CLUSTERED BY (movieid) SORTED BY (userid ASC) INTO 96 BUCKETS stored as parquet;

insert into ratings_bucketed select * from ratings_parquet;

SELECT /*+ MAPJOIN(t2) */ t2.movieid, t2.title, avg(t1.rating) from ratings_bucketed t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (79.505 seconds)

analyze table ratings_bucketed compute statistics;

analyze table ratings_bucketed compute statistics for columns;

SELECT t2.movieid, t2.title, avg(t1.rating) from ratings_bucketed t1 left join movies_parquet t2 on t1.movieid = t2.movieid group by t2.movieid, t2.title;

39,443 rows selected (74.526 seconds)