Oozie - Incremental Table Load Workflow

Objective:

Build a oozie workflow that would incrementally load a table from mysql and create a matching table in hive. Schedule this workflow to run every hour.

Launch mysql

$ mysql -u root -p

<cloudera is the password>

mysql> USE retail_db;

mysql> DROP TABLE IF EXISTS employee;

mysql> CREATE TABLE employee(

emp_id INT NOT NULL AUTO_INCREMENT,

name VARCHAR(100),

age INT,

delete_flag TINYINT(1),

LastModifiedDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

PRIMARY KEY (emp_id)

);

mysql> INSERT INTO employee (name, age) VALUES

("Employee 1", 34),

("Employee 2", 40);

mysql> SELECT * FROM employee;

Create a .password file containing db user's password. Here the user in mysql db is "root" and corresponding password id "cloudera".

$ echo -n "cloudera" > .password

Upload the .password to /sqoop directory in HDFS.

$ sudo -u yarn hdfs dfs -mkdir /sqoop

$ sudo -u yarn hdfs dfs -put .password /sqoop

Restrict the access only to yarn user.

$ sudo -u yarn hdfs dfs -chmod 400 /sqoop/.password

Remove the .password from the local system for security reason.

$ rm -f .password

Create a sqoop job for incremental load in append mode. Create this job under yarn user because we will ultimately build a oozie job that runs under yarn user.

$ hadoop fs -rm -r -f /staging/mysql/retail_db/employee

$ sudo -u yarn sqoop job \

--create import-employee \

-- import \

--connect jdbc:mysql://localhost:3306/retail_db \

--username root \

--password-file /sqoop/.password \

--table employee \

--target-dir '/staging/mysql/retail_db/employee' \

--incremental lastmodified \

--check-column LastModifiedDate \

--append \

--driver com.mysql.jdbc.Driver \

-m 1

View the existing jobs under yarn user.

$ sudo -u yarn sqoop job --list

You can test the sqoop job by running

$ sudo -u yarn sqoop job --exec import-employee

Once test is successful delete the sqoop job and delete the HDFS target directory.

$ sudo -u yarn sqoop job --delete import-employee

$ hadoop fs -rm -r /staging/mysql/retail_db/employee

And re-create the sqoop job import-employee as earlier.

Verify that the sqoop job is visible as yarn user

$ sudo -u yarn sqoop job --list

Open hive terminal and create the following two tables

hive> drop table if exists employee_staging;

create external table employee_staging(

id string,

name string,

age int,

deleted boolean,

lmd string

)

row format delimited

fields terminated by ','

stored as textfile

location '/staging/mysql/retail_db/employee';

drop table if exists employee;

create table employee like employee_staging stored as parquet;

Create a employee.hql file with the following hive statement. Upload this script to /user/cloudera/scripts directory in HDFS. This script will be used in the Oozie workflow.

hive> insert overwrite table employee

select id, name, age, deleted, lmd from

(select

*,

row_number() over (partition by id order by lmd desc) r

from employee_staging) t

where r = 1;

Now open hue Oozie workflow editor, http://localhost:8888/oozie/editor/workflow/list/ and create a new workflow, name it "Build Employee Table".

Add the following properties Oozie Workflow:

oozie.use.system.libpath=true

oozie.libpath=${nameNode}/user/oozie/share/lib

Open the Oozie Workflow Workspace (it is a folder in HDFS that contains job.properties, workflow.xml files and lib folder. In the lib folder upload the mysql-connector-java-5.1.34-bin.jar file that you can generally find in /usr/share/java directory. Workspace directory (HDFS) for Oozie workflow looks something like this - /user/hue/oozie/workspaces/hue-oozie-1495733114.84.

Now, in the Workflow, add the following 2 actions: See the screenshot.

A. Sqoop job "job --exec import-employee"

B. Hive 2 job "/user/cloudera/scripts/employee.hql"

Save the workflow and run. Once finished successfully, you can open impala or hive and run the queries on employee table.

Insert new records in employee table in mysql and run the workflow again. Verify that hive employee table is showing the exact same view as as employee table in mysql.

Schedule the Job to Run Every 5 mins

In Hue, go to Workflows > Editors > Workflows.

Select the workflow you created and click on Schedule.

Click on Save and Submit. You incremental import job should now run every 5 mins.