Hive Table - Indexing
Before continuing with this article, note that indexes were removed in Hive 3.0. The Hive documentation recommends materialized views to achieve similar results, but I would suggest going with a columnar storage format like Parquet or ORC instead: they support selective scanning and can even skip entire files/blocks.
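For reference, the columnar alternative can be as simple as the sketch below (the `sfpd_orc` table name is illustrative and not part of this walkthrough; `sfpd` is the table created later in this article):

```sql
-- Hedged sketch: copy the text-backed table into ORC instead of indexing it.
-- ORC keeps min/max statistics per stripe, so predicates on columns such as
-- category can skip whole stripes with no separate index structure.
CREATE TABLE sfpd_orc STORED AS ORC AS SELECT * FROM sfpd;
```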
Upload the following file to HDFS at /user/cloudera/sfpd in decompressed format.
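The upload can be done with the HDFS CLI. A sketch (the local filename is an assumption, taken from the file name that appears in the index output later in this article):

```shell
# Hedged sketch: place the decompressed CSV where the external table expects it.
hdfs dfs -mkdir -p /user/cloudera/sfpd
hdfs dfs -put 'Map__Crime_Incidents_-_from_1_Jan_2003.csv' /user/cloudera/sfpd/
```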
Create a Hive table on top of this dataset.
hive> CREATE EXTERNAL TABLE sfpd(
IncidntNum string,
Category string,
Descript string,
DayOfWeek string,
Date string,
Time string,
PdDistrict string,
Resolution string,
Address string,
X string,
Y string,
Location string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'separatorChar' = ',',
'quoteChar' = '"',
'escapeChar' = '\\')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'/user/cloudera/sfpd'
TBLPROPERTIES (
'serialization.null.format' = '',
'skip.header.line.count' = '1');
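Once the table is created, a quick sanity check (my own suggestion, not part of the original session) confirms the SerDe parses the file and the header line is skipped:

```sql
-- Spot-check a few parsed rows; `date` is backticked since it can clash
-- with the keyword in some Hive versions.
SELECT incidntnum, category, `date` FROM sfpd LIMIT 5;
```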
Find the 10 most frequent categories of incidents.
hive> select category, count(*) cnt from sfpd group by category order by cnt desc limit 10;
Query ID = cloudera_20191127091313_acf7795f-21a4-4429-9b0c-178ad1eee29b
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 2
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1574873894464_0003, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1574873894464_0003/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1574873894464_0003
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 2
2019-11-27 09:13:11,467 Stage-1 map = 0%, reduce = 0%
2019-11-27 09:13:23,245 Stage-1 map = 50%, reduce = 0%, Cumulative CPU 4.7 sec
2019-11-27 09:13:25,391 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 11.62 sec
2019-11-27 09:13:31,825 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 13.71 sec
MapReduce Total cumulative CPU time: 13 seconds 710 msec
Ended Job = job_1574873894464_0003
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1574873894464_0004, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1574873894464_0004/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1574873894464_0004
Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1
2019-11-27 09:13:40,680 Stage-2 map = 0%, reduce = 0%
2019-11-27 09:13:46,905 Stage-2 map = 100%, reduce = 0%, Cumulative CPU 0.67 sec
2019-11-27 09:13:53,103 Stage-2 map = 100%, reduce = 100%, Cumulative CPU 1.72 sec
MapReduce Total cumulative CPU time: 1 seconds 720 msec
Ended Job = job_1574873894464_0004
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 2 Cumulative CPU: 13.71 sec HDFS Read: 382748285 HDFS Write: 1495 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 1.72 sec HDFS Read: 6793 HDFS Write: 187 SUCCESS
Total MapReduce CPU Time Spent: 15 seconds 430 msec
OK
LARCENY/THEFT 385774
OTHER OFFENSES 269250
NON-CRIMINAL 200942
ASSAULT 165324
VEHICLE THEFT 114258
DRUG/NARCOTIC 111436
VANDALISM 96350
WARRANTS 89782
BURGLARY 78968
SUSPICIOUS OCC 67788
Time taken: 48.904 seconds, Fetched: 10 row(s)
Note the time taken to run this query.
Hive supports two types of index: compact and bitmap.
Create a bitmap index. Since we specify the DEFERRED REBUILD option, the index data is not materialized at this point.
hive> CREATE INDEX sfpd_index_bitmap ON TABLE sfpd (category) AS 'BITMAP' WITH DEFERRED REBUILD;
OK
Time taken: 1.32 seconds
Show the existing indexes on sfpd table.
hive> show index on sfpd;
OK
sfpd_index_bitmap sfpd category default__sfpd_sfpd_index_bitmap__ bitmap
Time taken: 0.034 seconds, Fetched: 1 row(s)
Build the index data.
hive> ALTER INDEX sfpd_index_bitmap on sfpd REBUILD;
Query ID = cloudera_20191127091616_9e15dc73-42af-4b90-aa40-6b55c072eea1
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 2
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1574873894464_0005, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1574873894464_0005/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1574873894464_0005
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 2
2019-11-27 09:16:10,144 Stage-1 map = 0%, reduce = 0%
2019-11-27 09:16:27,461 Stage-1 map = 17%, reduce = 0%, Cumulative CPU 9.59 sec
2019-11-27 09:16:29,141 Stage-1 map = 58%, reduce = 0%, Cumulative CPU 18.65 sec
2019-11-27 09:16:30,295 Stage-1 map = 67%, reduce = 0%, Cumulative CPU 19.21 sec
2019-11-27 09:16:33,582 Stage-1 map = 83%, reduce = 0%, Cumulative CPU 23.9 sec
2019-11-27 09:16:38,011 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 25.98 sec
2019-11-27 09:16:49,604 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 42.2 sec
MapReduce Total cumulative CPU time: 42 seconds 200 msec
Ended Job = job_1574873894464_0005
Loading data to table default.default__sfpd_sfpd_index_bitmap__
Table default.default__sfpd_sfpd_index_bitmap__ stats: [numFiles=2, numRows=1888567, totalSize=259593408, rawDataSize=257704841]
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 2 Cumulative CPU: 42.2 sec HDFS Read: 382754711 HDFS Write: 259593624 SUCCESS
Total MapReduce CPU Time Spent: 42 seconds 200 msec
OK
Time taken: 48.426 seconds
The index table is like a normal table and can be queried.
hive> select * from default__sfpd_sfpd_index_bitmap__ limit 5;
OK
ASSAULT hdfs://quickstart.cloudera:8020/user/cloudera/sfpd/Map__Crime_Incidents_-_from_1_Jan_2003.csv 0 [1,2,4,8589934592,1,0]
LARCENY/THEFT hdfs://quickstart.cloudera:8020/user/cloudera/sfpd/Map__Crime_Incidents_-_from_1_Jan_2003.csv 265 [1,2,4,8589934592,1,0]
BURGLARY hdfs://quickstart.cloudera:8020/user/cloudera/sfpd/Map__Crime_Incidents_-_from_1_Jan_2003.csv 1133 [1,2,4,8589934592,1,0]
SEX OFFENSES, FORCIBLE hdfs://quickstart.cloudera:8020/user/cloudera/sfpd/Map__Crime_Incidents_-_from_1_Jan_2003.csv 2602 [1,2,4,8589934592,1,0]
VEHICLE THEFT hdfs://quickstart.cloudera:8020/user/cloudera/sfpd/Map__Crime_Incidents_-_from_1_Jan_2003.csv 3056 [1,2,4,8589934592,1,0]
Time taken: 0.067 seconds, Fetched: 5 row(s)
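Since the stats above show this bitmap index table carries roughly one row per base-table row, category frequencies could in principle be read from the (smaller) index table rather than the full dataset. A hedged sketch, not run in the original session:

```sql
-- Hedged sketch: compute per-category counts from the bitmap index table.
SELECT category, count(*) AS cnt
FROM default__sfpd_sfpd_index_bitmap__
GROUP BY category
ORDER BY cnt DESC
LIMIT 10;
```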
Describe the index. Notice the bitmap index table has four columns - the indexed column (category), _bucketname, _offset and _bitmaps.
hive> describe formatted default__sfpd_sfpd_index_bitmap__;
OK
# col_name data_type comment
category string from deserializer
_bucketname string
_offset bigint
_bitmaps array<bigint>
# Detailed Table Information
Database: default
Owner: cloudera
CreateTime: Wed Nov 27 09:15:26 PST 2019
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: hdfs://quickstart.cloudera:8020/user/hive/warehouse/default__sfpd_sfpd_index_bitmap__
Table Type: INDEX_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE true
hive.index.basetbl.dfs.lastModifiedTime 1574875010953
numFiles 2
numRows 1888567
rawDataSize 257704841
totalSize 259593408
transient_lastDdlTime 1574875011
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: [Order(col:category, order:1)]
Time taken: 0.076 seconds, Fetched: 33 row(s)
If you re-run the previous query to find the 10 most frequent categories of incidents, it does not run any faster, because the Hive query optimizer does not leverage index data by default.
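On Hive versions that still ship indexing, the optimizer can be nudged to consider indexes with the session settings below (a sketch, not applied in this session; the exact effect depends on the Hive version and the query shape):

```sql
-- Hedged: ask the planner to use indexes where it supports them.
SET hive.optimize.index.filter=true;       -- use indexes for WHERE predicates
SET hive.optimize.index.groupby=true;      -- rewrite eligible GROUP BY queries
SET hive.index.compact.query.max.size=-1;  -- lift the size cap on index-backed queries
```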
hive> select category, count(*) cnt from sfpd group by category order by cnt desc limit 10;
Query ID = cloudera_20191127091717_4714ae55-76fd-4aff-b82e-dcd3b3ed7205
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 2
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1574873894464_0006, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1574873894464_0006/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1574873894464_0006
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 2
2019-11-27 09:17:31,832 Stage-1 map = 0%, reduce = 0%
2019-11-27 09:17:44,384 Stage-1 map = 50%, reduce = 0%, Cumulative CPU 4.68 sec
2019-11-27 09:17:47,488 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 12.05 sec
2019-11-27 09:17:52,744 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 14.33 sec
MapReduce Total cumulative CPU time: 14 seconds 330 msec
Ended Job = job_1574873894464_0006
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1574873894464_0007, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1574873894464_0007/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1574873894464_0007
Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1
2019-11-27 09:18:03,256 Stage-2 map = 0%, reduce = 0%
2019-11-27 09:18:08,408 Stage-2 map = 100%, reduce = 0%, Cumulative CPU 0.77 sec
2019-11-27 09:18:14,621 Stage-2 map = 100%, reduce = 100%, Cumulative CPU 1.96 sec
MapReduce Total cumulative CPU time: 1 seconds 960 msec
Ended Job = job_1574873894464_0007
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 2 Cumulative CPU: 14.33 sec HDFS Read: 382748285 HDFS Write: 1495 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 1.96 sec HDFS Read: 6793 HDFS Write: 187 SUCCESS
Total MapReduce CPU Time Spent: 16 seconds 290 msec
OK
LARCENY/THEFT 385774
OTHER OFFENSES 269250
NON-CRIMINAL 200942
ASSAULT 165324
VEHICLE THEFT 114258
DRUG/NARCOTIC 111436
VANDALISM 96350
WARRANTS 89782
BURGLARY 78968
SUSPICIOUS OCC 67788
Time taken: 52.793 seconds, Fetched: 10 row(s)
The explain plan also shows that the above query does not leverage the index.
hive> explain select category, count(*) cnt from sfpd group by category order by cnt desc limit 10;
OK
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: sfpd
Statistics: Num rows: 3827253 Data size: 382725376 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: category (type: string)
outputColumnNames: category
Statistics: Num rows: 3827253 Data size: 382725376 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
keys: category (type: string)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 3827253 Data size: 382725376 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 3827253 Data size: 382725376 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 1913626 Data size: 191362637 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col1 (type: bigint)
sort order: -
Statistics: Num rows: 1913626 Data size: 191362637 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string)
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: bigint)
outputColumnNames: _col0, _col1
Statistics: Num rows: 1913626 Data size: 191362637 Basic stats: COMPLETE Column stats: NONE
Limit
Number of rows: 10
Statistics: Num rows: 10 Data size: 1000 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 10 Data size: 1000 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: 10
Processor Tree:
ListSink
Time taken: 0.088 seconds, Fetched: 73 row(s)
Before we see how the index data can be helpful, drop the existing bitmap index and create a compact index.
hive> DROP INDEX sfpd_index_bitmap ON sfpd;
OK
Time taken: 0.174 seconds
Create a new compact index.
hive> CREATE INDEX sfpd_index_compact ON TABLE sfpd (category) AS 'COMPACT' WITH DEFERRED REBUILD STORED AS RCFILE;
OK
Time taken: 0.272 seconds
hive> show index on sfpd;
OK
sfpd_index_compact sfpd category default__sfpd_sfpd_index_compact__ compact
Time taken: 0.026 seconds, Fetched: 1 row(s)
hive> describe default__sfpd_sfpd_index_compact__;
OK
category string from deserializer
_bucketname string
_offsets array<bigint>
Time taken: 0.06 seconds, Fetched: 3 row(s)
Build the index data.
hive> ALTER INDEX sfpd_index_compact on sfpd REBUILD;
Query ID = cloudera_20191127094242_78d51c52-6c7a-4efb-aaef-ba82c5d93ed2
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 2
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1574873894464_0009, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1574873894464_0009/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1574873894464_0009
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 2
2019-11-27 09:42:10,211 Stage-1 map = 0%, reduce = 0%
2019-11-27 09:42:24,714 Stage-1 map = 50%, reduce = 0%, Cumulative CPU 6.59 sec
2019-11-27 09:42:27,275 Stage-1 map = 67%, reduce = 0%, Cumulative CPU 15.43 sec
2019-11-27 09:42:32,631 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 17.86 sec
2019-11-27 09:42:36,800 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 23.26 sec
MapReduce Total cumulative CPU time: 23 seconds 260 msec
Ended Job = job_1574873894464_0009
Loading data to table default.default__sfpd_sfpd_index_compact__
Table default.default__sfpd_sfpd_index_compact__ stats: [numFiles=2, numRows=39, totalSize=18342174, rawDataSize=18341656]
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 2 Cumulative CPU: 23.26 sec HDFS Read: 382753197 HDFS Write: 18342381 SUCCESS
Total MapReduce CPU Time Spent: 23 seconds 260 msec
OK
Time taken: 34.69 seconds
The original query to return the 10 most frequent incident categories can be rewritten as below against the compact index table; it runs significantly faster (about 18 seconds vs about 49 seconds).
hive> select category, size(`_offsets`) cnt from default__sfpd_sfpd_index_compact__ order by cnt desc limit 10;
Query ID = cloudera_20191127095353_abedf429-46a5-4d01-a56c-b02978f6ee72
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1574873894464_0013, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1574873894464_0013/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1574873894464_0013
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-11-27 09:53:55,731 Stage-1 map = 0%, reduce = 0%
2019-11-27 09:54:00,860 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.11 sec
2019-11-27 09:54:07,053 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.28 sec
MapReduce Total cumulative CPU time: 2 seconds 280 msec
Ended Job = job_1574873894464_0013
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 2.28 sec HDFS Read: 18345706 HDFS Write: 187 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 280 msec
OK
LARCENY/THEFT 385774
OTHER OFFENSES 269250
NON-CRIMINAL 200942
ASSAULT 165324
VEHICLE THEFT 114258
DRUG/NARCOTIC 111436
VANDALISM 96350
WARRANTS 89782
BURGLARY 78968
SUSPICIOUS OCC 67788
Time taken: 17.901 seconds, Fetched: 10 row(s)
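Beyond aggregates, a compact index also enables selective reads: its `_bucketname`/`_offsets` columns identify exactly which file and block offsets hold a given key. A hedged sketch, not run in the original session:

```sql
-- Hedged sketch: fetch the file name and block-offset count for one category.
-- These offsets are what index-aware execution would use to read only the
-- relevant portions of the underlying CSV file.
SELECT `_bucketname`, size(`_offsets`) AS num_blocks
FROM default__sfpd_sfpd_index_compact__
WHERE category = 'WARRANTS';
```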
Refer to the article below to see how queries can be rewritten to take advantage of an index.
https://www.slideshare.net/NikhilDeshpande/indexed-hive