Hive and Sqoop: CDC

The following is one mechanism for generating a change data capture (CDC) report with Sqoop and Hive. Follow the steps in sequence.

To follow the steps, open three terminals:

  1. A shell terminal for HDFS and Sqoop commands. Prompts starting with $ are shell prompts.

  2. A MySQL terminal. Prompts starting with "mysql>" are MySQL prompts.

  3. A Beeline/Hive terminal. Prompts starting with "0: jdbc:hive2://localhost:10000/default>" are Beeline prompts.

Throughout this document, commands are run from one of these three terminals depending on the context.

Tools Used:

  • Sqoop

  • Hive

  • HDFS

  • MySQL (as RDBMS data source)

Log in to MySQL and create the source database and table:

$ mysql -u root -p

<enter password>

mysql>

CREATE DATABASE IF NOT EXISTS retail_db;

USE retail_db;

DROP TABLE IF EXISTS deal;

CREATE TABLE deal (
  deal_id INT NOT NULL AUTO_INCREMENT,
  name VARCHAR(100),
  amount FLOAT,
  delete_flag TINYINT(1),
  -- Stamped automatically by MySQL on every INSERT and UPDATE;
  -- Sqoop's lastmodified incremental mode keys on this column.
  LastModifiedDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (deal_id)
);

INSERT INTO deal (name, amount) VALUES
  ("Deal 1", 1000),
  ("Deal 2", 2000);

SELECT * FROM deal;

+---------+--------+--------+-------------+---------------------+
| deal_id | name   | amount | delete_flag | LastModifiedDate    |
+---------+--------+--------+-------------+---------------------+
|       1 | Deal 1 |   1000 |        NULL | 2016-05-04 00:08:51 |
|       2 | Deal 2 |   2000 |        NULL | 2016-05-04 00:08:51 |
+---------+--------+--------+-------------+---------------------+

From the shell, verify that Sqoop can reach MySQL:

$ sqoop list-databases --connect jdbc:mysql://localhost:3306 --username root --password cloudera

...

retail_db

...

$ sqoop list-tables --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera

...

Create a Hive table (deal_shell) matching the MySQL schema, dropping any previous copy first:

$ hive -e "DROP TABLE IF EXISTS default.deal_shell;"

$ sqoop create-hive-table --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table deal --hive-table deal_shell --fields-terminated-by ','
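For reference, the generated table should be roughly equivalent to the following hand-written DDL (a sketch; the actual definition is confirmed by the DESCRIBE FORMATTED output below):

CREATE TABLE deal_shell (
  deal_id INT,
  name STRING,
  amount DOUBLE,
  delete_flag BOOLEAN,
  lastmodifieddate STRING)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE;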

Delete any previously saved job and remove the old target directory, then create a saved incremental-import job. (If the job or directory does not exist yet, the cleanup commands simply report an error that can be ignored.) With --incremental lastmodified and --check-column LastModifiedDate, each execution imports only rows whose LastModifiedDate is newer than the previous run's high-water mark; --append writes each delta as a new file under the target directory.

$ sqoop job --delete import_deal

$ hdfs dfs -rm -r '/user/cloudera/data/deal'

$ sqoop job --create import_deal -- import --connect jdbc:mysql://localhost:3306/retail_db --username root -P --table deal --target-dir '/user/cloudera/data/deal' -m 1 --incremental lastmodified --check-column LastModifiedDate --append --fields-terminated-by ','

<Enter the MySQL password when prompted. On the Cloudera QuickStart VM the username is root and the password is cloudera.>
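The -P flag prompts for the password interactively on every run. For unattended executions, Sqoop can instead read the password from a protected file via --password-file; a sketch, assuming an HDFS path of /user/cloudera/.mysql.pw (echo -n avoids a trailing newline, which would otherwise become part of the password):

$ echo -n "cloudera" | hdfs dfs -put - /user/cloudera/.mysql.pw
$ hdfs dfs -chmod 400 /user/cloudera/.mysql.pw
# (delete the interactive version of the job first: sqoop job --delete import_deal)
$ sqoop job --create import_deal -- import --connect jdbc:mysql://localhost:3306/retail_db --username root --password-file /user/cloudera/.mysql.pw --table deal --target-dir '/user/cloudera/data/deal' -m 1 --incremental lastmodified --check-column LastModifiedDate --append --fields-terminated-by ','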

$ sqoop job --list

$ sqoop job --show import_deal

$ sqoop job --exec import_deal

<Enter the MySQL password when prompted, as above.>

$ hdfs dfs -ls /user/cloudera/data/deal
Found 1 items
-rw-r--r-- 1 cloudera cloudera 86 2016-05-04 00:20 /user/cloudera/data/deal/part-m-00000

$ hdfs dfs -cat /user/cloudera/data/deal/part-m-00000
1,Deal 1,1000.0,null,2016-05-04 00:08:51.0
2,Deal 2,2000.0,null,2016-05-04 00:08:51.0
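Note the literal text null in the files: by default Sqoop writes SQL NULL as the string null. For non-string Hive columns the text fails to parse and comes back as NULL anyway (as delete_flag does below), but for string columns it would survive as the literal word null. The representation can be overridden at import time; a sketch using Hive's default null sequence \N (the option values follow the Sqoop user guide's Hive-compatibility example):

$ sqoop job --create import_deal -- import --connect jdbc:mysql://localhost:3306/retail_db --username root -P --table deal --target-dir '/user/cloudera/data/deal' -m 1 --incremental lastmodified --check-column LastModifiedDate --append --fields-terminated-by ',' --null-string '\\N' --null-non-string '\\N'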

Connect to HiveServer2 using Beeline:

$ beeline
Beeline version 1.1.0-cdh5.5.0 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10000/default cloudera cloudera
scan complete in 4ms
Connecting to jdbc:hive2://localhost:10000/default
Connected to: Apache Hive (version 1.1.0-cdh5.5.0)
Driver: Hive JDBC (version 1.1.0-cdh5.5.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000/default>

Inspect the table Sqoop created (note how the MySQL types were mapped: FLOAT to double, TINYINT(1) to boolean, TIMESTAMP to string):

0: jdbc:hive2://localhost:10000/default> DESCRIBE FORMATTED deal_shell;
+-------------------------------+-------------------------------------------------------------+-------------------------------------------+--+
| col_name | data_type | comment |
+-------------------------------+-------------------------------------------------------------+-------------------------------------------+--+
| # col_name | data_type | comment |
| | NULL | NULL |
| deal_id | int | |
| name | string | |
| amount | double | |
| delete_flag | boolean | |
| lastmodifieddate | string | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | default | NULL |
| Owner: | cloudera | NULL |
| CreateTime: | Tue May 03 23:28:28 PDT 2016 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Protect Mode: | None | NULL |
| Retention: | 0 | NULL |
| Location: | hdfs://quickstart.cloudera:8020/user/hive/warehouse/deal_shell | NULL |
| Table Type: | MANAGED_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | COLUMN_STATS_ACCURATE | true |
| | comment | Imported by sqoop on 2016/05/03 23:28:23 |
| | numFiles | 1 |
| | totalSize | 86 |
| | transient_lastDdlTime | 1462343310 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL |
| InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| Storage Desc Params: | NULL | NULL |
| | field.delim | , |
| | line.delim | \n |
| | serialization.format | , |
+-------------------------------+-------------------------------------------------------------+-------------------------------------------+--+
36 rows selected (0.453 seconds)

Now create an external table over the Sqoop target directory. LIKE copies the schema (including the comma delimiter) from deal_shell, and because every incremental import appends a new file under /user/cloudera/data/deal, this table accumulates the full change history:

0: jdbc:hive2://localhost:10000/default> CREATE EXTERNAL TABLE deal_hist LIKE deal_shell LOCATION '/user/cloudera/data/deal';
No rows affected (1.642 seconds)

0: jdbc:hive2://localhost:10000/default> DESCRIBE FORMATTED deal_hist;
+-------------------------------+-------------------------------------------------------------+-----------------------+--+
| col_name | data_type | comment |
+-------------------------------+-------------------------------------------------------------+-----------------------+--+
| # col_name | data_type | comment |
| | NULL | NULL |
| deal_id | int | |
| name | string | |
| amount | double | |
| delete_flag | boolean | |
| lastmodifieddate | string | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | default | NULL |
| Owner: | cloudera | NULL |
| CreateTime: | Wed May 04 00:22:51 PDT 2016 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Protect Mode: | None | NULL |
| Retention: | 0 | NULL |
| Location: | hdfs://quickstart.cloudera:8020/user/cloudera/data/deal | NULL |
| Table Type: | EXTERNAL_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | COLUMN_STATS_ACCURATE | false |
| | EXTERNAL | TRUE |
| | numFiles | 0 |
| | numRows | -1 |
| | rawDataSize | -1 |
| | totalSize | 0 |
| | transient_lastDdlTime | 1462346571 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL |
| InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| Storage Desc Params: | NULL | NULL |
| | field.delim | , |
| | line.delim | \n |
| | serialization.format | , |
+-------------------------------+-------------------------------------------------------------+-----------------------+--+

0: jdbc:hive2://localhost:10000/default> SELECT * FROM deal_hist;
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| deal_hist.deal_id  | deal_hist.name  | deal_hist.amount  | deal_hist.delete_flag  | deal_hist.lastmodifieddate  |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| 1                  | Deal 1          | 1000.0            | NULL                   | 2016-05-04 00:08:51.0       |
| 2                  | Deal 2          | 2000.0            | NULL                   | 2016-05-04 00:08:51.0       |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+

Now let's make three changes to the deal table in MySQL: insert two new rows and update one existing row.

mysql> INSERT INTO deal (name, amount) VALUES
  ("Deal 3", 300),
  ("Deal 4", 400);

mysql> UPDATE deal SET amount = 1010 WHERE deal_id = 1;

mysql> SELECT * FROM deal;
+---------+--------+--------+-------------+---------------------+
| deal_id | name   | amount | delete_flag | LastModifiedDate    |
+---------+--------+--------+-------------+---------------------+
|       1 | Deal 1 |   1010 |        NULL | 2016-05-04 00:24:43 |
|       2 | Deal 2 |   2000 |        NULL | 2016-05-04 00:08:51 |
|       3 | Deal 3 |    300 |        NULL | 2016-05-04 00:24:37 |
|       4 | Deal 4 |    400 |        NULL | 2016-05-04 00:24:37 |
+---------+--------+--------+-------------+---------------------+

Run the saved job again to pick up the delta:

$ sqoop job --exec import_deal

<Enter the MySQL password when prompted.>

$ hdfs dfs -ls /user/cloudera/data/deal
Found 2 items
-rw-r--r-- 1 cloudera cloudera 86 2016-05-04 00:20 /user/cloudera/data/deal/part-m-00000
-rw-r--r-- 1 cloudera cloudera 127 2016-05-04 00:26 /user/cloudera/data/deal/part-m-00001

$ hdfs dfs -cat /user/cloudera/data/deal/part-m-00000
1,Deal 1,1000.0,null,2016-05-04 00:08:51.0
2,Deal 2,2000.0,null,2016-05-04 00:08:51.0

$ hdfs dfs -cat /user/cloudera/data/deal/part-m-00001
1,Deal 1,1010.0,null,2016-05-04 00:24:43.0
3,Deal 3,300.0,null,2016-05-04 00:24:37.0
4,Deal 4,400.0,null,2016-05-04 00:24:37.0

Because deal_hist sits on top of both files, it now exposes two versions of deal_id 1 (the original row and the updated one):

0: jdbc:hive2://localhost:10000/default> SELECT * FROM deal_hist;
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| deal_hist.deal_id  | deal_hist.name  | deal_hist.amount  | deal_hist.delete_flag  | deal_hist.lastmodifieddate  |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| 1                  | Deal 1          | 1000.0            | NULL                   | 2016-05-04 00:08:51.0       |
| 2                  | Deal 2          | 2000.0            | NULL                   | 2016-05-04 00:08:51.0       |
| 1                  | Deal 1          | 1010.0            | NULL                   | 2016-05-04 00:24:43.0       |
| 3                  | Deal 3          | 300.0             | NULL                   | 2016-05-04 00:24:37.0       |
| 4                  | Deal 4          | 400.0             | NULL                   | 2016-05-04 00:24:37.0       |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+

To reconstruct the current state, create a view that keeps, for each deal_id, only the row with the latest lastmodifieddate. (The column is stored as a string, but this timestamp format sorts correctly as text, so MAX picks the newest version.)

0: jdbc:hive2://localhost:10000/default> CREATE VIEW IF NOT EXISTS deal AS
SELECT t1.* FROM deal_hist t1 JOIN
  (SELECT deal_id, MAX(lastmodifieddate) AS lmd FROM deal_hist GROUP BY deal_id) t2
  ON t1.deal_id = t2.deal_id AND t1.lastmodifieddate = t2.lmd;

0: jdbc:hive2://localhost:10000/default> SELECT * FROM deal;
+---------------+------------+--------------+-------------------+------------------------+--+
| deal.deal_id  | deal.name  | deal.amount  | deal.delete_flag  | deal.lastmodifieddate  |
+---------------+------------+--------------+-------------------+------------------------+--+
| 1             | Deal 1     | 1010.0       | NULL              | 2016-05-04 02:32:58.0  |
| 2             | Deal 2     | 2000.0       | NULL              | 2016-05-04 02:22:03.0  |
| 3             | Deal 3     | 300.0        | NULL              | 2016-05-04 02:32:50.0  |
| 4             | Deal 4     | 400.0        | NULL              | 2016-05-04 02:32:50.0  |
+---------------+------------+--------------+-------------------+------------------------+--+

Note: the deal view should now be a true reflection of the deal table in MySQL. (The timestamps shown from here on differ from the earlier MySQL capture; the remaining outputs appear to have been taken from a later re-run of the same steps.)
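As an aside, on Hive 0.11 and later the same latest-row-per-key logic can be written with a window function instead of a self-join; a minimal sketch (the view name deal_v2 is hypothetical, not part of the original walkthrough):

0: jdbc:hive2://localhost:10000/default> CREATE VIEW IF NOT EXISTS deal_v2 AS
SELECT deal_id, name, amount, delete_flag, lastmodifieddate
FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (PARTITION BY deal_id ORDER BY lastmodifieddate DESC) AS rn
  FROM deal_hist h
) ranked
WHERE rn = 1;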

In Beeline, run the following statements. The deal_old view reconstructs the table as it looked at old_timestamp, i.e. at the time of the previous report. (Because the quotes are part of the variable's value here, the ${hiveconf:old_timestamp} reference is deliberately left unquoted in the view definition.)

0: jdbc:hive2://localhost:10000/default> set hive.variable.substitute=true;
0: jdbc:hive2://localhost:10000/default> set old_timestamp='2016-05-04 02:22:03.0';
0: jdbc:hive2://localhost:10000/default> DROP VIEW IF EXISTS deal_old;
0: jdbc:hive2://localhost:10000/default> CREATE VIEW deal_old AS
SELECT t1.* FROM deal_hist t1 JOIN
  (SELECT deal_id, MAX(lastmodifieddate) AS lmd FROM deal_hist
   WHERE lastmodifieddate <= ${hiveconf:old_timestamp} GROUP BY deal_id) t2
  ON t1.deal_id = t2.deal_id AND t1.lastmodifieddate = t2.lmd;
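As a sanity check, the as-of snapshot can be queried directly; given the history shown at the end of this document, it should return deal_id 1 and 2 with their original amounts (output omitted here):

0: jdbc:hive2://localhost:10000/default> SELECT * FROM deal_old;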

The CDC report full-outer-joins the old snapshot (deal_old) against the current state (the deal view) on deal_id and classifies each key. A '|' separator is placed between the concatenated comparison fields so that, for example, name "Deal" with amount 11 does not compare equal to name "Deal1" with amount 1.

0: jdbc:hive2://localhost:10000/default> CREATE VIEW IF NOT EXISTS cdc_report AS
SELECT
  CASE
    WHEN v1.deal_id IS NOT NULL THEN v1.deal_id
    WHEN v2.deal_id IS NOT NULL THEN v2.deal_id
    ELSE 'Error'
  END AS deal_id,
  CASE
    WHEN v1.deal_id = v2.deal_id AND concat(v1.name, '|', v1.amount) = concat(v2.name, '|', v2.amount) THEN 'NoChange'
    WHEN v1.deal_id = v2.deal_id AND concat(v1.name, '|', v1.amount) <> concat(v2.name, '|', v2.amount) THEN 'Update'
    WHEN v1.deal_id IS NULL THEN 'New'
    WHEN v2.deal_id IS NULL THEN 'Deleted'
    ELSE 'Error'
  END AS cdc_codes
FROM deal_old AS v1 FULL OUTER JOIN deal AS v2
  ON v1.deal_id = v2.deal_id;

0: jdbc:hive2://localhost:10000/default> SELECT * FROM cdc_report;
+---------------------+-----------------------+--+
| cdc_report.deal_id  | cdc_report.cdc_codes  |
+---------------------+-----------------------+--+
| 1                   | Update                |
| 2                   | NoChange              |
| 3                   | New                   |
| 4                   | New                   |
+---------------------+-----------------------+--+
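A note on deletes: because the lastmodified import only ever appends new and changed rows, a hard DELETE in MySQL never reaches deal_hist, so the 'Deleted' code above cannot fire for physically removed rows. This is what the delete_flag column is for: delete by flagging, so that LastModifiedDate changes and the next import picks the row up. A sketch of the convention (running it will alter the walkthrough's subsequent outputs):

mysql> UPDATE deal SET delete_flag = 1 WHERE deal_id = 2;

Downstream, the report logic can then treat rows whose delete_flag is true as deletions; handling the flag in cdc_report is left as an extension.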

In steady state (that is, under normal operating conditions), each reporting cycle consists of just two commands: execute the saved import job to pull the latest delta, then regenerate the report as of the previous run's timestamp.

$ sqoop job --exec import_deal

$ hive -f generate_cdc_report.hql --hiveconf old_timestamp="2016-05-04 02:32:50"

Sample output:

1 Update
2 NoChange
3 NoChange
4 NoChange
5 New
6 New
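The generate_cdc_report.hql script itself is not reproduced here; a sketch of what it could contain, mirroring the Beeline session above (the exact contents are an assumption). Note that on the command line the timestamp value arrives without quotes, so it is quoted at the point of use:

-- generate_cdc_report.hql (hypothetical reconstruction)
-- Assumes the deal_hist table and the deal view already exist.
-- Invoke as: hive -f generate_cdc_report.hql --hiveconf old_timestamp="2016-05-04 02:32:50"

DROP VIEW IF EXISTS deal_old;
CREATE VIEW deal_old AS
SELECT t1.* FROM deal_hist t1 JOIN
  (SELECT deal_id, MAX(lastmodifieddate) AS lmd FROM deal_hist
   WHERE lastmodifieddate <= '${hiveconf:old_timestamp}' GROUP BY deal_id) t2
  ON t1.deal_id = t2.deal_id AND t1.lastmodifieddate = t2.lmd;

DROP VIEW IF EXISTS cdc_report;
CREATE VIEW cdc_report AS
SELECT
  CASE
    WHEN v1.deal_id IS NOT NULL THEN v1.deal_id
    WHEN v2.deal_id IS NOT NULL THEN v2.deal_id
    ELSE 'Error'
  END AS deal_id,
  CASE
    WHEN v1.deal_id = v2.deal_id AND concat(v1.name, '|', v1.amount) = concat(v2.name, '|', v2.amount) THEN 'NoChange'
    WHEN v1.deal_id = v2.deal_id AND concat(v1.name, '|', v1.amount) <> concat(v2.name, '|', v2.amount) THEN 'Update'
    WHEN v1.deal_id IS NULL THEN 'New'
    WHEN v2.deal_id IS NULL THEN 'Deleted'
    ELSE 'Error'
  END AS cdc_codes
FROM deal_old AS v1 FULL OUTER JOIN deal AS v2
  ON v1.deal_id = v2.deal_id;

SELECT * FROM cdc_report;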

Underlying Historical Records:

+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| deal_hist.deal_id  | deal_hist.name  | deal_hist.amount  | deal_hist.delete_flag  | deal_hist.lastmodifieddate  |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| 1                  | Deal 1          | 1000.0            | NULL                   | 2016-05-04 02:22:03.0       |
| 2                  | Deal 2          | 2000.0            | NULL                   | 2016-05-04 02:22:03.0       |
| 1                  | Deal 1          | 1010.0            | NULL                   | 2016-05-04 02:32:58.0       |
| 3                  | Deal 3          | 300.0             | NULL                   | 2016-05-04 02:32:50.0       |
| 4                  | Deal 4          | 400.0             | NULL                   | 2016-05-04 02:32:50.0       |
| 1                  | Deal 1          | 1020.0            | NULL                   | 2016-05-04 03:11:31.0       |
| 5                  | Deal 5          | 400.0             | NULL                   | 2016-05-04 03:11:16.0       |
| 6                  | Deal 6          | 600.0             | NULL                   | 2016-05-04 03:11:16.0       |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
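To automate the steady-state cycle, the two commands can be wrapped in a script and scheduled (e.g. via cron); a minimal sketch, in which the script name, the state file, and the non-interactive password setup (such as the --password-file variant shown earlier) are all assumptions:

#!/bin/bash
# refresh_cdc.sh (hypothetical): one steady-state CDC cycle.
set -e
STATE_FILE=/home/cloudera/last_report_ts
# Timestamp recorded by the previous cycle.
OLD_TS=$(cat "$STATE_FILE")
# Capture the start of this cycle before importing, so nothing is missed.
NOW=$(date +"%Y-%m-%d %H:%M:%S")
# Pull the latest delta into /user/cloudera/data/deal.
sqoop job --exec import_deal
# Report changes since the previous cycle.
hive -f generate_cdc_report.hql --hiveconf old_timestamp="$OLD_TS"
# Persist this cycle's start time for the next run.
echo "$NOW" > "$STATE_FILE"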