Oozie - Incremental Table Load Workflow
Objective:
Build an Oozie workflow that incrementally loads a table from MySQL and maintains a matching table in Hive. Schedule this workflow to run at a regular interval (every 5 minutes in this walkthrough).
Launch MySQL and create the source table
$ mysql -u root -p
(Enter "cloudera" when prompted for the password.)
mysql> USE retail_db;
mysql> DROP TABLE IF EXISTS employee;
mysql> CREATE TABLE employee(
emp_id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(100),
age INT,
delete_flag TINYINT(1),
LastModifiedDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (emp_id)
);
mysql> INSERT INTO employee (name, age) VALUES
("Employee 1", 34),
("Employee 2", 40);
mysql> SELECT * FROM employee;
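Create a .password file containing the database user's password. Here the MySQL user is "root" and the corresponding password is "cloudera". The -n flag keeps echo from appending a newline, which Sqoop would otherwise treat as part of the password.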
$ echo -n "cloudera" > .password
Upload the .password file to the /sqoop directory in HDFS.
$ sudo -u yarn hdfs dfs -mkdir /sqoop
$ sudo -u yarn hdfs dfs -put .password /sqoop
Restrict access to the yarn user only.
$ sudo -u yarn hdfs dfs -chmod 400 /sqoop/.password
Remove the .password file from the local system for security reasons.
$ rm -f .password
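Sqoop uses the entire contents of the password file as the password, so a stray trailing newline would cause authentication to fail. As a minimal sketch, the check below writes the password with printf (which does not append a newline) to a scratch file and confirms its size and last character before anything is uploaded. The variable names and the temporary file are illustrative and not part of the original steps.

```shell
# Write the password with no trailing newline; printf '%s' adds none.
pw_file="$(mktemp)"            # hypothetical scratch file, not /sqoop/.password
printf '%s' "cloudera" > "$pw_file"

# "cloudera" is 8 characters, so the file should be exactly 8 bytes.
pw_bytes="$(wc -c < "$pw_file" | tr -d ' ')"
echo "password file size: ${pw_bytes} bytes"

# The last byte should be the letter 'a', not a newline.
last_char="$(tail -c 1 "$pw_file")"
echo "last character: ${last_char}"

# Clean up the scratch file.
rm -f "$pw_file"
```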
Create a Sqoop job for an incremental load in lastmodified mode, appending new and changed rows to the target directory. Create this job as the yarn user because the Oozie job we will ultimately build runs as the yarn user.
$ hadoop fs -rm -r -f /staging/mysql/retail_db/employee
$ sudo -u yarn sqoop job \
--create import-employee \
-- import \
--connect jdbc:mysql://localhost:3306/retail_db \
--username root \
--password-file /sqoop/.password \
--table employee \
--target-dir '/staging/mysql/retail_db/employee' \
--incremental lastmodified \
--check-column LastModifiedDate \
--append \
--driver com.mysql.jdbc.Driver \
-m 1
List the existing jobs for the yarn user.
$ sudo -u yarn sqoop job --list
You can test the Sqoop job by running it:
$ sudo -u yarn sqoop job --exec import-employee
Once the test succeeds, delete the Sqoop job and the HDFS target directory so the workflow starts from a clean state; deleting the job also discards its saved last-imported value.
$ sudo -u yarn sqoop job --delete import-employee
$ hadoop fs -rm -r /staging/mysql/retail_db/employee
Then re-create the Sqoop job import-employee as before.
Verify that the Sqoop job is visible to the yarn user:
$ sudo -u yarn sqoop job --list
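A saved Sqoop job records the last value it imported so that the next run picks up only newer rows. The command sqoop job --show prints the stored options, including incremental.last.value. The helper below, a hypothetical function named last_value, is a small sketch that extracts that value from the output; the sample lines piped into it are made up for illustration, and the value may be empty or absent before the job has run for the first time.

```shell
# Extract the saved high-water mark from `sqoop job --show` output on stdin.
last_value() {
  awk -F' = ' '/incremental\.last\.value/ { print $2 }'
}

# Illustration with made-up output lines (the timestamp is a placeholder).
printf '%s\n' \
  'db.table = employee' \
  'incremental.last.value = 2017-05-25 10:00:00.0' | last_value

# On the cluster, assuming the job created above exists:
#   sudo -u yarn sqoop job --show import-employee | last_value
```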
Open the Hive terminal and create the following two tables:
hive> drop table if exists employee_staging;
create external table employee_staging(
id string,
name string,
age int,
deleted boolean,
lmd string
)
row format delimited
fields terminated by ','
stored as textfile
location '/staging/mysql/retail_db/employee';
drop table if exists employee;
create table employee(
id string,
name string,
age int,
deleted boolean,
lmd string
)
stored as parquet;
Create an employee.hql file with the following Hive statement and upload it to the /user/cloudera/scripts directory in HDFS. This script will be used in the Oozie workflow. It keeps only the most recently modified row for each id.
insert overwrite table employee
select id, name, age, deleted, lmd from
(select
*,
row_number() over (partition by id order by lmd desc) r
from employee_staging) t
where r = 1;
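One way to create and upload the script from the shell is sketched below. It assumes the commands run as a user with write access to /user/cloudera in HDFS; the upload step is skipped when the hdfs client is not on the PATH.

```shell
# Write the Hive statement to employee.hql (the file must not contain a prompt).
cat > employee.hql <<'EOF'
insert overwrite table employee
select id, name, age, deleted, lmd from
(select
  *,
  row_number() over (partition by id order by lmd desc) r
from employee_staging) t
where r = 1;
EOF

# Upload to the directory that the Oozie Hive action will reference.
if command -v hdfs >/dev/null 2>&1; then
  hdfs dfs -mkdir -p /user/cloudera/scripts
  hdfs dfs -put -f employee.hql /user/cloudera/scripts/
fi
```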
Now open the Hue Oozie workflow editor at http://localhost:8888/oozie/editor/workflow/list/, create a new workflow, and name it "Build Employee Table".
Add the following properties to the Oozie workflow:
oozie.use.system.libpath=true
oozie.libpath=${nameNode}/user/oozie/share/lib
Open the Oozie workflow workspace, a folder in HDFS that contains the job.properties and workflow.xml files and a lib folder. Upload the mysql-connector-java-5.1.34-bin.jar file, which you can generally find in the /usr/share/java directory, to the lib folder. The workspace directory in HDFS looks something like /user/hue/oozie/workspaces/hue-oozie-1495733114.84.
Now add the following two actions to the workflow (see the screenshot):
A. Sqoop action with the command "job --exec import-employee"
B. Hive 2 action with the script "/user/cloudera/scripts/employee.hql"
Save the workflow and run it. Once it finishes successfully, you can open Impala or Hive and query the employee table.
Insert new records into the employee table in MySQL and run the workflow again. Verify that the Hive employee table shows exactly the same rows as the employee table in MySQL.
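Because the import appends, a row that is updated in MySQL is written to the staging directory again, so the same id can appear there more than once; the Hive script keeps only the newest version per id. The sketch below mimics that selection on a small, made-up CSV sample using sort and awk. It only illustrates the logic and is not part of the workflow; the sample rows, the scratch file, and the function name latest_rows are hypothetical.

```shell
# Hypothetical staging rows: id,name,age,deleted,lmd (two versions of id 1).
sample="$(mktemp)"
cat > "$sample" <<'EOF'
1,Employee 1,34,null,2017-05-25 10:00:00.0
2,Employee 2,40,null,2017-05-25 10:00:00.0
1,Employee 1,35,null,2017-05-25 11:30:00.0
EOF

# Sort by id ascending, then lmd descending; keep the first row seen per id.
latest_rows() {
  sort -t',' -k1,1n -k5,5r "$1" | awk -F',' '!seen[$1]++'
}

latest_rows "$sample"
```

For this sample, the result contains the age-35 row for id 1 and the single row for id 2, which is what the row_number() filter in employee.hql would keep.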
Schedule the Job to Run Every 5 Minutes
In Hue, go to Workflows > Editors > Workflows.
Select the workflow you created and click on Schedule. Set the frequency to every 5 minutes.
Click on Save and Submit. Your incremental import job should now run every 5 minutes.