PySpark working with HBase

Install happybase using pip.

$ sudo pip3 install happybase
[sudo] password for mapr: 
Collecting happybase
  Downloading happybase-1.1.0.tar.gz (40kB)
    100% |████████████████████████████████| 40kB 114kB/s 
Requirement already satisfied: six in /usr/lib/python3.4/site-packages (from happybase)
Collecting thriftpy>=0.3.8 (from happybase)
  Downloading thriftpy-0.3.9.tar.gz (208kB)
    100% |████████████████████████████████| 215kB 131kB/s 
Collecting ply<4.0,>=3.4 (from thriftpy>=0.3.8->happybase)
  Downloading ply-3.10.tar.gz (150kB)
    100% |████████████████████████████████| 153kB 146kB/s 
Building wheels for collected packages: happybase, thriftpy, ply
  Running setup.py bdist_wheel for happybase ... done
  Stored in directory: /root/.cache/pip/wheels/60/96/09/7df5bb12d43abe4bb4b767ccff29ca252f7688fcd1b5c2b467
  Running setup.py bdist_wheel for thriftpy ... done
  Stored in directory: /root/.cache/pip/wheels/ae/3b/73/082d28b917d2886b1c0d3e0dc6f3ca3919b7e1df519580873b
  Running setup.py bdist_wheel for ply ... done
  Stored in directory: /root/.cache/pip/wheels/ad/dd/ad/8ce1991a7b380dfe23d6cc81a4de5c2775bc728b5a0a7721aa
Successfully built happybase thriftpy ply
Installing collected packages: ply, thriftpy, happybase
Successfully installed happybase-1.1.0 ply-3.10 thriftpy-0.3.9

Launch the HBase shell and create a table called stocks with a column family "info".

hbase(main):014:0> create 'stocks', 'info'
0 row(s) in 0.0120 seconds
=> Hbase::Table - /tables/stocks 
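The table could also be created from Python instead of the shell; happybase exposes `create_table` on the connection object. A sketch (the helper name `ensure_stocks_table` is mine, and it assumes happybase can reach the same cluster):

```python
def ensure_stocks_table(server, table_name):
    # Create the table with a single column family, "info",
    # only if it does not already exist.
    import happybase
    conn = happybase.Connection(server)
    # tables() returns a list of bytes under Python 3.
    if table_name.encode() not in conn.tables():
        conn.create_table(table_name, {"info": dict()})
    return conn.table(table_name)
```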

Launch pyspark and verify that you can save data into the HBase table.

import happybase
server = "localhost"
table_name = "/tables/stocks"
table = happybase.Connection(server).table(table_name) 
table.put("r1", {"info:c1": "v1"})
table.put("r1", {"info:c2": "v2"})
table.put("r1", {"info:c3": "v3"})
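The row can also be read back from pyspark before switching to the shell; a minimal sketch (the helper name `read_row` is mine, and the bytes-keyed result shape assumes happybase under Python 3):

```python
def read_row(server, table_name, row_key):
    # Open a fresh connection and fetch a single row.
    import happybase
    table = happybase.Connection(server).table(table_name)
    # Returns a dict such as {b"info:c1": b"v1", b"info:c2": b"v2", ...}
    return table.row(row_key)

# read_row("localhost", "/tables/stocks", "r1")
```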

Using the HBase shell, verify that you can see the data.

hbase(main):038:0> scan 'stocks'
ROW                                    COLUMN+CELL                                                                                                    
 r1                                    column=info:c1, timestamp=1502210951911, value=v1                                                              
 r1                                    column=info:c2, timestamp=1502210951942, value=v2                                                              
 r1                                    column=info:c3, timestamp=1502210951953, value=v3                                                              
1 row(s) in 0.0070 seconds

Now, let's truncate the table and load the stocks data from this file using Spark.

Load stocks.csv into an RDD.

stocksRaw = sc.textFile("/user/mapr/stocks")
for r in stocksRaw.take(10):
    print(r)
date,open,high,low,close,volume,adjclose,symbol
2000-07-17,95.4375,97.5,92.75,96.625,3508100.0,74.269199,XLNX
2000-07-17,22.625,22.75,22.4375,22.5625,201600.0,13.48614,ES
2000-07-17,6.750002,6.937503,6.375,6.5,1235700.0,5.241649,CHK
2000-07-17,19.812501,20.1875,19.500001,20.1875,1434100.0,3.806147,NI
2000-07-17,30.5,30.6875,30.0,30.03125,254600.0,19.81183,SNA
2000-07-17,44.749996,45.062498,44.500004,45.000009,535200.0,17.400773,FOXA
2000-07-17,19.625,19.625,19.25,19.375,309500.0,13.768835,R
2000-07-17,16.6562,16.6875,16.125,16.25,5507200.0,1.755466,ROST
2000-07-17,56.25,57.25,56.0625,56.125,7941200.0,18.31076,PG

Create a new RDD by filtering out the header.

stocks = stocksRaw.filter(lambda r: not r.startswith("date"))
for r in stocks.take(10):
    print(r)
2000-07-17,95.4375,97.5,92.75,96.625,3508100.0,74.269199,XLNX
2000-07-17,22.625,22.75,22.4375,22.5625,201600.0,13.48614,ES
2000-07-17,6.750002,6.937503,6.375,6.5,1235700.0,5.241649,CHK
2000-07-17,19.812501,20.1875,19.500001,20.1875,1434100.0,3.806147,NI
2000-07-17,30.5,30.6875,30.0,30.03125,254600.0,19.81183,SNA
2000-07-17,44.749996,45.062498,44.500004,45.000009,535200.0,17.400773,FOXA
2000-07-17,19.625,19.625,19.25,19.375,309500.0,13.768835,R
2000-07-17,16.6562,16.6875,16.125,16.25,5507200.0,1.755466,ROST
2000-07-17,56.25,57.25,56.0625,56.125,7941200.0,18.31076,PG
2000-07-17,54.000326,54.000326,52.500318,53.375325,3725000.0,71.068871,TYC
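Each line has to be turned into an HBase row key and a `family:qualifier` map before it can be written. A small helper keeps that logic in one testable place (the name `parse_stock_row` is mine, not part of the original):

```python
def parse_stock_row(line):
    """Split one CSV line into (row_key, column_map) for HBase."""
    date, open_, high, low, close, volume, adjclose, symbol = line.split(",")
    # Row key combines trade date and ticker so rows sort by date first.
    key = date + "-" + symbol
    value = {
        "info:date": date,
        "info:open": open_,
        "info:high": high,
        "info:low": low,
        "info:close": close,
        "info:volume": volume,
        "info:adjclose": adjclose,
        "info:symbol": symbol,
    }
    return key, value

key, value = parse_stock_row(
    "2000-07-17,95.4375,97.5,92.75,96.625,3508100.0,74.269199,XLNX")
# key is "2000-07-17-XLNX"
```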

Insert the stocks data using the foreachPartition function, so that the connection can be reused at the partition level rather than opening a new connection for every record.

def bulk_insert(batch):
    table = happybase.Connection(server).table(table_name)
    for r in batch:
        tokens = r.split(",")
        key = tokens[0] + "-" + tokens[7]
        value = {"info:date": tokens[0],
                 "info:open": tokens[1],
                 "info:high": tokens[2],
                 "info:low": tokens[3],
                 "info:close": tokens[4],
                 "info:volume": tokens[5],
                 "info:adjclose": tokens[6],
                 "info:symbol": tokens[7]}
        # Look at the jupyter console to see the print output
        print(key, value)
        table.put(key, value)

stocks.foreachPartition(bulk_insert)
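One put per record still means one RPC per record. happybase also offers a `Batch` object that buffers mutations and sends them to the server in chunks. A sketch of a batched variant, assuming the `server` and `table_name` variables defined earlier and that happybase is installed on every executor; `make_key` and `bulk_insert_batched` are illustrative names, and only two columns are shown for brevity:

```python
def make_key(tokens):
    # Row key combines trade date and ticker, e.g. "2000-07-17-XLNX".
    return tokens[0] + "-" + tokens[7]

def bulk_insert_batched(partition):
    # Deferred import: resolved on the executor rather than via
    # the serialized driver closure.
    import happybase
    table = happybase.Connection(server).table(table_name)
    # batch() buffers puts and flushes every batch_size mutations;
    # the context manager sends any remainder on exit.
    with table.batch(batch_size=1000) as batch:
        for r in partition:
            tokens = r.split(",")
            batch.put(make_key(tokens), {"info:symbol": tokens[7],
                                         "info:close": tokens[4]})

# stocks.foreachPartition(bulk_insert_batched)
```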

Verify the result in HBase.

hbase(main):043:0> scan 'stocks', {LIMIT => 1}

ROW                                    COLUMN+CELL
 2000-07-26-SHW                        column=info:adjclose, timestamp=1502212543473, value=15.61832
 2000-07-26-SHW                        column=info:close, timestamp=1502212543473, value=20.9375
 2000-07-26-SHW                        column=info:date, timestamp=1502212543473, value=2000-07-26
 2000-07-26-SHW                        column=info:high, timestamp=1502212543473, value=22.125
 2000-07-26-SHW                        column=info:low, timestamp=1502212543473, value=20.9375
 2000-07-26-SHW                        column=info:open, timestamp=1502212543473, value=21.875
 2000-07-26-SHW                        column=info:symbol, timestamp=1502212543473, value=SHW
 2000-07-26-SHW                        column=info:volume, timestamp=1502212543473, value=1464300.0
1 row(s) in 0.0090 seconds

Now, let's query this table using Hive.

drop table if exists stocks_hbase;

CREATE EXTERNAL TABLE stocks_hbase(
  key string,
  `date` string,
  open string,
  high string,
  low string,
  close string,
  volume string,
  adjClose string,
  symbol string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,info:date,info:open,info:high,info:low,info:close,info:volume,info:adjclose,info:symbol')
TBLPROPERTIES ('hbase.table.name' = 'stocks');

select * from stocks_hbase limit 10;