Hive and Sqoop: CDC

The following procedure generates a change data capture (CDC) report for a MySQL table using Sqoop and Hive. Follow the steps in order.

To follow the steps, open three terminals:

  1. Shell terminal, for running hdfs, sqoop, and hive commands. Lines starting with $ are shell prompts.
  2. MySQL terminal. Lines starting with "mysql>" are entered in the MySQL client.
  3. Beeline (Hive) terminal. Lines starting with "0: jdbc:hive2://localhost:10000/default>" are entered in Beeline.

Throughout this document, run each command in the terminal that matches its prompt.

Tools Used:

  • Sqoop
  • Hive
  • HDFS
  • MySQL (as RDBMS data source)

Log in to MySQL and create the source table:

$ mysql -u root -p

<enter password>

mysql>

CREATE DATABASE IF NOT EXISTS retail_db;
USE retail_db;
DROP TABLE IF EXISTS deal;
CREATE TABLE deal(
  deal_id INT NOT NULL AUTO_INCREMENT,
  name VARCHAR(100),
  amount FLOAT,
  delete_flag TINYINT(1),
  LastModifiedDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (deal_id)
);
INSERT INTO deal (name, amount) VALUES 
("Deal 1", 1000), 
("Deal 2", 2000);
SELECT * FROM deal;
+---------+--------+--------+-------------+---------------------+
| deal_id | name   | amount | delete_flag | LastModifiedDate    |
+---------+--------+--------+-------------+---------------------+
|       1 | Deal 1 |   1000 |        NULL | 2016-05-04 00:08:51 |
|       2 | Deal 2 |   2000 |        NULL | 2016-05-04 00:08:51 |
+---------+--------+--------+-------------+---------------------+
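Change detection in this walkthrough rests on LastModifiedDate: DEFAULT CURRENT_TIMESTAMP stamps new rows, and ON UPDATE CURRENT_TIMESTAMP refreshes the stamp when a row changes. The hypothetical DealTable class below is a rough in-memory model of that behaviour with an injectable clock, not a model of MySQL itself. It follows MySQL's documented rule that the timestamp is left alone when an UPDATE sets every column to its current value.

```python
from datetime import datetime


class DealTable:
    """Hypothetical in-memory model of the deal table's timestamp column."""

    def __init__(self, clock):
        self._clock = clock   # callable returning the "current" datetime
        self._rows = {}       # deal_id -> dict of column values
        self._next_id = 1     # stands in for AUTO_INCREMENT

    def insert(self, name, amount):
        deal_id = self._next_id
        self._next_id += 1
        # DEFAULT CURRENT_TIMESTAMP: a new row is stamped with the current time.
        self._rows[deal_id] = {
            "name": name,
            "amount": amount,
            "delete_flag": None,
            "LastModifiedDate": self._clock(),
        }
        return deal_id

    def update(self, deal_id, **changes):
        row = self._rows[deal_id]
        # ON UPDATE CURRENT_TIMESTAMP: the stamp moves only if a value really changes.
        if any(row[column] != value for column, value in changes.items()):
            row.update(changes)
            row["LastModifiedDate"] = self._clock()

    def get(self, deal_id):
        return dict(self._rows[deal_id])


# Hypothetical clock handing out timestamps similar to those in the listings.
ticks = iter([
    datetime(2016, 5, 4, 0, 8, 51),
    datetime(2016, 5, 4, 0, 8, 51),
    datetime(2016, 5, 4, 0, 24, 43),
])
table = DealTable(clock=lambda: next(ticks))
deal_1 = table.insert("Deal 1", 1000)
deal_2 = table.insert("Deal 2", 2000)
table.update(deal_1, amount=1010)  # value changes, so the stamp moves
table.update(deal_2, amount=2000)  # same value, so the stamp stays
print(table.get(deal_1)["LastModifiedDate"])  # 2016-05-04 00:24:43
```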

Check that Sqoop can connect to MySQL and list the retail_db database and its tables:

$ sqoop list-databases --connect jdbc:mysql://localhost:3306 --username root --password cloudera

...

retail_db

...

$ sqoop list-tables --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera

...

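Create an empty Hive table, deal_shell, with the same columns as the MySQL table. It serves only as a template for the history table created later in Beeline:

$ hive -e "DROP TABLE IF EXISTS default.deal_shell;"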

$ sqoop create-hive-table --connect jdbc:mysql://localhost:3306/retail_db --username root --password cloudera --table deal --hive-table deal_shell --fields-terminated-by ','

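Remove any saved job and imported data left over from a previous run. On a first run these two commands may report that there is nothing to delete.

$ sqoop job --delete import_deal

$ hdfs dfs -rm -r '/user/cloudera/data/deal'

Create a saved Sqoop job that imports deal incrementally: --incremental lastmodified with --check-column LastModifiedDate picks up rows changed since the previous run, and --append writes them as new files in the target directory.

$ sqoop job --create import_deal -- import --connect jdbc:mysql://localhost:3306/retail_db --username root -P --table deal --target-dir '/user/cloudera/data/deal' -m 1 --incremental lastmodified --check-column LastModifiedDate --append --fields-terminated-by ','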

<Enter password for mysql user account. For cloudera we are using username: root, password: cloudera>

$ sqoop job --list

$ sqoop job --show import_deal
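A saved job remembers the last value of the check column between runs, so each --exec only picks up rows modified since the previous run. The hypothetical incremental_lastmodified function below sketches that watermark logic in Python. The exact boundary handling (newer than the saved value, older than the time the current import starts) is an assumption of this sketch; the Sqoop user guide is the authority on the real comparisons.

```python
from datetime import datetime


def incremental_lastmodified(rows, last_value, start_time, check_column="LastModifiedDate"):
    """Sketch of a lastmodified-style incremental pull (assumed semantics).

    rows        -- iterable of dicts, one per source row
    last_value  -- watermark saved by the previous run (None on the first run)
    start_time  -- when the current import starts; becomes the new watermark
    Returns (selected_rows, new_last_value).
    """
    selected = [
        row for row in rows
        if (last_value is None or row[check_column] > last_value)
        and row[check_column] < start_time
    ]
    return selected, start_time


def ts(h, m, s):
    """Timestamp on 2016-05-04, the date used throughout the listings."""
    return datetime(2016, 5, 4, h, m, s)


# Rows as listed in MySQL before and after the three changes.
before = [{"deal_id": 1, "LastModifiedDate": ts(0, 8, 51)},
          {"deal_id": 2, "LastModifiedDate": ts(0, 8, 51)}]
after = [{"deal_id": 1, "LastModifiedDate": ts(0, 24, 43)},
         {"deal_id": 2, "LastModifiedDate": ts(0, 8, 51)},
         {"deal_id": 3, "LastModifiedDate": ts(0, 24, 37)},
         {"deal_id": 4, "LastModifiedDate": ts(0, 24, 37)}]

# Job start times are assumed to match the HDFS file times (00:20 and 00:26).
first, watermark = incremental_lastmodified(before, None, ts(0, 20, 0))
second, watermark = incremental_lastmodified(after, watermark, ts(0, 26, 0))
print([r["deal_id"] for r in first])   # [1, 2]
print([r["deal_id"] for r in second])  # [1, 3, 4]
```

The second pull selects deals 1, 3 and 4, the same rows that appear in part-m-00001 later in this walkthrough.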

$ sqoop job --exec import_deal

<Enter password for mysql user account. For cloudera we are using username: root, password: cloudera>

$ hdfs dfs -ls /user/cloudera/data/deal

Found 1 items

-rw-r--r-- 1 cloudera cloudera 86 2016-05-04 00:20 /user/cloudera/data/deal/part-m-00000

$ hdfs dfs -cat /user/cloudera/data/deal/part-m-00000

1,Deal 1,1000.0,null,2016-05-04 00:08:51.0

2,Deal 2,2000.0,null,2016-05-04 00:08:51.0
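Each line of the part file is one row, with fields separated by the --fields-terminated-by character. As the output above shows, Sqoop wrote the NULL delete_flag as the text null (its --null-string and --null-non-string options can change that), and Hive later shows it as NULL in the boolean column. The hypothetical parse_deal_line helper below sketches how one line maps onto the five columns, treating the text null as a missing value. Like the Hive table, this simple split would break on a name that contains a comma.

```python
def parse_deal_line(line, delimiter=","):
    """Split one line of Sqoop text output into typed deal fields (sketch).

    Assumes no field contains the delimiter and that the text "null"
    stands for a SQL NULL.
    """
    deal_id, name, amount, delete_flag, last_modified = line.rstrip("\n").split(delimiter)

    def nullable(value, convert):
        return None if value == "null" else convert(value)

    return (
        int(deal_id),
        nullable(name, str),
        nullable(amount, float),
        nullable(delete_flag, lambda v: v.lower() == "true"),
        nullable(last_modified, str),  # kept as text, like the Hive string column
    )


part_file = """1,Deal 1,1000.0,null,2016-05-04 00:08:51.0
2,Deal 2,2000.0,null,2016-05-04 00:08:51.0
"""
records = [parse_deal_line(line) for line in part_file.splitlines()]
for record in records:
    print(record)
```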

Connect to HiveServer2 using Beeline:

$ beeline

Beeline version 1.1.0-cdh5.5.0 by Apache Hive

beeline> !connect jdbc:hive2://localhost:10000/default cloudera cloudera

scan complete in 4ms

Connecting to jdbc:hive2://localhost:10000/default

Connected to: Apache Hive (version 1.1.0-cdh5.5.0)

Driver: Hive JDBC (version 1.1.0-cdh5.5.0)

Transaction isolation: TRANSACTION_REPEATABLE_READ

0: jdbc:hive2://localhost:10000/default>

0: jdbc:hive2://localhost:10000/default> describe formatted deal;

+-------------------------------+-------------------------------------------------------------+-------------------------------------------+--+
|           col_name            |                          data_type                          |                  comment                  |
+-------------------------------+-------------------------------------------------------------+-------------------------------------------+--+
| # col_name                    | data_type                                                   | comment                                   |
|                               | NULL                                                        | NULL                                      |
| deal_id                       | int                                                         |                                           |
| name                          | string                                                      |                                           |
| amount                        | double                                                      |                                           |
| delete_flag                   | boolean                                                     |                                           |
| lastmodifieddate              | string                                                      |                                           |
|                               | NULL                                                        | NULL                                      |
| # Detailed Table Information  | NULL                                                        | NULL                                      |
| Database:                     | default                                                     | NULL                                      |
| Owner:                        | cloudera                                                    | NULL                                      |
| CreateTime:                   | Tue May 03 23:28:28 PDT 2016                                | NULL                                      |
| LastAccessTime:               | UNKNOWN                                                     | NULL                                      |
| Protect Mode:                 | None                                                        | NULL                                      |
| Retention:                    | 0                                                           | NULL                                      |
| Location:                     | hdfs://quickstart.cloudera:8020/user/hive/warehouse/deal    | NULL                                      |
| Table Type:                   | MANAGED_TABLE                                               | NULL                                      |
| Table Parameters:             | NULL                                                        | NULL                                      |
|                               | COLUMN_STATS_ACCURATE                                       | true                                      |
|                               | comment                                                     | Imported by sqoop on 2016/05/03 23:28:23  |
|                               | numFiles                                                    | 1                                         |
|                               | totalSize                                                   | 86                                        |
|                               | transient_lastDdlTime                                       | 1462343310                                |
|                               | NULL                                                        | NULL                                      |
| # Storage Information         | NULL                                                        | NULL                                      |
| SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe          | NULL                                      |
| InputFormat:                  | org.apache.hadoop.mapred.TextInputFormat                    | NULL                                      |
| OutputFormat:                 | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat  | NULL                                      |
| Compressed:                   | No                                                          | NULL                                      |
| Num Buckets:                  | -1                                                          | NULL                                      |
| Bucket Columns:               | []                                                          | NULL                                      |
| Sort Columns:                 | []                                                          | NULL                                      |
| Storage Desc Params:          | NULL                                                        | NULL                                      |
|                               | field.delim                                                 | ,                                         |
|                               | line.delim                                                  | \n                                        |
|                               | serialization.format                                        | ,                                         |
+-------------------------------+-------------------------------------------------------------+-------------------------------------------+--+

36 rows selected (0.453 seconds)

0: jdbc:hive2://localhost:10000/default> CREATE EXTERNAL TABLE deal_hist LIKE deal_shell LOCATION '/user/cloudera/data/deal';

No rows affected (1.642 seconds)

0: jdbc:hive2://localhost:10000/default> DESCRIBE FORMATTED deal_hist;

+-------------------------------+-------------------------------------------------------------+-----------------------+--+
|           col_name            |                          data_type                          |        comment        |
+-------------------------------+-------------------------------------------------------------+-----------------------+--+
| # col_name                    | data_type                                                   | comment               |
|                               | NULL                                                        | NULL                  |
| deal_id                       | int                                                         |                       |
| name                          | string                                                      |                       |
| amount                        | double                                                      |                       |
| delete_flag                   | boolean                                                     |                       |
| lastmodifieddate              | string                                                      |                       |
|                               | NULL                                                        | NULL                  |
| # Detailed Table Information  | NULL                                                        | NULL                  |
| Database:                     | default                                                     | NULL                  |
| Owner:                        | cloudera                                                    | NULL                  |
| CreateTime:                   | Wed May 04 00:22:51 PDT 2016                                | NULL                  |
| LastAccessTime:               | UNKNOWN                                                     | NULL                  |
| Protect Mode:                 | None                                                        | NULL                  |
| Retention:                    | 0                                                           | NULL                  |
| Location:                     | hdfs://quickstart.cloudera:8020/user/cloudera/data/deal     | NULL                  |
| Table Type:                   | EXTERNAL_TABLE                                              | NULL                  |
| Table Parameters:             | NULL                                                        | NULL                  |
|                               | COLUMN_STATS_ACCURATE                                       | false                 |
|                               | EXTERNAL                                                    | TRUE                  |
|                               | numFiles                                                    | 0                     |
|                               | numRows                                                     | -1                    |
|                               | rawDataSize                                                 | -1                    |
|                               | totalSize                                                   | 0                     |
|                               | transient_lastDdlTime                                       | 1462346571            |
|                               | NULL                                                        | NULL                  |
| # Storage Information         | NULL                                                        | NULL                  |
| SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe          | NULL                  |
| InputFormat:                  | org.apache.hadoop.mapred.TextInputFormat                    | NULL                  |
| OutputFormat:                 | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat  | NULL                  |
| Compressed:                   | No                                                          | NULL                  |
| Num Buckets:                  | -1                                                          | NULL                  |
| Bucket Columns:               | []                                                          | NULL                  |
| Sort Columns:                 | []                                                          | NULL                  |
| Storage Desc Params:          | NULL                                                        | NULL                  |
|                               | field.delim                                                 | ,                     |
|                               | line.delim                                                  | \n                    |
|                               | serialization.format                                        | ,                     |
+-------------------------------+-------------------------------------------------------------+-----------------------+--+

0: jdbc:hive2://localhost:10000/default> SELECT * FROM deal_hist;

+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| deal_hist.deal_id  | deal_hist.name  | deal_hist.amount  | deal_hist.delete_flag  | deal_hist.lastmodifieddate  |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| 1                  | Deal 1          | 1000.0            | NULL                   | 2016-05-04 00:08:51.0       |
| 2                  | Deal 2          | 2000.0            | NULL                   | 2016-05-04 00:08:51.0       |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+

Now make three changes to the deal table in MySQL: insert two new deals and update the amount of deal 1.

mysql>

INSERT INTO deal (name, amount) VALUES
("Deal 3", 300),
("Deal 4", 400);

mysql> UPDATE deal SET amount = 1010 WHERE deal_id = 1;

mysql> SELECT * FROM deal;

+---------+--------+--------+-------------+---------------------+
| deal_id | name   | amount | delete_flag | LastModifiedDate    |
+---------+--------+--------+-------------+---------------------+
|       1 | Deal 1 |   1010 |        NULL | 2016-05-04 00:24:43 |
|       2 | Deal 2 |   2000 |        NULL | 2016-05-04 00:08:51 |
|       3 | Deal 3 |    300 |        NULL | 2016-05-04 00:24:37 |
|       4 | Deal 4 |    400 |        NULL | 2016-05-04 00:24:37 |
+---------+--------+--------+-------------+---------------------+

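Run the saved job again. It imports only the rows modified since the previous run and appends them as a new file:

$ sqoop job --exec import_deal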

<Enter mysql password when prompted>

$ hdfs dfs -ls /user/cloudera/data/deal

Found 2 items

-rw-r--r-- 1 cloudera cloudera 86 2016-05-04 00:20 /user/cloudera/data/deal/part-m-00000

-rw-r--r-- 1 cloudera cloudera 127 2016-05-04 00:26 /user/cloudera/data/deal/part-m-00001

$ hdfs dfs -cat /user/cloudera/data/deal/part-m-00000

1,Deal 1,1000.0,null,2016-05-04 00:08:51.0

2,Deal 2,2000.0,null,2016-05-04 00:08:51.0

$ hdfs dfs -cat /user/cloudera/data/deal/part-m-00001

1,Deal 1,1010.0,null,2016-05-04 00:24:43.0

3,Deal 3,300.0,null,2016-05-04 00:24:37.0

4,Deal 4,400.0,null,2016-05-04 00:24:37.0

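Query deal_hist again. It now returns both the original and the changed rows, because the external table reads every file in its location; deal 1 appears twice, once per version:

0: jdbc:hive2://localhost:10000/default> SELECT * FROM deal_hist;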

+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| deal_hist.deal_id  | deal_hist.name  | deal_hist.amount  | deal_hist.delete_flag  | deal_hist.lastmodifieddate  |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| 1                  | Deal 1          | 1000.0            | NULL                   | 2016-05-04 00:08:51.0       |
| 2                  | Deal 2          | 2000.0            | NULL                   | 2016-05-04 00:08:51.0       |
| 1                  | Deal 1          | 1010.0            | NULL                   | 2016-05-04 00:24:43.0       |
| 3                  | Deal 3          | 300.0             | NULL                   | 2016-05-04 00:24:37.0       |
| 4                  | Deal 4          | 400.0             | NULL                   | 2016-05-04 00:24:37.0       |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
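Since deal_hist is an external table over /user/cloudera/data/deal, every file in that directory is part of the table, so a new part file adds rows without any DDL change. The hypothetical read_table_dir helper below sketches this directory-as-table behaviour locally, using the two part files listed above. Skipping names that start with _ or . follows Hadoop's usual hidden-file convention; everything else about the helper is a simplification.

```python
import os
import tempfile


def read_table_dir(path, delimiter=","):
    """Return the rows of every data file under *path*, in file-name order.

    Simplified model of a table over a directory: adding a file adds rows.
    Names starting with "_" or "." are skipped.
    """
    rows = []
    for name in sorted(os.listdir(path)):
        full_path = os.path.join(path, name)
        if name.startswith(("_", ".")) or not os.path.isfile(full_path):
            continue
        with open(full_path) as handle:
            rows.extend(line.rstrip("\n").split(delimiter)
                        for line in handle if line.strip())
    return rows


# Recreate the two part files shown above in a temporary directory.
part_files = {
    "part-m-00000": ["1,Deal 1,1000.0,null,2016-05-04 00:08:51.0",
                     "2,Deal 2,2000.0,null,2016-05-04 00:08:51.0"],
    "part-m-00001": ["1,Deal 1,1010.0,null,2016-05-04 00:24:43.0",
                     "3,Deal 3,300.0,null,2016-05-04 00:24:37.0",
                     "4,Deal 4,400.0,null,2016-05-04 00:24:37.0"],
}
with tempfile.TemporaryDirectory() as table_dir:
    for file_name, lines in part_files.items():
        with open(os.path.join(table_dir, file_name), "w") as out:
            out.write("\n".join(lines) + "\n")
    history = read_table_dir(table_dir)

for row in history:
    print(row[0], row[2])  # deal_id and amount of each stored version
```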

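Create a view named deal that keeps only the most recent version of each deal_id, by joining deal_hist to the maximum lastmodifieddate per deal_id:

0: jdbc:hive2://localhost:10000/default>

CREATE VIEW IF NOT EXISTS deal AS
SELECT t1.* FROM deal_hist t1
JOIN (SELECT deal_id, MAX(lastmodifieddate) AS lmd FROM deal_hist GROUP BY deal_id) t2
ON t1.deal_id = t2.deal_id AND t1.lastmodifieddate = t2.lmd;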
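The view is a latest-version-per-key query: find the maximum lastmodifieddate for each deal_id, then keep the rows carrying that timestamp. A minimal Python sketch of the same logic follows, run on the deal_hist rows listed above; latest_per_key is a hypothetical name. Because lastmodifieddate is a string column in Hive, MAX compares the values as text, which orders them correctly only because every value has the same fixed format. Like the view, the sketch returns more than one row for a deal_id if two versions share its maximum timestamp.

```python
def latest_per_key(history, key="deal_id", ts="lastmodifieddate"):
    """Keep, for each key, the row(s) whose timestamp equals that key's maximum."""
    max_ts = {}
    for row in history:  # equivalent of GROUP BY key with MAX(ts)
        k = row[key]
        if k not in max_ts or row[ts] > max_ts[k]:
            max_ts[k] = row[ts]
    # Equivalent of joining back on key and timestamp.
    return [row for row in history if row[ts] == max_ts[row[key]]]


# deal_hist after the second import (values from the listing above).
history = [
    {"deal_id": 1, "amount": 1000.0, "lastmodifieddate": "2016-05-04 00:08:51.0"},
    {"deal_id": 2, "amount": 2000.0, "lastmodifieddate": "2016-05-04 00:08:51.0"},
    {"deal_id": 1, "amount": 1010.0, "lastmodifieddate": "2016-05-04 00:24:43.0"},
    {"deal_id": 3, "amount": 300.0, "lastmodifieddate": "2016-05-04 00:24:37.0"},
    {"deal_id": 4, "amount": 400.0, "lastmodifieddate": "2016-05-04 00:24:37.0"},
]
for row in latest_per_key(history):
    print(row["deal_id"], row["amount"])
```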

0: jdbc:hive2://localhost:10000/default> SELECT * FROM deal;

+---------------+------------+--------------+-------------------+------------------------+--+
| deal.deal_id  | deal.name  | deal.amount  | deal.delete_flag  | deal.lastmodifieddate  |
+---------------+------------+--------------+-------------------+------------------------+--+
| 1             | Deal 1     | 1010.0       | NULL              | 2016-05-04 02:32:58.0  |
| 2             | Deal 2     | 2000.0       | NULL              | 2016-05-04 02:22:03.0  |
| 3             | Deal 3     | 300.0        | NULL              | 2016-05-04 02:32:50.0  |
| 4             | Deal 4     | 400.0        | NULL              | 2016-05-04 02:32:50.0  |
+---------------+------------+--------------+-------------------+------------------------+--+

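Note: The deal view should now mirror the current contents of the deal table in MySQL. The timestamps in this listing and in the rest of the document differ from the earlier outputs; they appear to come from a separate run of the same steps.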

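In Beeline, run the following statements. old_timestamp is the point in time to compare against (here, the time of the initial load), and deal_old rebuilds the deal table as it was at that moment. Hive substitutes ${hiveconf:old_timestamp} before the statement is parsed, so the value is fixed into the view definition; drop and recreate deal_old whenever old_timestamp changes.

0: jdbc:hive2://localhost:10000/default>

set hive.variable.substitute=true;

set old_timestamp='2016-05-04 02:22:03.0';

DROP VIEW IF EXISTS deal_old;

CREATE VIEW deal_old AS
SELECT t1.* FROM deal_hist t1
JOIN (SELECT deal_id, MAX(lastmodifieddate) AS lmd FROM deal_hist WHERE lastmodifieddate <= ${hiveconf:old_timestamp} GROUP BY deal_id) t2
ON t1.deal_id = t2.deal_id AND t1.lastmodifieddate = t2.lmd;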
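deal_old applies the same latest-version logic but first discards rows newer than old_timestamp. The hypothetical snapshot_as_of function below sketches this with the deal_hist rows from the run that old_timestamp refers to (the timestamps in the final deal_hist listing). It compares timestamps as strings, as Hive does for this string column, which is why the cutoff has to match the stored format exactly: without the trailing .0, the 02:22:03.0 rows sort after the cutoff and are left out.

```python
def snapshot_as_of(history, cutoff, key="deal_id", ts="lastmodifieddate"):
    """Rebuild the table as it looked at *cutoff* (string comparison, like Hive)."""
    eligible = [row for row in history if row[ts] <= cutoff]
    max_ts = {}
    for row in eligible:
        k = row[key]
        if k not in max_ts or row[ts] > max_ts[k]:
            max_ts[k] = row[ts]
    return [row for row in eligible if row[ts] == max_ts[row[key]]]


history = [
    {"deal_id": 1, "amount": 1000.0, "lastmodifieddate": "2016-05-04 02:22:03.0"},
    {"deal_id": 2, "amount": 2000.0, "lastmodifieddate": "2016-05-04 02:22:03.0"},
    {"deal_id": 1, "amount": 1010.0, "lastmodifieddate": "2016-05-04 02:32:58.0"},
    {"deal_id": 3, "amount": 300.0, "lastmodifieddate": "2016-05-04 02:32:50.0"},
    {"deal_id": 4, "amount": 400.0, "lastmodifieddate": "2016-05-04 02:32:50.0"},
]
old = snapshot_as_of(history, "2016-05-04 02:22:03.0")
print(sorted((r["deal_id"], r["amount"]) for r in old))  # [(1, 1000.0), (2, 2000.0)]
print(snapshot_as_of(history, "2016-05-04 02:22:03"))    # [] (no ".0", nothing qualifies)
```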

CREATE VIEW IF NOT EXISTS cdc_report AS SELECT
  CASE
    WHEN v1.deal_id IS NOT NULL THEN v1.deal_id
    WHEN v2.deal_id IS NOT NULL THEN v2.deal_id
    ELSE 'Error'
  END AS deal_id,
  CASE
    WHEN v1.deal_id = v2.deal_id AND concat(v1.name, v1.amount) = concat(v2.name, v2.amount) THEN 'NoChange'
    WHEN v1.deal_id = v2.deal_id AND concat(v1.name, v1.amount) <> concat(v2.name, v2.amount) THEN 'Update'
    WHEN v1.deal_id IS NULL THEN 'New'
    WHEN v2.deal_id IS NULL THEN 'Deleted'
    ELSE 'Error'
  END AS cdc_codes
FROM deal_old AS v1 FULL OUTER JOIN deal AS v2
ON v1.deal_id = v2.deal_id;
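cdc_report full-outer-joins the old snapshot (v1) with the current state (v2) and labels each deal_id. The Python sketch below, a hypothetical function named after the view, reproduces that classification on the deal_old and deal contents from this walkthrough. One deliberate difference: it compares name and amount as a tuple rather than through concat(), which in the view would treat ("Deal 1", 10.0) and ("Deal 11", 0.0) as equal. Two limitations carry over from the view: delete_flag is not compared unless it is added to compared, so a soft delete would come out as NoChange; and since deal_hist only ever grows, every deal_id in the old snapshot is also in the current one, so the Deleted branch cannot fire in this setup.

```python
def cdc_report(old_rows, new_rows, key="deal_id", compared=("name", "amount")):
    """Classify each key as NoChange, Update, New or Deleted (full outer join)."""
    old = {row[key]: row for row in old_rows}
    new = {row[key]: row for row in new_rows}
    report = {}
    for k in sorted(old.keys() | new.keys()):
        if k not in old:
            report[k] = "New"
        elif k not in new:
            report[k] = "Deleted"
        elif all(old[k][c] == new[k][c] for c in compared):
            report[k] = "NoChange"
        else:
            report[k] = "Update"
    return report


old_rows = [
    {"deal_id": 1, "name": "Deal 1", "amount": 1000.0},
    {"deal_id": 2, "name": "Deal 2", "amount": 2000.0},
]
new_rows = [
    {"deal_id": 1, "name": "Deal 1", "amount": 1010.0},
    {"deal_id": 2, "name": "Deal 2", "amount": 2000.0},
    {"deal_id": 3, "name": "Deal 3", "amount": 300.0},
    {"deal_id": 4, "name": "Deal 4", "amount": 400.0},
]
for deal_id, code in cdc_report(old_rows, new_rows).items():
    print(deal_id, code)  # 1 Update, 2 NoChange, 3 New, 4 New
```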

SELECT * FROM cdc_report;

+---------------------+-----------------------+--+
| cdc_report.deal_id  | cdc_report.cdc_codes  |
+---------------------+-----------------------+--+
| 1                   | Update                |
| 2                   | NoChange              |
| 3                   | New                   |
| 4                   | New                   |
+---------------------+-----------------------+--+

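In steady state (that is, under normal operating conditions), you only need to run the following two commands: the saved import job, then a Hive script that regenerates the report. generate_cdc_report.hql is not shown here; presumably it holds the deal_old and cdc_report statements from the Beeline session above, with old_timestamp passed in through --hiveconf.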

$ sqoop job --exec import_deal

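$ hive -f generate_cdc_report.hql --hiveconf old_timestamp="'2016-05-04 02:32:50.0'"

The value is wrapped in single quotes because deal_old, as written above, substitutes ${hiveconf:old_timestamp} into the query without quoting it. It keeps the .0 suffix because lastmodifieddate is a string column, so the cutoff is compared character by character with the stored values.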

Sample output:

1 Update
2 NoChange
3 NoChange
4 NoChange
5 New
6 New

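Underlying historical records in deal_hist after a third import, in which deal 1 was updated again (amount 1020) and deals 5 and 6 were added: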

+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| deal_hist.deal_id  | deal_hist.name  | deal_hist.amount  | deal_hist.delete_flag  | deal_hist.lastmodifieddate  |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
| 1                  | Deal 1          | 1000.0            | NULL                   | 2016-05-04 02:22:03.0       |
| 2                  | Deal 2          | 2000.0            | NULL                   | 2016-05-04 02:22:03.0       |
| 1                  | Deal 1          | 1010.0            | NULL                   | 2016-05-04 02:32:58.0       |
| 3                  | Deal 3          | 300.0             | NULL                   | 2016-05-04 02:32:50.0       |
| 4                  | Deal 4          | 400.0             | NULL                   | 2016-05-04 02:32:50.0       |
| 1                  | Deal 1          | 1020.0            | NULL                   | 2016-05-04 03:11:31.0       |
| 5                  | Deal 5          | 400.0             | NULL                   | 2016-05-04 03:11:16.0       |
| 6                  | Deal 6          | 600.0             | NULL                   | 2016-05-04 03:11:16.0       |
+--------------------+-----------------+-------------------+------------------------+-----------------------------+--+
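As a cross-check of the steady-state run, the Python sketch below replays the report logic over the deal_hist records listed above with a cutoff of 2016-05-04 02:32:50.0 and prints one line per deal_id, matching the sample output. The cutoff carries the trailing .0: lastmodifieddate is a string, and 2016-05-04 02:32:50 without it sorts before the stored 02:32:50.0 values, which would turn deals 3 and 4 into New. The helpers latest and cdc are hypothetical; they keep one row per deal_id and compare (name, amount) as a tuple, simplifications of the Hive views.

```python
# (deal_id, name, amount, lastmodifieddate) copied from the deal_hist listing.
HISTORY = [
    (1, "Deal 1", 1000.0, "2016-05-04 02:22:03.0"),
    (2, "Deal 2", 2000.0, "2016-05-04 02:22:03.0"),
    (1, "Deal 1", 1010.0, "2016-05-04 02:32:58.0"),
    (3, "Deal 3", 300.0, "2016-05-04 02:32:50.0"),
    (4, "Deal 4", 400.0, "2016-05-04 02:32:50.0"),
    (1, "Deal 1", 1020.0, "2016-05-04 03:11:31.0"),
    (5, "Deal 5", 400.0, "2016-05-04 03:11:16.0"),
    (6, "Deal 6", 600.0, "2016-05-04 03:11:16.0"),
]


def latest(history, cutoff=None):
    """Newest (name, amount) per deal_id, ignoring rows after cutoff if given."""
    best = {}
    for deal_id, name, amount, ts in history:
        if cutoff is not None and ts > cutoff:  # string comparison, as in Hive
            continue
        if deal_id not in best or ts > best[deal_id][0]:
            best[deal_id] = (ts, name, amount)
    return {k: v[1:] for k, v in best.items()}


def cdc(history, cutoff):
    """Compare the snapshot at cutoff (deal_old) with the current state (deal)."""
    old, new = latest(history, cutoff), latest(history)
    codes = {}
    for k in sorted(old.keys() | new.keys()):
        if k not in old:
            codes[k] = "New"
        elif k not in new:
            codes[k] = "Deleted"
        else:
            codes[k] = "NoChange" if old[k] == new[k] else "Update"
    return codes


# Prints the six lines of the sample output, from "1 Update" to "6 New".
for deal_id, code in cdc(HISTORY, "2016-05-04 02:32:50.0").items():
    print(deal_id, code)
```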