HBase - Bulk Load Into HBase Table

Goal: Load a CSV file into an HBase table using the ImportTsv utility

Options: You have the following options for loading data into (or extracting it from) an HBase table

  1. If your data is a clean delimited file, you can use the ImportTsv command. It is easy to use because it is driven from the command line, and it kicks off a MapReduce job that loads the data into the HBase table in parallel. One drawback: the data has to be clean, since there is no transformation capability.

  2. Create a Hive table on the target HBase table using the HBase SerDe (org.apache.hadoop.hive.hbase.HBaseSerDe) and load the data into the HBase table through Hive. It works well almost all the time, parallelism comes for free, and it is easy to use, requiring only SQL skills. In most scenarios I would recommend this approach. It uses RPC calls to load the data into the HBase table; a faster way is to create HFiles and place them directly into the HBase file location.

  3. Use a MapReduce job. Great, but remember MapReduce is going away.

  4. Use Spark to build sophisticated transformation logic. Spark gives a cheap way to implement parallel loading. It is a great option if you are dealing with images or other binary values. Here is a sample project. One drawback: you have to write code.

  5. If you want to load data from an RDBMS into HBase, Sqoop can be a good choice. Data transformation is limited to what the Sqoop SQL query option allows. You can use a Sqoop job for incremental loads.

  6. Use a Java application. Not recommended for bulk loading: you have to implement multithreading to parallelize the load, otherwise loading will be slow. It may be useful for integrating HBase with a legacy application. If you are using HBase as an operational database behind an application server, however, it is a perfectly good way of integrating with HBase.

  7. Create HFiles using Hive and place them into the HBase file directory. This works when you want to load data into a single column family. This approach avoids the RPC channel. Detailed steps: https://community.cloudera.com/t5/Community-Articles/Creating-HBase-HFiles-From-a-Hive-Table/ta-p/244627

Launch hbase shell

$ hbase shell

Create a table stocks

hbase> create 'stocks', 'info', 'price'

Load the stock price data into an HDFS directory. You can use the hadoop fs -put command or Hue to load the file into HDFS. Sample file: https://github.com/abulbasar/data/blob/master/stocks.small.csv. Use the steps below to form a unique identifier (date + symbol) for each record. To skip this extra data preparation step, you can use https://github.com/abulbasar/data/blob/master/stocks.small.row-key.csv

$ hadoop fs -mkdir -p stocks

$ hadoop fs -put stocks.csv stocks

Run the following steps in Hive to create a new dataset with a unique key column.

hive> create external table stocks(

tr_date String,

open Double,

high Double,

low Double,

close Double,

volume Double,

adjClose Double,

symbol String)

row format delimited

fields terminated by ','

stored as textfile

location '/user/cloudera/stocks'

TBLPROPERTIES("skip.header.line.count"="1");

hive> create table stocks_extended row format delimited fields terminated by ',' stored as textfile as select concat(tr_date, '-', symbol) key, s.* from stocks s;

hive> select * from stocks_extended limit 10;

$ hadoop fs -cat /user/hive/warehouse/stocks_extended/000000_0 | head

2000-07-17-XLNX2000-07-1795.437597.592.7596.6253508100.074.269199XLNX

2000-07-17-ES2000-07-1722.62522.7522.437522.5625201600.013.48614ES

2000-07-17-CHK2000-07-176.7500026.9375036.3756.51235700.05.241649CHK

2000-07-17-NI2000-07-1719.81250120.187519.50000120.18751434100.03.806147NI

2000-07-17-SNA2000-07-1730.530.687530.030.03125254600.019.81183SNA

2000-07-17-FOXA2000-07-1744.74999645.06249844.50000445.000009535200.017.400773FOXA

2000-07-17-R2000-07-1719.62519.62519.2519.375309500.013.768835R

2000-07-17-ROST2000-07-1716.656216.687516.12516.255507200.01.755466ROST

2000-07-17-PG2000-07-1756.2557.2556.062556.1257941200.018.31076PG

2000-07-17-TYC2000-07-1754.00032654.00032652.50031853.3753253725000.071.068871TYC
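The same key-preparation step can also be sketched outside Hive. The snippet below is a minimal illustration, not part of the original workflow: it assumes the CSV columns are ordered as in the Hive table above (date first, symbol last), and the function name add_row_key, the header names, and the sample row are made up for the example.

```python
import csv
import io


def add_row_key(src, dst, has_header=True):
    """Copy CSV rows from src to dst, prepending a 'date-symbol' row key.

    Assumes the first field is the trade date and the last field is the symbol.
    Returns the number of data rows written.
    """
    reader = csv.reader(src)
    writer = csv.writer(dst, lineterminator="\n")
    if has_header:
        next(reader, None)  # drop the header so the loader never sees it
    count = 0
    for row in reader:
        if not row:
            continue  # skip blank lines
        tr_date, symbol = row[0], row[-1]
        writer.writerow([f"{tr_date}-{symbol}"] + row)
        count += 1
    return count


# Illustrative input: one header line and one made-up record.
sample = io.StringIO(
    "date,open,high,low,close,volume,adjClose,symbol\n"
    "2016-08-15,10.0,11.0,9.5,10.5,1000.0,10.4,GE\n"
)
out = io.StringIO()
n = add_row_key(sample, out)
print(out.getvalue(), end="")
```

For a real file, open the source and destination with open(..., newline="") and pass the file objects instead of the in-memory buffers.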



Import data into hbase table stocks created earlier.

$ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,price:date,price:open,price:high,price:low,price:close,price:volume,price:adjClose,info:symbol '-Dimporttsv.separator=,' stocks "/user/hive/warehouse/stocks_extended/*"


Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>

...

By default importtsv will load data directly into HBase. To instead generate

HFiles of data to prepare for a bulk data load, pass the option:

-Dimporttsv.bulk.output=/path/for/output

Note: if you do not use this option, then the target table must already exist in HBase

Other options that may be specified with -D include:

-Dimporttsv.dry.run=true - Dry run mode. Data is not actually populated into table. If table does not exist, it is created but deleted in the end.

-Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line

-Dimporttsv.log.bad.lines=true - logs invalid lines to stderr

-Dimporttsv.skip.empty.columns=false - If true then skip empty columns in bulk import

'-Dimporttsv.separator=|' - eg separate on pipes instead of tabs

-Dimporttsv.timestamp=currentTimeAsLong - use the specified timestamp for the import

-Dimporttsv.mapper.class=my.Mapper - A user-defined Mapper to use instead of org.apache.hadoop.hbase.mapreduce.TsvImporterMapper

-Dmapreduce.job.name=jobName - use the specified mapreduce job name for the import

-Dcreate.table=no - can be used to avoid creation of table by this tool

Note: if you set this to 'no', then the target table must already exist in HBase

-Dno.strict=true - ignore column family check in hbase table. Default is false

For performance consider the following options:

-Dmapreduce.map.speculative=false

-Dmapreduce.reduce.speculative=false
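When the load is scripted, the options above can be assembled programmatically. The following sketch only builds the argument list and never runs it; the helper name importtsv_command is hypothetical, while the -D options are the ones listed in the usage text.

```python
import shlex


def importtsv_command(table, input_path, columns, separator=",", bulk_output=None):
    """Return the ImportTsv invocation as an argument list (nothing is executed)."""
    if columns.count("HBASE_ROW_KEY") != 1:
        raise ValueError("exactly one column must be HBASE_ROW_KEY")
    args = [
        "hbase",
        "org.apache.hadoop.hbase.mapreduce.ImportTsv",
        "-Dimporttsv.columns=" + ",".join(columns),
        "-Dimporttsv.separator=" + separator,
    ]
    if bulk_output:
        # Write HFiles to this directory instead of loading directly.
        args.append("-Dimporttsv.bulk.output=" + bulk_output)
    args += [table, input_path]
    return args


columns = [
    "HBASE_ROW_KEY", "price:date", "price:open", "price:high", "price:low",
    "price:close", "price:volume", "price:adjClose", "info:symbol",
]
cmd = importtsv_command(
    "stocks", "/user/hive/warehouse/stocks_extended/*", columns
)
# Print a shell-safe version for review before running it by hand.
print(" ".join(shlex.quote(a) for a in cmd))
```

Passing bulk_output makes ImportTsv write HFiles instead of loading through the RPC path, as the usage text describes; those HFiles still have to be handed to HBase in a separate bulk-load step.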


A few important counters to notice

  • Map-Reduce Framework: Map input records -> this should match the number of records in the input file

  • Job Counters: Launched map tasks -> indicates the amount of parallelism in the load process. To speed up the load, you can increase this count by setting a smaller dfs.block.size when creating the source file.

  • ImportTsv: Bad Lines -> number of lines that could not be processed.
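Before launching the job it can help to estimate the Bad Lines counter locally. The check below is a rough sketch under the assumption that every row should carry exactly as many fields as there are entries in importtsv.columns and that the row key is the first field; this may be stricter than ImportTsv's own validation, and the function name and sample rows are illustrative.

```python
def preflight(lines, expected_fields, separator=","):
    """Return (total, bad): bad counts rows with a wrong field count or an empty key."""
    total = bad = 0
    for line in lines:
        row = line.rstrip("\r\n")
        if not row:
            continue  # ignore blank lines
        total += 1
        fields = row.split(separator)
        if len(fields) != expected_fields or not fields[0]:
            bad += 1
    return total, bad


# Made-up rows: one good, one with too few fields, one with an empty row key.
rows = [
    "2016-08-15-GE,2016-08-15,10.0,11.0,9.5,10.5,1000.0,10.4,GE\n",
    "2016-08-15-XX,2016-08-15,10.0\n",
    ",2016-08-15,10.0,11.0,9.5,10.5,1000.0,10.4,GE\n",
]
total, bad = preflight(rows, expected_fields=9)
print(f"{bad} of {total} rows look suspicious")
```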

Verify the data with a few manual spot checks

hbase> count 'stocks' #returns the number of records in stocks table

hbase> scan 'stocks',{LIMIT => 3} #returns first 3 records from the table ordered by key value

hbase> scan 'stocks',{LIMIT => 3, REVERSED => true} #returns the last 3 records, in descending row key order

hbase> scan 'stocks',{COLUMNS => ['info'], LIMIT => 3} #returns the info column family (symbol) for the first 3 records

hbase> get 'stocks', 'GE-2016-08-15' #returns the record with this row key (keys in this example are in symbol-date form)

hbase> scan 'stocks', {COLUMNS => ['price:close'], LIMIT => 100, ROWPREFIXFILTER => 'GE-'} # Find GE's closing prices; the trailing '-' keeps other symbols that start with GE out of the scan.

Query HBase Table Using Hive

Start Hive with the necessary libraries. Change the ZooKeeper quorum to match your cluster.

$ hive --auxpath /usr/lib/hive/lib/hive-hbase-handler.jar,/usr/lib/hive/lib/hbase-server.jar,/usr/lib/hive/lib/guava-14.0.1.jar,/usr/lib/hive/lib/zookeeper.jar --hiveconf hbase.zookeeper.quorum=localhost:2181

Create hive table

hive>

CREATE EXTERNAL TABLE `stocks_hbase`(

`key` string COMMENT 'Key of stock price record',

`date` string COMMENT 'Date of stock price',

`open` string COMMENT 'Opening price of the day',

`high` string COMMENT 'Highest price during the day',

`low` string COMMENT 'Lowest price during the day',

`close` string COMMENT 'Closing price of the day',

`volume` string COMMENT 'Volume of stocks traded during the day',

`adjClose` string COMMENT 'Adjusted closing price',

`symbol` string COMMENT 'Stock Symbol')

ROW format serde 'org.apache.hadoop.hive.hbase.HBaseSerDe'

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'

WITH SERDEPROPERTIES('hbase.columns.mapping' = ':key,price:date,price:open,price:high,price:low,price:close,price:volume,price:adjClose,info:symbol')

TBLPROPERTIES ('hbase.table.name' = 'stocks');

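Now you are ready to query using Hive

hive> select avg(volume) `avg volume` from stocks_hbase;

Compare the average volume on days when a stock closed below its open (traded low) with days when it did not. Note that stocks_hbase declares close and open as string, so this comparison is made on strings; cast both to double for a strictly numeric comparison.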

hive> select t1.`traded low`, avg(t1.volume) `avg vol` from (select t.*, t.close < t.open `traded low` from stocks_hbase t) t1 group by t1.`traded low`;




If you store the columns as raw bytes rather than string-encoded bytes, define the Hive table as below. Note the #b at the end of each field reference.

CREATE EXTERNAL TABLE `stocks_hbase_bin`(

`key` string COMMENT 'Key of stock price record',

`date` string COMMENT 'Date of stock price',

`open` double COMMENT 'Opening price of the day',

`high` double COMMENT 'Highest price during the day',

`low` double COMMENT 'Lowest price during the day',

`close` double COMMENT 'Closing price of the day',

`volume` double COMMENT 'Volume of stocks traded during the day',

`adjClose` double COMMENT 'Adjusted closing price',

`symbol` string COMMENT 'Stock Symbol')

ROW format serde 'org.apache.hadoop.hive.hbase.HBaseSerDe'

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'

WITH SERDEPROPERTIES('hbase.columns.mapping' = ':key#b,price:date#b,price:open#b,price:high#b,price:low#b,price:close#b,price:volume#b,price:adjClose#b,info:symbol#b')

TBLPROPERTIES ('hbase.table.name' = 'stocks_bin');
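To make the difference between the two encodings concrete, here is a small sketch. It assumes the raw-byte form of a double is the 8-byte big-endian IEEE 754 layout (which is what HBase's Bytes.toBytes(double) produces) and uses Python's struct module only to illustrate the byte layout; the price value is made up.

```python
import struct

price = 77.68  # made-up value for illustration

# String-encoded: the digits of the number stored as text.
as_string = str(price).encode("utf-8")

# Raw bytes: 8-byte big-endian IEEE 754 double.
as_binary = struct.pack(">d", price)

print(len(as_string), len(as_binary))

# The raw form round-trips exactly back to the number.
decoded = struct.unpack(">d", as_binary)[0]

# String-encoded numbers sort as text, not as numbers:
# b"10.2" sorts before b"9.5" because "1" < "9".
text_order = sorted([b"9.5", b"10.2"])
print(text_order)
```

With string-encoded cells the Hive columns are declared as string, while the #b mapping lets Hive read the numeric cells directly as double.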




Output of the traded-low query above (the one against stocks_hbase) should look like:



+----------------+---------------------+

| t1.traded low | avg vol |

+----------------+---------------------+

| true | 5430525.2155172415 |

| false | 4965218.063112078 |

+----------------+---------------------+

Explore Hbase files for the stocks table

$ hdfs dfs -ls /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93

Found 4 items

-rw-r--r-- 1 hbase hadoop 41 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/.regioninfo

drwxr-xr-x - hbase hadoop 0 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/prices

drwxr-xr-x - hbase hadoop 0 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/recovered.edits

drwxr-xr-x - hbase hadoop 0 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/volume




Importing Data From RDBMS Table Using Sqoop

Using hbase shell, create a table 'customers' with column family 'info'.

hbase> create 'customers', 'info'

Run sqoop import to import customers table to hbase customers table.

$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table customers --hbase-table customers --column-family info -m 1

The import passes the data through the memstore. If you run the 'flush' command on the table, HBase writes the data out as HFiles. Every memstore flush creates one HFile per column family.

Scan all rows whose row key starts with 1000

hbase> scan 'customers', {ROWPREFIXFILTER => '1000'}

Key Considerations During Importing Data into HBase Table

  1. Row key design:

    • it controls how reads and writes are parallelized across region servers

    • it determines how well you can localize scans to row key ranges, so design it around common usage patterns

  2. Column Family Design

    • Put columns that are most often accessed together under the same column family

    • Consider cac
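The effect of row key order on scans can be illustrated without a cluster. The sketch below treats a sorted Python list as a stand-in for HBase's sorted row keys and runs a prefix scan over two key layouts; the symbols, dates, and function name are made up, and a real table would of course be split across regions.

```python
from bisect import bisect_left


def prefix_scan(sorted_keys, prefix):
    """Return the keys that start with prefix, walking the sorted order from the first candidate."""
    start = bisect_left(sorted_keys, prefix)
    result = []
    for key in sorted_keys[start:]:
        if not key.startswith(prefix):
            break  # keys are sorted, so no later key can match
        result.append(key)
    return result


symbols = ["GE", "IBM", "YUM"]
dates = ["2016-03-14", "2016-03-15"]

# Two candidate layouts for the same six records.
symbol_first = sorted(f"{s}-{d}" for s in symbols for d in dates)
date_first = sorted(f"{d}-{s}" for s in symbols for d in dates)

# Symbol-first keys keep one stock's history contiguous.
ge_history = prefix_scan(symbol_first, "GE-")

# Date-first keys keep one day's records for all stocks contiguous.
one_day = prefix_scan(date_first, "2016-03-14-")

print(ge_history)
print(one_day)
```

Whichever access pattern dominates (one stock over time, or all stocks for one day) suggests which component should lead the key.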

Filters


Compare Operator and Comparators

Compare Operator

A compare operator can be any of the following:

  1. LESS (<)

  2. LESS_OR_EQUAL (<=)

  3. EQUAL (=)

  4. NOT_EQUAL (!=)

  5. GREATER_OR_EQUAL (>=)

  6. GREATER (>)

  7. NO_OP (no operation)

The client should use the symbols (<, <=, =, !=, >, >=) to express compare operators.

Comparators

A comparator can be any of the following:

  1. BinaryComparator – This lexicographically compares against the specified byte array using Bytes.compareTo(byte[], byte[])

  2. BinaryPrefixComparator – This lexicographically compares against a specified byte array. It only compares up to the length of this byte array.

  3. RegexStringComparator – This compares against the specified byte array using the given regular expression. Only EQUAL and NOT_EQUAL comparisons are valid with this comparator

  4. SubstringComparator – This tests if the given substring appears in a specified byte array. The comparison is case insensitive. Only EQUAL and NOT_EQUAL comparisons are valid with this comparator


The ComparatorValue can be any value.

Example1: >, 'binary:abc' will match everything that is lexicographically greater than “abc”

Example2: =, 'binaryprefix:abc' will match everything whose first 3 characters are lexicographically equal to “abc”

Example3: !=, 'regexstring:ab*yz' will match everything in which the regular expression ab*yz finds no match

Example4: =, 'substring:abc123' will match everything that contains the substring “abc123” (ignoring case)
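The four examples can be mimicked in plain Python to get a feel for the semantics. This is only an approximation written for illustration, not HBase code: the matches function and the OPS table are hypothetical, Python's re module stands in for Java regular expressions, and the regex is assumed to match anywhere in the value.

```python
import operator
import re

OPS = {
    "<": operator.lt, "<=": operator.le, "=": operator.eq,
    "!=": operator.ne, ">=": operator.ge, ">": operator.gt,
}


def matches(op, comparator, value):
    """Approximate 'value <op> comparator' for a 'type:argument' comparator string."""
    kind, _, arg = comparator.partition(":")
    value_b, arg_b = value.encode("utf-8"), arg.encode("utf-8")
    if kind == "binary":
        # Python compares bytes lexicographically, byte by byte.
        return OPS[op](value_b, arg_b)
    if kind == "binaryprefix":
        # Only the first len(arg) bytes of the value take part.
        return OPS[op](value_b[:len(arg_b)], arg_b)
    if kind in ("regexstring", "substring"):
        if op not in ("=", "!="):
            raise ValueError("only = and != are valid with " + kind)
        if kind == "regexstring":
            found = re.search(arg, value) is not None
        else:
            found = arg.lower() in value.lower()  # case-insensitive containment
        return found if op == "=" else not found
    raise ValueError("unknown comparator type: " + kind)


print(matches(">", "binary:abc", "abd"))
print(matches("=", "binaryprefix:abc", "abcdef"))
print(matches("!=", "regexstring:ab*yz", "hello"))
print(matches("=", "substring:abc123", "xxABC123yy"))
```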


Sub String Comparator in HBase Shell

hbase(main):017:0> import org.apache.hadoop.hbase.filter.CompareFilter

=> [Java::OrgApacheHadoopHbaseFilter::CompareFilter]

hbase(main):018:0> import org.apache.hadoop.hbase.filter.SubstringComparator

=> [Java::OrgApacheHadoopHbaseFilter::SubstringComparator]

hbase(main):025:0> scan "stocks", {FILTER => org.apache.hadoop.hbase.filter.RowFilter.new(CompareFilter::CompareOp.valueOf('EQUAL'),SubstringComparator.new("2016-03-14"))}

ROW COLUMN+CELL

YUM-2016-03-14 column=info:symbol, timestamp=1589223858243, value=YUM

YUM-2016-03-14 column=price:adjClose, timestamp=1589223858243, value=76.703856

YUM-2016-03-14 column=price:close, timestamp=1589223858243, value=77.550003

YUM-2016-03-14 column=price:date, timestamp=1589223858243, value=2016-03-14

YUM-2016-03-14 column=price:high, timestamp=1589223858243, value=77.75

YUM-2016-03-14 column=price:low, timestamp=1589223858243, value=76.919998

YUM-2016-03-14 column=price:open, timestamp=1589223858243, value=77.68