HBase - Bulk Load Into HBase Table
Goal: Load a CSV file into an HBase table using the importtsv utility
Options: You have the following options for loading (or extracting) data into an HBase table:
If your data is in a clean, delimited file, you can use the importtsv command. It is easy to use since it is command-line driven, and it kicks off a MapReduce job to load the data into the HBase table in parallel. One drawback: the data has to be clean, as there is no transformation capability.
Create a Hive table on top of the target HBase table using the HBase SerDe (org.apache.hadoop.hive.hbase.HBaseSerDe) and use a Hive insert to load data into the HBase table. It works well in almost all cases. Parallelism comes for free, and it is easy to use, requiring only SQL skills. In most scenarios, I would recommend this approach. It uses RPC calls to load the data into the HBase table. A faster alternative is to create HFiles and drop them into the HBase file location.
Use a MapReduce job. It works, but remember that MapReduce is on its way out.
Use Spark to build sophisticated transformation logic. Spark gives you a cheap way to implement parallel loading, and it is a great option if you are dealing with images or other binary values. Here is a sample project. One drawback: you have to write code.
If you want to load data from an RDBMS into HBase, Sqoop can be a good choice. You have limited options for data transformation, based on Sqoop's SQL query option. You can use a Sqoop job for incremental loads.
Use a Java application. Not recommended for bulk loading: you have to implement multi-threading to parallelize the load process, or the load will be slow. It may be useful for integrating HBase with a legacy application. And if you are using HBase as an operational database behind an application server, it is a perfectly good way of integrating with HBase.
Create HFiles using Hive and drop them into the HBase file directory. It works when you want to load data into a single column family, and this approach avoids the RPC channel. Detailed steps: https://community.cloudera.com/t5/Community-Articles/Creating-HBase-HFiles-From-a-Hive-Table/ta-p/244627
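For example, a minimal sketch of the HFile-based path (the paths and column list are illustrative, and the bulk loader class shown is the HBase 1.x name): have importtsv write HFiles instead of issuing RPCs, then hand them to the bulk loader.
$ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv '-Dimporttsv.separator=,' -Dimporttsv.columns=HBASE_ROW_KEY,info:symbol,price:close -Dimporttsv.bulk.output=/tmp/stocks_hfiles stocks /path/to/input
$ hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/stocks_hfiles stocks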
Launch hbase shell
$ hbase shell
Create a table 'stocks' with two column families, 'info' and 'price'.
hbase> create 'stocks', 'info', 'price'
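Verify the table and its column families:
hbase> describe 'stocks'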
Load the stock price data into an HDFS directory. You can use the hadoop fs -put command or Hue to load the file into HDFS. Sample file: https://github.com/abulbasar/data/blob/master/stocks.small.csv. Use the following steps to form a unique identifier (symbol + date) for each record. To avoid these extra data preparation steps you can use https://github.com/abulbasar/data/blob/master/stocks.small.row-key.csv
$ hadoop fs -mkdir -p stocks
$ hadoop fs -put stocks.csv stocks
Run the following steps in Hive to create a new dataset with a unique row key.
hive> create external table stocks(
tr_date String,
open Double,
high Double,
low Double,
close Double,
volume Double,
adjClose Double,
symbol String)
row format delimited
fields terminated by ','
stored as textfile
location '/user/cloudera/stocks'
TBLPROPERTIES("skip.header.line.count"="1");
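Sanity-check that Hive can read the file:
hive> select tr_date, symbol, close from stocks limit 5;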
hive> create table stocks_extended
row format delimited
fields terminated by ','
stored as textfile
as select concat(s.symbol, '-', s.tr_date) as `key`, s.* from stocks s;
Note: the key column has to come first so that it lines up with HBASE_ROW_KEY in the importtsv command below.
hive> select * from stocks_extended limit 10;
$ hadoop fs -cat /user/hive/warehouse/stocks_extended/000000_0 | head
XLNX-2000-07-17,2000-07-17,95.4375,97.5,92.75,96.625,3508100.0,74.269199,XLNX
ES-2000-07-17,2000-07-17,22.625,22.75,22.4375,22.5625,201600.0,13.48614,ES
CHK-2000-07-17,2000-07-17,6.750002,6.937503,6.375,6.5,1235700.0,5.241649,CHK
NI-2000-07-17,2000-07-17,19.812501,20.1875,19.500001,20.1875,1434100.0,3.806147,NI
SNA-2000-07-17,2000-07-17,30.5,30.6875,30.0,30.03125,254600.0,19.81183,SNA
FOXA-2000-07-17,2000-07-17,44.749996,45.062498,44.500004,45.000009,535200.0,17.400773,FOXA
R-2000-07-17,2000-07-17,19.625,19.625,19.25,19.375,309500.0,13.768835,R
ROST-2000-07-17,2000-07-17,16.6562,16.6875,16.125,16.25,5507200.0,1.755466,ROST
PG-2000-07-17,2000-07-17,56.25,57.25,56.0625,56.125,7941200.0,18.31076,PG
TYC-2000-07-17,2000-07-17,54.000326,54.000326,52.500318,53.375325,3725000.0,71.068871,TYC
Import the data into the HBase table stocks created earlier.
$ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,price:date,price:open,price:high,price:low,price:close,price:volume,price:adjClose,info:symbol '-Dimporttsv.separator=,' stocks "/user/hive/warehouse/stocks_extended/*"
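If you want to validate the load without writing any data, importtsv has a dry-run mode (see the options below); for example:
$ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.dry.run=true -Dimporttsv.columns=HBASE_ROW_KEY,price:date,price:open,price:high,price:low,price:close,price:volume,price:adjClose,info:symbol '-Dimporttsv.separator=,' stocks "/user/hive/warehouse/stocks_extended/*"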
For reference, here is the usage text from the importtsv tool:
Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>
...
By default importtsv will load data directly into HBase. To instead generate
HFiles of data to prepare for a bulk data load, pass the option:
-Dimporttsv.bulk.output=/path/for/output
Note: if you do not use this option, then the target table must already exist in HBase
Other options that may be specified with -D include:
-Dimporttsv.dry.run=true - Dry run mode. Data is not actually populated into table. If table does not exist, it is created but deleted in the end.
-Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line
-Dimporttsv.log.bad.lines=true - logs invalid lines to stderr
-Dimporttsv.skip.empty.columns=false - If true then skip empty columns in bulk import
'-Dimporttsv.separator=|' - eg separate on pipes instead of tabs
-Dimporttsv.timestamp=currentTimeAsLong - use the specified timestamp for the import
-Dimporttsv.mapper.class=my.Mapper - A user-defined Mapper to use instead of org.apache.hadoop.hbase.mapreduce.TsvImporterMapper
-Dmapreduce.job.name=jobName - use the specified mapreduce job name for the import
-Dcreate.table=no - can be used to avoid creation of table by this tool
Note: if you set this to 'no', then the target table must already exist in HBase
-Dno.strict=true - ignore column family check in hbase table. Default is false
For performance consider the following options:
-Dmapreduce.map.speculative=false
-Dmapreduce.reduce.speculative=false
A few important counters to check in the job output:
Map-Reduce Framework: Map input records -> this should match the number of records in the input file.
Job Counters: Launched map tasks -> indicates the degree of parallelism in the load process. To speed up the load, you may want to increase this count by setting a smaller dfs.block.size while creating the source file (more blocks means more map tasks).
ImportTsv: Bad Lines -> the number of lines that could not be processed.
Verify the data by doing a few manual spot checks.
hbase> count 'stocks' #returns the number of records in stocks table
hbase> scan 'stocks',{LIMIT => 3} #returns first 3 records from the table ordered by key value
hbase> scan 'stocks',{LIMIT => 3, REVERSED => true} #returns the last 3 records, ordered by row key in descending order
hbase> scan 'stocks',{COLUMNS => ['info'], LIMIT => 3} #returns the info column family for the first 3 records
hbase> get 'stocks', 'GE-2016-08-15' #returns the record with the given row key
hbase> scan 'stocks', {COLUMNS => ['price:close'], LIMIT => 100, ROWPREFIXFILTER => 'GE'} # Find GE's closing prices.
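You can also fetch a single cell for a known row key (assuming the GE row exists in your dataset):
hbase> get 'stocks', 'GE-2016-08-15', {COLUMN => 'price:close'}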
Query HBase Table Using Hive
Launch Hive with the necessary libraries on the aux path. Change the ZooKeeper quorum to match your cluster.
$ hive --auxpath /usr/lib/hive/lib/hive-hbase-handler.jar,/usr/lib/hive/lib/hbase-server.jar,/usr/lib/hive/lib/guava-14.0.1.jar,/usr/lib/hive/lib/zookeeper.jar --hiveconf hbase.zookeeper.quorum=localhost:2181
Create the Hive table:
hive>
CREATE EXTERNAL TABLE `stocks_hbase`(
`key` string COMMENT 'Key of stock price record',
`date` string COMMENT 'Date of stock price',
`open` string COMMENT 'Opening price of the day',
`high` string COMMENT 'Highest price during the day',
`low` string COMMENT 'Lowest price during the day',
`close` string COMMENT 'Closing price of the day',
`volume` string COMMENT 'Volume of stocks traded during the day',
`adjClose` string COMMENT 'Adjusted closing price',
`symbol` string COMMENT 'Stock Symbol')
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES('hbase.columns.mapping' = ':key,price:date,price:open,price:high,price:low,price:close,price:volume,price:adjClose,info:symbol')
TBLPROPERTIES ('hbase.table.name' = 'stocks');
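Verify the mapping with a quick query:
hive> select `key`, symbol, close from stocks_hbase limit 3;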
Now you are ready to query the HBase table using Hive.
hive> select avg(volume) `avg volume` from stocks_hbase;
Find the average volume on days when stocks closed lower than they opened, compared to days when they closed higher:
hive> select t1.`traded low`, avg(t1.volume) `avg vol` from (select t.*, cast(t.close as double) < cast(t.open as double) `traded low` from stocks_hbase t) t1 group by t1.`traded low`;
If you store the columns as raw bytes rather than string-encoded bytes, define the Hive table as below. Note the #b at the end of each field mapping.
CREATE EXTERNAL TABLE `stocks_hbase_bin`(
`key` string COMMENT 'Key of stock price record',
`date` string COMMENT 'Date of stock price',
`open` double COMMENT 'Opening price of the day',
`high` double COMMENT 'Highest price during the day',
`low` double COMMENT 'Lowest price during the day',
`close` double COMMENT 'Closing price of the day',
`volume` double COMMENT 'Volume of stocks traded during the day',
`adjClose` double COMMENT 'Adjusted closing price',
`symbol` string COMMENT 'Stock Symbol')
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES('hbase.columns.mapping' = ':key#b,price:date#b,price:open#b,price:high#b,price:low#b,price:close#b,price:volume#b,price:adjClose#b,info:symbol#b')
TBLPROPERTIES ('hbase.table.name' = 'stocks_bin');
For reference, the query above (average volume grouped by `traded low`) should return:
+----------------+---------------------+
| t1.traded low | avg vol |
+----------------+---------------------+
| true | 5430525.2155172415 |
| false | 4965218.063112078 |
+----------------+---------------------+
Explore the HBase files for the stocks table.
$ hdfs dfs -ls /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93
Found 4 items
-rw-r--r-- 1 hbase hadoop 41 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/.regioninfo
drwxr-xr-x - hbase hadoop 0 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/info
drwxr-xr-x - hbase hadoop 0 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/price
drwxr-xr-x - hbase hadoop 0 2016-08-16 18:46 /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/recovered.edits
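To peek inside an HFile, you can use the hfile tool (a sketch; <hfile-name> is a placeholder for whatever file the listing under the family directory shows, and HFiles appear only after a flush):
$ hbase hfile -p -f /hbase/data/default/stocks/625e61d9708a2db3377d20f148f93c93/price/<hfile-name> | head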
Importing Data From RDBMS Table Using Sqoop
Using the hbase shell, create a table 'customers' with column family 'info'.
hbase> create 'customers', 'info'
Run a Sqoop import to load the RDBMS customers table into the HBase customers table.
$ sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table customers --hbase-table customers --column-family info -m 1
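For the incremental loads mentioned in the options above, the import can be wrapped in a saved Sqoop job (a sketch; the check column customer_id is an assumption about the retail_db schema):
$ sqoop job --create customers_incr -- import --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table customers --hbase-table customers --column-family info -m 1 --incremental append --check-column customer_id --last-value 0
$ sqoop job --exec customers_incr
Each execution records the new last value, so subsequent runs pick up only new rows.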
The import passes the data through the MemStore. If you run the 'flush' command on the table, HBase will write the data out as HFiles; every MemStore flush creates one HFile per column family.
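For example:
hbase> flush 'customers'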
Scan all rows whose row key starts with 1000:
hbase> scan 'customers', {ROWPREFIXFILTER => '1000'}
Key Considerations When Importing Data into an HBase Table
Row key design:
It controls how reads and writes are parallelized across region servers.
It determines how well you can localize scans to row key ranges; think about the most common access patterns (see the scan sketch after this list).
Column family design:
Put columns that are most often accessed together under the same column family.
Consider cac
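As an illustration of scan localization (assuming the symbol-first row keys built earlier), a range scan touches only the regions holding the matching keys:
hbase> scan 'stocks', {STARTROW => 'GE-2016', STOPROW => 'GE-2017', COLUMNS => ['price:close']} # reads only the GE-2016* key range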
Filters
Compare Operator and Comparators
Compare Operator
A compare operator can be any of the following:
LESS (<)
LESS_OR_EQUAL (<=)
EQUAL (=)
NOT_EQUAL (!=)
GREATER_OR_EQUAL (>=)
GREATER (>)
NO_OP (no operation)
The client should use the symbols (<, <=, =, !=, >, >=) to express compare operators.
Comparators
A comparator can be any of the following:
BinaryComparator – This lexicographically compares against the specified byte array using Bytes.compareTo(byte[], byte[])
BinaryPrefixComparator – This lexicographically compares against a specified byte array. It only compares up to the length of this byte array.
RegexStringComparator – This compares against the specified byte array using the given regular expression. Only EQUAL and NOT_EQUAL comparisons are valid with this comparator
SubStringComparator – This tests if the given substring appears in a specified byte array. The comparison is case insensitive. Only EQUAL and NOT_EQUAL comparisons are valid with this comparator
The comparator value (the part after the colon in the examples below) can be any value.
Example1: >, ‘binary:abc’ will match everything that is lexicographically greater than “abc”
Example2: =, ‘binaryprefix:abc’ will match everything whose first 3 characters are lexicographically equal to “abc”
Example3: !=, ‘regexstring:ab*yz’ will match everything that doesn’t begin with “ab” and ends with “yz”
Example4: =, ‘substring:abc123’ will match everything that contains the substring “abc123”
Substring Comparator in HBase Shell
hbase(main):017:0> import org.apache.hadoop.hbase.filter.CompareFilter
=> [Java::OrgApacheHadoopHbaseFilter::CompareFilter]
hbase(main):018:0> import org.apache.hadoop.hbase.filter.SubstringComparator
=> [Java::OrgApacheHadoopHbaseFilter::SubstringComparator]
hbase(main):025:0> scan "stocks", {FILTER => org.apache.hadoop.hbase.filter.RowFilter.new(CompareFilter::CompareOp.valueOf('EQUAL'),SubstringComparator.new("2016-03-14"))}
ROW COLUMN+CELL
YUM-2016-03-14 column=info:symbol, timestamp=1589223858243, value=YUM
YUM-2016-03-14 column=price:adjClose, timestamp=1589223858243, value=76.703856
YUM-2016-03-14 column=price:close, timestamp=1589223858243, value=77.550003
YUM-2016-03-14 column=price:date, timestamp=1589223858243, value=2016-03-14
YUM-2016-03-14 column=price:high, timestamp=1589223858243, value=77.75
YUM-2016-03-14 column=price:low, timestamp=1589223858243, value=76.919998
YUM-2016-03-14 column=price:open, timestamp=1589223858243, value=77.68
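The same scan can be written more concisely using the shell's filter string syntax (handled by ParseFilter):
hbase> scan 'stocks', {FILTER => "RowFilter(=, 'substring:2016-03-14')"}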