HBase - Bulk Load Into HBase Table

Goal: Load a CSV file into an HBase table using the importtsv utility

Options: You have the following options for loading data into (or extracting data from) an HBase table

  1. If your data is a clean, well-delimited file, you can use the importtsv command. It is easy to use since it is command-line driven, and it kicks off a MapReduce job to load the data into the HBase table in parallel. One drawback: the data must already be clean, since there is no transformation capability.
  2. Create a Hive table on top of the target HBase table using the HBase storage handler/SerDe and use a Hive insert to load data into the HBase table. It works well in almost all cases, parallelism comes for free, and it is easy to use - it only requires SQL skills. In most scenarios, I would recommend this approach (a loading sketch appears after the Hive query section below).
  3. Use a MapReduce job. It works, but remember that MapReduce is on its way out.
  4. Use Spark to build sophisticated transformation logic. Spark gives you a cheap way to implement parallel loading and is a great option if you are dealing with images or other binary values. Here is a sample project. One drawback - you have to write code.
  5. If you want to load data from an RDBMS into HBase, Sqoop can be a good choice. Data transformation options are limited to what Sqoop's SQL query option allows. You can use a Sqoop job for incremental loads (see the sketch at the end of the Sqoop section below).
  6. Use a Java application. This is not recommended for bulk loading: you would have to implement multithreading yourself to parallelize the load, otherwise loading will be slow. It can be useful for integrating HBase with a legacy application, and if you are using HBase as an operational database behind an application server, it is a perfectly good way to integrate with HBase.

Launch the HBase shell

$ hbase shell

Create a table 'stocks' with a column family 'info'

hbase> create 'stocks', 'info'
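
Optionally, for a large initial load you can pre-split the table at creation time so that writes are spread across region servers from the start. A minimal sketch, assuming the date-first row keys built later in this walkthrough; the split points are only illustrative:

hbase> create 'stocks', 'info', SPLITS => ['2004', '2008', '2012', '2016']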

Load the stock price data into an HDFS directory. You can use the hadoop fs -put command or Hue to load the file into HDFS.

$ hadoop fs -mkdir -p stocks
$ hadoop fs -put stocks.csv stocks

Run the following steps in Hive to create a new dataset with a row key column (importtsv needs one column to use as the HBase row key).

hive> create table stocks_extended like stocks; 
hive> DESCRIBE stocks_extended;
hive> alter table stocks_extended add columns (key string);
hive> insert into stocks_extended select *, concat(tr_date, '-', symbol) from stocks;
hive> select * from stocks_extended limit 10;
$ hadoop fs -cat /user/hive/warehouse/stocks_extended/000000_0 | head
"2000-07-17","95.4375","97.5","92.75","96.625","3508100.0","74.269199","XLNX","2000-07-17-XLNX"
"2000-07-17","22.625","22.75","22.4375","22.5625","201600.0","13.48614","ES","2000-07-17-ES"
"2000-07-17","6.750002","6.937503","6.375","6.5","1235700.0","5.241649","CHK","2000-07-17-CHK"
"2000-07-17","19.812501","20.1875","19.500001","20.1875","1434100.0","3.806147","NI","2000-07-17-NI"
"2000-07-17","30.5","30.6875","30.0","30.03125","254600.0","19.81183","SNA","2000-07-17-SNA"
"2000-07-17","44.749996","45.062498","44.500004","45.000009","535200.0","17.400773","FOXA","2000-07-17-FOXA"
"2000-07-17","19.625","19.625","19.25","19.375","309500.0","13.768835","R","2000-07-17-R"
"2000-07-17","16.6562","16.6875","16.125","16.25","5507200.0","1.755466","ROST","2000-07-17-ROST"
"2000-07-17","56.25","57.25","56.0625","56.125","7941200.0","18.31076","PG","2000-07-17-PG"
"2000-07-17","54.000326","54.000326","52.500318","53.375325","3725000.0","71.068871","TYC","2000-07-17-TYC"

Import the data into the HBase table 'stocks' created earlier.

$ export HADOOP_CLASSPATH=`hbase classpath`

$ hadoop jar /usr/lib/hbase/hbase-server.jar importtsv -Dimporttsv.columns=info:date,info:open,info:high,info:low,info:close,info:volume,info:adjClose,info:symbol,HBASE_ROW_KEY '-Dimporttsv.separator=,' stocks "/user/hive/warehouse/stocks_extended/*"
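
Note that in this default mode importtsv loads through the region servers using Puts. If you want a true bulk load that writes HFiles first and then hands them to the table, importtsv can be run with the importtsv.bulk.output option followed by the completebulkload tool; a hedged sketch, where the HFile output path /tmp/stocks_hfiles is only an example:

$ hadoop jar /usr/lib/hbase/hbase-server.jar importtsv -Dimporttsv.columns=info:date,info:open,info:high,info:low,info:close,info:volume,info:adjClose,info:symbol,HBASE_ROW_KEY '-Dimporttsv.separator=,' -Dimporttsv.bulk.output=/tmp/stocks_hfiles stocks "/user/hive/warehouse/stocks_extended/*"
$ hadoop jar /usr/lib/hbase/hbase-server.jar completebulkload /tmp/stocks_hfiles stocks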

A few important counters to notice

  • Map-Reduce Framework: Map input records -> this should match the number of records in the input file.
  • Job Counters: Launched map tasks -> indicates the amount of parallelism in the load process. To speed up the load, you can increase this count by writing the source file with a smaller dfs.block.size, as in the example after this list.
  • ImportTsv: Bad Lines -> the number of lines that could not be processed.
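
For example, one way to get more map tasks is to stage the source file in HDFS with a smaller block size so it is split across more mappers; the 32 MB value below is only illustrative:

$ hadoop fs -Ddfs.block.size=33554432 -put stocks.csv stocks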

Verify the data by doing a few manual spot checks

hbase> count 'stocks' #returns the number of rows in the stocks table
hbase> scan 'stocks',{LIMIT => 3} #returns the first 3 rows, ordered by row key
hbase> scan 'stocks',{LIMIT => 3, REVERSED => true} #returns the last 3 rows (row keys in descending order)
hbase> scan 'stocks',{COLUMNS => ['info'], LIMIT => 3} #returns all columns of the info family for the first 3 rows
hbase> get 'stocks', '2016-08-15-GE' #returns the row with the given key (date-symbol, matching the key built in Hive)
hbase> scan 'stocks', {COLUMNS => ['info:close'], LIMIT => 100, STARTROW => '2008-03'} #closing prices for row keys starting at 2008-03 (with this date-first key the scan covers all symbols)

Query HBase Table Using Hive

Launch Hive with the necessary libraries on the aux path. Change the ZooKeeper quorum to match your cluster.

$ hive --auxpath /usr/lib/hive/lib/hive-hbase-handler.jar,/usr/lib/hive/lib/hbase-server.jar,/usr/lib/hive/lib/guava-14.0.1.jar,/usr/lib/hive/lib/zookeeper.jar --hiveconf hbase.zookeeper.quorum=localhost:2181

Create a Hive table mapped to the HBase table 'stocks'

hive>

CREATE EXTERNAL TABLE `stocks_hbase`(
  `key` string COMMENT 'Key of stock price record',
  `date` string COMMENT 'Date of stock price',
  `open` string COMMENT 'Opening price of the day',
  `high` string COMMENT 'Highest price during the day',
  `low` string COMMENT 'Lowest price during the day',
  `close` string COMMENT 'Closing price of the day',
  `volume` string COMMENT 'Volume of stocks traded during the day',
  `adjClose` string COMMENT 'Adjusted closing price',
  `symbol` string COMMENT 'Stock Symbol')
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,info:date,info:open,info:high,info:low,info:close,info:volume,info:adjClose,info:symbol')
TBLPROPERTIES ('hbase.table.name' = 'stocks');

Now you are ready to query the HBase table using Hive

hive> select avg(volume) `avg volume` from stocks_hbase;

Find the average volume on days when stocks closed lower than they opened, compared to days when they closed higher.

hive> select t1.`traded low`, avg(t1.volume) `avg vol` from (select t.*, cast(t.close as double) < cast(t.open as double) `traded low` from stocks_hbase t) t1 group by t1.`traded low`;

Output of the above query should return:

false   8745048.2421875
true    8401769.5
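
The same Hive table also gives you the loading path described in option 2 above: inserting into the HBase-backed table writes through the storage handler into HBase. A minimal sketch, assuming stocks_extended has the column layout shown earlier; apart from `key`, tr_date, and symbol, the column names here are assumptions:

hive> insert into table stocks_hbase select `key`, tr_date, open, high, low, close, volume, adjclose, symbol from stocks_extended;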

Explore the HBase files for the stocks table. Each table has a directory under /hbase/data/default/, with one subdirectory per region and, inside each region, a directory per column family (the region name and column family directories in the sample listing below will differ on your cluster).

$ hdfs dfs -ls /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93
Found 4 items
-rw-r--r-- 1 hbase hadoop 41 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/.regioninfo
drwxr-xr-x - hbase hadoop 0 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/prices
drwxr-xr-x - hbase hadoop 0 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/recovered.edits
drwxr-xr-x - hbase hadoop 0 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/volume

Importing Data From RDBMS Table Using Sqoop

Using hbase shell, create a table 'customers' with column family 'info'.

hbase> create 'customers', 'info'

Run a Sqoop import to load the MySQL customers table into the HBase customers table.

$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table customers --hbase-table customers --column-family info -m 1

The import passes the data through the MemStore. If you run the 'flush' command on the table, HBase will write the data out as HFiles. Every MemStore flush creates an HFile per column family.
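
For example, you can force a flush from the HBase shell and then list the new HFile under the table's column family directory in HDFS (the wildcard stands in for the region directory name, which will differ on your cluster):

hbase> flush 'customers'
$ hdfs dfs -ls /hbase/data/default/customers/*/info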

Scan all rows whose row key starts with 1000

hbase> scan 'customers', {ROWPREFIXFILTER => '1000'}
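
Option 5 above also mentioned incremental loads. A hedged sketch using a saved Sqoop job in append mode, assuming customer_id is a monotonically increasing column in the source table:

$ sqoop job --create customers_incr -- import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table customers --hbase-table customers --column-family info --incremental append --check-column customer_id --last-value 0 -m 1
$ sqoop job --exec customers_incr

Each execution imports only rows with customer_id greater than the last value recorded by the saved job.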

Key Considerations When Importing Data into an HBase Table

  1. Row key design:
    • It controls how reads and writes are parallelized across region servers.
    • It determines how well you can localize scans to row key ranges - think about the most common access patterns.
  2. Column family design:
    • Put columns that are most often accessed together under the same column family.
    • Consider cac