PySpark working with HBase

Install happybase using pip.

$ sudo pip3 install happybase
[sudo] password for mapr: 
Collecting happybase
  Downloading happybase-1.1.0.tar.gz (40kB)
    100% |████████████████████████████████| 40kB 114kB/s 
Requirement already satisfied: six in /usr/lib/python3.4/site-packages (from happybase)
Collecting thriftpy>=0.3.8 (from happybase)
  Downloading thriftpy-0.3.9.tar.gz (208kB)
    100% |████████████████████████████████| 215kB 131kB/s 
Collecting ply<4.0,>=3.4 (from thriftpy>=0.3.8->happybase)
  Downloading ply-3.10.tar.gz (150kB)
    100% |████████████████████████████████| 153kB 146kB/s 
Building wheels for collected packages: happybase, thriftpy, ply
  Running setup.py bdist_wheel for happybase ... done
  Stored in directory: /root/.cache/pip/wheels/60/96/09/7df5bb12d43abe4bb4b767ccff29ca252f7688fcd1b5c2b467
  Running setup.py bdist_wheel for thriftpy ... done
  Stored in directory: /root/.cache/pip/wheels/ae/3b/73/082d28b917d2886b1c0d3e0dc6f3ca3919b7e1df519580873b
  Running setup.py bdist_wheel for ply ... done
  Stored in directory: /root/.cache/pip/wheels/ad/dd/ad/8ce1991a7b380dfe23d6cc81a4de5c2775bc728b5a0a7721aa
Successfully built happybase thriftpy ply
Installing collected packages: ply, thriftpy, happybase
Successfully installed happybase-1.1.0 ply-3.10 thriftpy-0.3.9

Launch the HBase shell and create a table called stocks with a column family "info".

hbase(main):014:0> create 'stocks', 'info'
0 row(s) in 0.0120 seconds
=> Hbase::Table - /tables/stocks 
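The table could also be created from Python instead of the shell; happybase exposes `create_table` on the connection object. A sketch (the helper name `ensure_stocks_table` is mine, and it assumes happybase can reach the same cluster):

```python
def ensure_stocks_table(server, table_name):
    # Create the table with a single column family, "info",
    # only if it does not already exist.
    import happybase
    conn = happybase.Connection(server)
    # tables() returns a list of bytes under Python 3.
    if table_name.encode() not in conn.tables():
        conn.create_table(table_name, {"info": dict()})
    return conn.table(table_name)
```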

Launch pyspark and verify that you can save data into the HBase table.

import happybase
server = "localhost"
table_name = "/tables/stocks"
table = happybase.Connection(server).table(table_name) 
table.put("r1", {"info:c1": "v1"})
table.put("r1", {"info:c2": "v2"})
table.put("r1", {"info:c3": "v3"})
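The row can also be read back from pyspark before switching to the shell; a minimal sketch (the helper name `read_row` is mine, and the bytes-keyed result shape assumes happybase under Python 3):

```python
def read_row(server, table_name, row_key):
    # Open a fresh connection and fetch a single row.
    import happybase
    table = happybase.Connection(server).table(table_name)
    # Returns a dict such as {b"info:c1": b"v1", b"info:c2": b"v2", ...}
    return table.row(row_key)

# read_row("localhost", "/tables/stocks", "r1")
```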

Using the HBase shell, verify that you can see the data.

hbase(main):038:0> scan 'stocks'
ROW                                    COLUMN+CELL                                                                                                    
 r1                                    column=info:c1, timestamp=1502210951911, value=v1                                                              
 r1                                    column=info:c2, timestamp=1502210951942, value=v2                                                              
 r1                                    column=info:c3, timestamp=1502210951953, value=v3                                                              
1 row(s) in 0.0070 seconds

Now, let's truncate the table and load the stocks data from this file using Spark.

Load stocks.csv into an RDD.

stocksRaw = sc.textFile("/user/mapr/stocks")
for r in stocksRaw.take(10):
    print(r)
date,open,high,low,close,volume,adjclose,symbol
2000-07-17,95.4375,97.5,92.75,96.625,3508100.0,74.269199,XLNX
2000-07-17,22.625,22.75,22.4375,22.5625,201600.0,13.48614,ES
2000-07-17,6.750002,6.937503,6.375,6.5,1235700.0,5.241649,CHK
2000-07-17,19.812501,20.1875,19.500001,20.1875,1434100.0,3.806147,NI
2000-07-17,30.5,30.6875,30.0,30.03125,254600.0,19.81183,SNA
2000-07-17,44.749996,45.062498,44.500004,45.000009,535200.0,17.400773,FOXA
2000-07-17,19.625,19.625,19.25,19.375,309500.0,13.768835,R
2000-07-17,16.6562,16.6875,16.125,16.25,5507200.0,1.755466,ROST
2000-07-17,56.25,57.25,56.0625,56.125,7941200.0,18.31076,PG

Create a new RDD by filtering out the header.

stocks = stocksRaw.filter(lambda r: not r.startswith("date"))
for r in stocks.take(10):
    print(r)
2000-07-17,95.4375,97.5,92.75,96.625,3508100.0,74.269199,XLNX
2000-07-17,22.625,22.75,22.4375,22.5625,201600.0,13.48614,ES
2000-07-17,6.750002,6.937503,6.375,6.5,1235700.0,5.241649,CHK
2000-07-17,19.812501,20.1875,19.500001,20.1875,1434100.0,3.806147,NI
2000-07-17,30.5,30.6875,30.0,30.03125,254600.0,19.81183,SNA
2000-07-17,44.749996,45.062498,44.500004,45.000009,535200.0,17.400773,FOXA
2000-07-17,19.625,19.625,19.25,19.375,309500.0,13.768835,R
2000-07-17,16.6562,16.6875,16.125,16.25,5507200.0,1.755466,ROST
2000-07-17,56.25,57.25,56.0625,56.125,7941200.0,18.31076,PG
2000-07-17,54.000326,54.000326,52.500318,53.375325,3725000.0,71.068871,TYC
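Each line has to be turned into an HBase row key and a `family:qualifier` map before it can be written. A small helper keeps that logic in one testable place (the name `parse_stock_row` is mine, not part of the original):

```python
def parse_stock_row(line):
    """Split one CSV line into (row_key, column_map) for HBase."""
    date, open_, high, low, close, volume, adjclose, symbol = line.split(",")
    # Row key combines trade date and ticker so rows sort by date first.
    key = date + "-" + symbol
    value = {
        "info:date": date,
        "info:open": open_,
        "info:high": high,
        "info:low": low,
        "info:close": close,
        "info:volume": volume,
        "info:adjclose": adjclose,
        "info:symbol": symbol,
    }
    return key, value

key, value = parse_stock_row(
    "2000-07-17,95.4375,97.5,92.75,96.625,3508100.0,74.269199,XLNX")
# key is "2000-07-17-XLNX"
```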

Insert the stocks data using the foreachPartition function, so that the connection can be reused at the partition level rather than opening a new connection for every record.

def bulk_insert(batch):
    table = happybase.Connection(server).table(table_name)
    for r in batch:
        tokens = r.split(",")
        key = tokens[0] + "-" + tokens[7]
        value = {"info:date": tokens[0],
                 "info:open": tokens[1],
                 "info:high": tokens[2],
                 "info:low": tokens[3],
                 "info:close": tokens[4],
                 "info:volume": tokens[5],
                 "info:adjclose": tokens[6],
                 "info:symbol": tokens[7]}
        # Look at the jupyter console to see the print output
        print(key, value)
        table.put(key, value)

stocks.foreachPartition(bulk_insert)
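One put per record still means one RPC per record. happybase also offers a `Batch` object that buffers mutations and sends them to the server in chunks. A sketch of a batched variant, assuming the `server` and `table_name` variables defined earlier and that happybase is installed on every executor; `make_key` and `bulk_insert_batched` are illustrative names, and only two columns are shown for brevity:

```python
def make_key(tokens):
    # Row key combines trade date and ticker, e.g. "2000-07-17-XLNX".
    return tokens[0] + "-" + tokens[7]

def bulk_insert_batched(partition):
    # Deferred import: resolved on the executor rather than via
    # the serialized driver closure.
    import happybase
    table = happybase.Connection(server).table(table_name)
    # batch() buffers puts and flushes every batch_size mutations;
    # the context manager sends any remainder on exit.
    with table.batch(batch_size=1000) as batch:
        for r in partition:
            tokens = r.split(",")
            batch.put(make_key(tokens), {"info:symbol": tokens[7],
                                         "info:close": tokens[4]})

# stocks.foreachPartition(bulk_insert_batched)
```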

Verify the result in HBase.

hbase(main):043:0> scan 'stocks', {LIMIT => 1}

ROW                                    COLUMN+CELL
 2000-07-26-SHW                        column=info:adjclose, timestamp=1502212543473, value=15.61832
 2000-07-26-SHW                        column=info:close, timestamp=1502212543473, value=20.9375
 2000-07-26-SHW                        column=info:date, timestamp=1502212543473, value=2000-07-26
 2000-07-26-SHW                        column=info:high, timestamp=1502212543473, value=22.125
 2000-07-26-SHW                        column=info:low, timestamp=1502212543473, value=20.9375
 2000-07-26-SHW                        column=info:open, timestamp=1502212543473, value=21.875
 2000-07-26-SHW                        column=info:symbol, timestamp=1502212543473, value=SHW
 2000-07-26-SHW                        column=info:volume, timestamp=1502212543473, value=1464300.0
1 row(s) in 0.0090 seconds

Now, let's query this table using Hive.

drop table if exists stocks_hbase;

CREATE EXTERNAL TABLE stocks_hbase(
  key string,
  `date` string,
  open string,
  high string,
  low string,
  close string,
  volume string,
  adjClose string,
  symbol string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,info:date,info:open,info:high,info:low,info:close,info:volume,info:adjclose,info:symbol')
TBLPROPERTIES ('hbase.table.name' = 'stocks');

select * from stocks_hbase limit 10;