Hive and Sqoop: CDC
The following is a mechanism for generating a CDC (change data capture) report. Follow the steps in sequence.
To follow along, open three terminals:
A shell terminal for running HDFS and Sqoop commands. Prompts starting with $ are shell prompts.
A MySQL terminal. Prompts starting with "mysql>" are MySQL prompts.
A Beeline/Hive terminal. Prompts starting with "0: jdbc:hive2://localhost:10000/default>" are Beeline prompts.
Throughout the document, commands are run from one of these three terminals depending on the context.
Tools Used:
Sqoop
Hive
HDFS
MySQL (as RDBMS data source)
First, create the source table in MySQL. The LastModifiedDate column is updated automatically on every insert and update, so it can serve as the check column for incremental imports. The delete_flag column is there so that deletes can be recorded as updates (soft deletes), since an append-only import cannot observe hard deletes.
$ mysql -u root -p
<enter password>
mysql>
CREATE DATABASE IF NOT EXISTS retail_db;
USE retail_db;
DROP TABLE IF EXISTS deal;
CREATE TABLE deal(
deal_id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(100),
amount FLOAT,
delete_flag TINYINT(1),
LastModifiedDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (deal_id)
);
INSERT INTO deal (name, amount) VALUES
("Deal 1", 1000),
("Deal 2", 2000);
SELECT * FROM deal;
+---------+--------+--------+-------------+---------------------+
| deal_id | name | amount | delete_flag | LastModifiedDate |
+---------+--------+--------+-------------+---------------------+
| 1 | Deal 1 | 1000 | NULL | 2016-05-04 00:08:51 |
| 2 | Deal 2 | 2000 | NULL | 2016-05-04 00:08:51 |
+---------+--------+--------+-------------+---------------------+
$ sqoop list-databases --connect jdbc:mysql://localhost:3306 --username root --password cloudera
...
retail_db
...
$ sqoop list-tables --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera
...
$ hive -e "DROP TABLE IF EXISTS default.deal_shell;"
$ sqoop create-hive-table --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table deal --hive-table deal_shell --fields-terminated-by ','
$ sqoop job --delete import_deal
$ hdfs dfs -rm -r '/user/cloudera/data/deal'
(These two commands clean up any previous run; if the job or directory does not exist yet, the resulting error can be safely ignored.)
$ sqoop job --create import_deal -- import --connect jdbc:mysql://localhost:3306/retail_db --username root -P --table deal --target-dir '/user/cloudera/data/deal' -m 1 --incremental lastmodified --check-column LastModifiedDate --append --fields-terminated-by ','
<Enter the password for the MySQL user account. On the Cloudera quickstart VM, username: root, password: cloudera>
$ sqoop job --list
$ sqoop job --show import_deal
$ sqoop job --exec import_deal
<Enter the password for the MySQL user account. On the Cloudera quickstart VM, username: root, password: cloudera>
$ hdfs dfs -ls /user/cloudera/data/deal
Found 1 items
-rw-r--r-- 1 cloudera cloudera 86 2016-05-04 00:20 /user/cloudera/data/deal/part-m-00000
$ hdfs dfs -cat /user/cloudera/data/deal/part-m-00000
1,Deal 1,1000.0,null,2016-05-04 00:08:51.0
2,Deal 2,2000.0,null,2016-05-04 00:08:51.0
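The incremental-import behavior above can be sketched in Python. This is a simulation of the idea only, not Sqoop's actual code: Sqoop remembers the check-column value from the previous run and, on the next `--exec`, pulls only rows whose LastModifiedDate is newer, writing them out as a new part file. The data mirrors the transcript above.

```python
# Simulation (not Sqoop itself) of `--incremental lastmodified`:
# remember the check-column value of the previous run, then import
# only rows whose LastModifiedDate is newer.
rows = [
    (1, "Deal 1", 1000.0, "2016-05-04 00:08:51"),
    (2, "Deal 2", 2000.0, "2016-05-04 00:08:51"),
]

def incremental_import(table, last_value):
    # Timestamps share one format, so string comparison orders correctly.
    return [r for r in table if r[3] > last_value]

first = incremental_import(rows, "")                     # first run: everything
rows[0] = (1, "Deal 1", 1010.0, "2016-05-04 00:24:43")   # an update
rows += [(3, "Deal 3", 300.0, "2016-05-04 00:24:37"),    # two inserts
         (4, "Deal 4", 400.0, "2016-05-04 00:24:37")]
second = incremental_import(rows, "2016-05-04 00:08:51") # second run
print(len(first), len(second))  # → 2 3
```

The second run picks up only the updated row for deal 1 and the two new rows, which is exactly what part-m-00001 contains later in this walkthrough.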
Connect to the Hive server using Beeline:
$ beeline
Beeline version 1.1.0-cdh5.5.0 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10000/default cloudera cloudera
scan complete in 4ms
Connecting to jdbc:hive2://localhost:10000/default
Connected to: Apache Hive (version 1.1.0-cdh5.5.0)
Driver: Hive JDBC (version 1.1.0-cdh5.5.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000/default>
0: jdbc:hive2://localhost:10000/default> DESCRIBE FORMATTED deal_shell;
+-------------------------------+-------------------------------------------------------------+-------------------------------------------+--+
| col_name | data_type | comment |
+-------------------------------+-------------------------------------------------------------+-------------------------------------------+--+
| # col_name | data_type | comment |
| | NULL | NULL |
| deal_id | int | |
| name | string | |
| amount | double | |
| delete_flag | boolean | |
| lastmodifieddate | string | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | default | NULL |
| Owner: | cloudera | NULL |
| CreateTime: | Tue May 03 23:28:28 PDT 2016 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Protect Mode: | None | NULL |
| Retention: | 0 | NULL |
| Location: | hdfs://quickstart.cloudera:8020/user/hive/warehouse/deal_shell | NULL |
| Table Type: | MANAGED_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | COLUMN_STATS_ACCURATE | true |
| | comment | Imported by sqoop on 2016/05/03 23:28:23 |
| | numFiles | 1 |
| | totalSize | 86 |
| | transient_lastDdlTime | 1462343310 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL |
| InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| Storage Desc Params: | NULL | NULL |
| | field.delim | , |
| | line.delim | \n |
| | serialization.format | , |
+-------------------------------+-------------------------------------------------------------+-------------------------------------------+--+
36 rows selected (0.453 seconds)
0: jdbc:hive2://localhost:10000/default> CREATE EXTERNAL TABLE deal_hist LIKE deal_shell LOCATION '/user/cloudera/data/deal';
No rows affected (1.642 seconds)
0: jdbc:hive2://localhost:10000/default> DESCRIBE FORMATTED deal_hist;
+-------------------------------+-------------------------------------------------------------+-----------------------+--+
| col_name | data_type | comment |
+-------------------------------+-------------------------------------------------------------+-----------------------+--+
| # col_name | data_type | comment |
| | NULL | NULL |
| deal_id | int | |
| name | string | |
| amount | double | |
| delete_flag | boolean | |
| lastmodifieddate | string | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | default | NULL |
| Owner: | cloudera | NULL |
| CreateTime: | Wed May 04 00:22:51 PDT 2016 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Protect Mode: | None | NULL |
| Retention: | 0 | NULL |
| Location: | hdfs://quickstart.cloudera:8020/user/cloudera/data/deal | NULL |
| Table Type: | EXTERNAL_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | COLUMN_STATS_ACCURATE | false |
| | EXTERNAL | TRUE |
| | numFiles | 0 |
| | numRows | -1 |
| | rawDataSize | -1 |
| | totalSize | 0 |
| | transient_lastDdlTime | 1462346571 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL |
| InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| Storage Desc Params: | NULL | NULL |
| | field.delim | , |
| | line.delim | \n |
| | serialization.format | , |
+-------------------------------+-------------------------------------------------------------+-----------------------+--+
0: jdbc:hive2://localhost:10000/default> SELECT * FROM deal_hist;
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| deal_hist.deal_id | deal_hist.name | deal_hist.amount | deal_hist.delete_flag | deal_hist.lastmodifieddate |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| 1 | Deal 1 | 1000.0 | NULL | 2016-05-04 00:08:51.0 |
| 2 | Deal 2 | 2000.0 | NULL | 2016-05-04 00:08:51.0 |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
Now let's make three changes to the deal table in MySQL: two inserts and one update.
mysql>
INSERT INTO deal (name, amount) VALUES
("Deal 3", 300),
("Deal 4", 400);
mysql> UPDATE deal SET amount = 1010 WHERE deal_id = 1;
mysql> SELECT * FROM deal;
+---------+--------+--------+-------------+---------------------+
| deal_id | name | amount | delete_flag | LastModifiedDate |
+---------+--------+--------+-------------+---------------------+
| 1 | Deal 1 | 1010 | NULL | 2016-05-04 00:24:43 |
| 2 | Deal 2 | 2000 | NULL | 2016-05-04 00:08:51 |
| 3 | Deal 3 | 300 | NULL | 2016-05-04 00:24:37 |
| 4 | Deal 4 | 400 | NULL | 2016-05-04 00:24:37 |
+---------+--------+--------+-------------+---------------------+
$ sqoop job --exec import_deal
<Enter mysql password when prompted>
$ hdfs dfs -ls /user/cloudera/data/deal
Found 2 items
-rw-r--r-- 1 cloudera cloudera 86 2016-05-04 00:20 /user/cloudera/data/deal/part-m-00000
-rw-r--r-- 1 cloudera cloudera 127 2016-05-04 00:26 /user/cloudera/data/deal/part-m-00001
$ hdfs dfs -cat /user/cloudera/data/deal/part-m-00000
1,Deal 1,1000.0,null,2016-05-04 00:08:51.0
2,Deal 2,2000.0,null,2016-05-04 00:08:51.0
$ hdfs dfs -cat /user/cloudera/data/deal/part-m-00001
1,Deal 1,1010.0,null,2016-05-04 00:24:43.0
3,Deal 3,300.0,null,2016-05-04 00:24:37.0
4,Deal 4,400.0,null,2016-05-04 00:24:37.0
0: jdbc:hive2://localhost:10000/default> SELECT * FROM deal_hist;
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| deal_hist.deal_id | deal_hist.name | deal_hist.amount | deal_hist.delete_flag | deal_hist.lastmodifieddate |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| 1 | Deal 1 | 1000.0 | NULL | 2016-05-04 00:08:51.0 |
| 2 | Deal 2 | 2000.0 | NULL | 2016-05-04 00:08:51.0 |
| 1 | Deal 1 | 1010.0 | NULL | 2016-05-04 00:24:43.0 |
| 3 | Deal 3 | 300.0 | NULL | 2016-05-04 00:24:37.0 |
| 4 | Deal 4 | 400.0 | NULL | 2016-05-04 00:24:37.0 |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
0: jdbc:hive2://localhost:10000/default>
CREATE VIEW IF NOT EXISTS deal AS SELECT t1.* FROM deal_hist t1 JOIN
(SELECT deal_id, MAX(lastmodifieddate) AS lmd FROM deal_hist GROUP BY deal_id) t2
ON t1.deal_id = t2.deal_id AND t1.lastmodifieddate = t2.lmd;
0: jdbc:hive2://localhost:10000/default> SELECT * FROM deal;
+---------------+------------+--------------+-------------------+------------------------+--+
| deal.deal_id | deal.name | deal.amount | deal.delete_flag | deal.lastmodifieddate |
+---------------+------------+--------------+-------------------+------------------------+--+
| 1 | Deal 1 | 1010.0 | NULL | 2016-05-04 02:32:58.0 |
| 2 | Deal 2 | 2000.0 | NULL | 2016-05-04 02:22:03.0 |
| 3 | Deal 3 | 300.0 | NULL | 2016-05-04 02:32:50.0 |
| 4 | Deal 4 | 400.0 | NULL | 2016-05-04 02:32:50.0 |
+---------------+------------+--------------+-------------------+------------------------+--+
Note: this deal view should be a true reflection of the "deal" table in MySQL, as of the most recent import. Hard deletes in MySQL do not propagate through an append-only import; to capture deletes, mark rows via delete_flag instead of deleting them.
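The latest-record selection that the deal view performs can be sketched in Python (again a simulation, using the history from the transcript): group the append-only deal_hist rows by deal_id and keep the row with the greatest lastmodifieddate.

```python
# Simulation of the `deal` view: from the append-only history, keep only
# the row with the greatest lastmodifieddate for each deal_id.
deal_hist = [
    (1, "Deal 1", 1000.0, "2016-05-04 00:08:51"),
    (2, "Deal 2", 2000.0, "2016-05-04 00:08:51"),
    (1, "Deal 1", 1010.0, "2016-05-04 00:24:43"),
    (3, "Deal 3", 300.0, "2016-05-04 00:24:37"),
    (4, "Deal 4", 400.0, "2016-05-04 00:24:37"),
]

def latest_per_id(hist):
    best = {}
    for row in hist:
        deal_id, lmd = row[0], row[3]
        if deal_id not in best or lmd > best[deal_id][3]:
            best[deal_id] = row
    return [best[k] for k in sorted(best)]

for row in latest_per_id(deal_hist):
    print(row)  # deal 1 appears once, with its latest amount 1010.0
```

This is the same result the JOIN against MAX(lastmodifieddate) produces: five history rows collapse to four current rows.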
In Beeline, run the following statements:
0: jdbc:hive2://localhost:10000/default>
set hive.variable.substitute=true;
set old_timestamp='2016-05-04 02:22:03.0';
DROP VIEW IF EXISTS deal_old;
CREATE VIEW deal_old AS SELECT t1.* FROM deal_hist t1 JOIN
(SELECT deal_id, MAX(lastmodifieddate) AS lmd FROM deal_hist WHERE lastmodifieddate <= ${hiveconf:old_timestamp} GROUP BY deal_id) t2
ON t1.deal_id = t2.deal_id AND t1.lastmodifieddate = t2.lmd;
CREATE VIEW IF NOT EXISTS cdc_report AS SELECT
CASE
WHEN v1.deal_id IS NOT NULL THEN v1.deal_id
WHEN v2.deal_id IS NOT NULL THEN v2.deal_id
ELSE 'Error'
END AS deal_id,
CASE
WHEN v1.deal_id = v2.deal_id AND concat(v1.name, v1.amount) = concat(v2.name, v2.amount) THEN 'NoChange'
WHEN v1.deal_id = v2.deal_id AND concat(v1.name, v1.amount) <> concat(v2.name, v2.amount) THEN 'Update'
WHEN v1.deal_id is null THEN 'New'
WHEN v2.deal_id is null THEN 'Deleted'
else 'Error'
END AS cdc_codes
FROM deal_old AS v1 FULL OUTER JOIN deal AS v2
ON v1.deal_id = v2.deal_id;
SELECT * FROM cdc_report;
+---------------------+-----------------------+--+
| cdc_report.deal_id | cdc_report.cdc_codes |
+---------------------+-----------------------+--+
| 1 | Update |
| 2 | NoChange |
| 3 | New |
| 4 | New |
+---------------------+-----------------------+--+
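The CASE logic in cdc_report boils down to a full outer join on deal_id followed by a four-way classification. A Python sketch of that classification, fed the first-run snapshots (the SQL compares concat(name, amount); comparing the (name, amount) tuple expresses the same idea without concat's edge cases):

```python
# Sketch of the cdc_report classification: compare the old and current
# snapshots key by key, as the FULL OUTER JOIN + CASE does in HiveQL.
def cdc_codes(old, new):
    report = {}
    for deal_id in sorted(set(old) | set(new)):
        if deal_id not in old:
            report[deal_id] = "New"        # only in current snapshot
        elif deal_id not in new:
            report[deal_id] = "Deleted"    # only in old snapshot
        elif old[deal_id] == new[deal_id]:
            report[deal_id] = "NoChange"
        else:
            report[deal_id] = "Update"
    return report

old = {1: ("Deal 1", 1000.0), 2: ("Deal 2", 2000.0)}
new = {1: ("Deal 1", 1010.0), 2: ("Deal 2", 2000.0),
       3: ("Deal 3", 300.0), 4: ("Deal 4", 400.0)}
print(cdc_codes(old, new))  # → {1: 'Update', 2: 'NoChange', 3: 'New', 4: 'New'}
```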
In steady state (that is, under normal operating conditions) you only have to run the following two commands. Here generate_cdc_report.hql is a script containing the view-creation and report statements shown above, with old_timestamp supplied via --hiveconf instead of set:
$ sqoop job --exec import_deal
$ hive -f generate_cdc_report.hql --hiveconf old_timestamp="2016-05-04 02:32:50"
Sample output:
1 Update
2 NoChange
3 NoChange
4 NoChange
5 New
6 New
Underlying Historical Records:
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| deal_hist.deal_id | deal_hist.name | deal_hist.amount | deal_hist.delete_flag | deal_hist.lastmodifieddate |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| 1 | Deal 1 | 1000.0 | NULL | 2016-05-04 02:22:03.0 |
| 2 | Deal 2 | 2000.0 | NULL | 2016-05-04 02:22:03.0 |
| 1 | Deal 1 | 1010.0 | NULL | 2016-05-04 02:32:58.0 |
| 3 | Deal 3 | 300.0 | NULL | 2016-05-04 02:32:50.0 |
| 4 | Deal 4 | 400.0 | NULL | 2016-05-04 02:32:50.0 |
| 1 | Deal 1 | 1020.0 | NULL | 2016-05-04 03:11:31.0 |
| 5 | Deal 5 | 400.0 | NULL | 2016-05-04 03:11:16.0 |
| 6 | Deal 6 | 600.0 | NULL | 2016-05-04 03:11:16.0 |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
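The sample output above can be reproduced end to end from these historical records: filter the history at old_timestamp to build the old snapshot, take the full history for the current one, and classify each deal_id. A Python sketch of the whole pipeline (data copied from the table above):

```python
# End-to-end sketch: derive the steady-state CDC report from deal_hist
# with old_timestamp = "2016-05-04 02:32:50".
hist = [
    (1, "Deal 1", 1000.0, "2016-05-04 02:22:03"),
    (2, "Deal 2", 2000.0, "2016-05-04 02:22:03"),
    (1, "Deal 1", 1010.0, "2016-05-04 02:32:58"),
    (3, "Deal 3", 300.0, "2016-05-04 02:32:50"),
    (4, "Deal 4", 400.0, "2016-05-04 02:32:50"),
    (1, "Deal 1", 1020.0, "2016-05-04 03:11:31"),
    (5, "Deal 5", 400.0, "2016-05-04 03:11:16"),
    (6, "Deal 6", 600.0, "2016-05-04 03:11:16"),
]

def snapshot(rows):
    # Latest (name, amount) per deal_id, like the deal/deal_old views.
    best = {}
    for i, n, a, t in rows:
        if i not in best or t > best[i][0]:
            best[i] = (t, n, a)
    return {i: (n, a) for i, (t, n, a) in best.items()}

old_ts = "2016-05-04 02:32:50"
old = snapshot([r for r in hist if r[3] <= old_ts])  # deal_old
new = snapshot(hist)                                 # deal
report = {}
for i in sorted(set(old) | set(new)):
    if i not in old:
        report[i] = "New"
    elif i not in new:
        report[i] = "Deleted"
    elif old[i] == new[i]:
        report[i] = "NoChange"
    else:
        report[i] = "Update"
for i, code in report.items():
    print(i, code)  # reproduces the sample output above
```

Note how deal 1 is classified as Update: its latest row at or before old_timestamp has amount 1000.0, while its latest row overall has 1020.0.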