Scheduling Jobs Using Oozie
Apache Oozie
Oozie is a workflow scheduler system to manage Apache Hadoop jobs.
Oozie Workflow jobs are Directed Acyclic Graphs (DAGs) of actions.
Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability.
Oozie is integrated with the rest of the Hadoop stack supporting several types of Hadoop jobs out of the box (such as Java map-reduce, Streaming map-reduce, Pig, Hive, Sqoop and Distcp) as well as system specific jobs (such as Java programs and shell scripts).
Oozie is a scalable, reliable and extensible system.
This blog describes additional benefits of using Oozie in detail. Here are the most important:
Oozie has built-in Hadoop actions, making Hadoop workflow development, maintenance, and troubleshooting easy.
Oozie is well-integrated with Hadoop security and Kerberos.
With the Oozie UI it’s easier to drill down to specific errors in the data nodes. Other systems require significantly more work to correlate YARN jobs with the workflow actions.
Oozie has been proven to scale in some of the world’s largest clusters.
Oozie coordinator supports data dependency, so it allows triggering actions whenever files arrive in HDFS.
Oozie Commands
https://oozie.apache.org/docs/3.1.3-incubating/DG_CommandLineTool.html
Check the status of the Oozie service
$ sudo service oozie status
To make it convenient to use this utility, set the environment variable OOZIE_URL to point to the URL of the Oozie server. Then you can skip the -oozie option.
$ export OOZIE_URL=http://localhost:11000/oozie/
Check the Oozie version
$ oozie admin -version
Check the Oozie system status. If the system mode is NORMAL, go ahead with the rest of the instructions; otherwise, restart the Oozie service.
$ oozie admin -status
Run a MapReduce Job using Oozie
Create an Oozie project folder, oozie-mr-wordcount
$ mkdir oozie-mr-wordcount
$ cd oozie-mr-wordcount
Create a simple word count MapReduce project (find the details here). Build the project jar and copy it into the lib directory of the Oozie project.
$ mkdir lib
$ cp ~/workspace/WordCount/target/WordCount-0.0.1-SNAPSHOT.jar lib/
Inspect the class names inside the jar
$ jar -tf lib/WordCount-0.0.1-SNAPSHOT.jar
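For reference, the WordCount class packaged in the jar is assumed to look roughly like the following sketch (not the original project code): a standard new-API word count with Map and Reduce as nested static classes in the default package, so they appear as WordCount$Map.class and WordCount$Reduce.class in the jar -tf listing.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch only: kept in the default package so the class names match the
// WordCount$Map and WordCount$Reduce values used in workflow.xml below.
public class WordCount {

    // Emits (word, 1) for every token in the input line.
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Sums the counts for each word; the workflow also uses it as the combiner.
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}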
Create an Oozie workflow file with the following content. Make sure the class names match those in the jar -tf output above.
$ vi workflow.xml
<?xml version="1.0"?>
<workflow-app xmlns="uri:oozie:workflow:0.1" name="hadoop-example-wordcount">
  <start to="wordcount-node"/>
  <action name="wordcount-node">
    <map-reduce>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <prepare>
        <delete path="${outputDir}"/>
      </prepare>
      <configuration>
        <property>
          <name>mapred.mapper.new-api</name>
          <value>true</value>
        </property>
        <property>
          <name>mapred.reducer.new-api</name>
          <value>true</value>
        </property>
        <property>
          <name>mapreduce.map.class</name>
          <value>WordCount$Map</value>
        </property>
        <property>
          <name>mapreduce.combine.class</name>
          <value>WordCount$Reduce</value>
        </property>
        <property>
          <name>mapreduce.reduce.class</name>
          <value>WordCount$Reduce</value>
        </property>
        <property>
          <name>mapred.output.key.class</name>
          <value>org.apache.hadoop.io.Text</value>
        </property>
        <property>
          <name>mapred.output.value.class</name>
          <value>org.apache.hadoop.io.IntWritable</value>
        </property>
        <property>
          <name>mapred.input.dir</name>
          <value>${inputDir}</value>
        </property>
        <property>
          <name>mapred.output.dir</name>
          <value>${outputDir}</value>
        </property>
      </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name="end"/>
</workflow-app>
Validate the syntax of workflow.xml
$ oozie validate workflow.xml
Create a job.properties file
$ vi job.properties
nameNode=hdfs://localhost:8020
jobTracker=localhost:8032
queueName=default
baseDir=${nameNode}/user/${user.name}/oozie-mr-wordcount
oozie.wf.application.path=${baseDir}
oozie.libpath=${baseDir}/lib
inputDir=${nameNode}/user/${user.name}/input
outputDir=${nameNode}/user/${user.name}/output
Upload the entire project directory to HDFS (run this from inside the oozie-mr-wordcount directory)
$ hadoop fs -put -f . oozie-mr-wordcount
Create the input path and put in a sample file to process. Note that the input path has already been specified in the job.properties file.
$ hadoop fs -mkdir input
$ hadoop fs -put /etc/passwd input
$ hadoop fs -ls input/passwd
Submit and run the Oozie job
$ oozie job -config job.properties -run
Check job info
$ oozie job -info <job id e.g. 0000014-161015194146192-oozie-oozi-W>
Check Oozie log for the job
$ oozie job -log <job id e.g. 0000014-161015194146192-oozie-oozi-W>
Chain Two Actions
Define a workflow that first runs the word count job and then sorts the words by count in decreasing order.
<workflow-app xmlns="uri:oozie:workflow:0.1" name="sorted-word-count">
  <start to="wordcount-node"/>
  <action name="wordcount-node">
    <map-reduce>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <prepare>
        <delete path="${wordCountOutputDir}"/>
      </prepare>
      <configuration>
        <property>
          <name>mapred.mapper.new-api</name>
          <value>true</value>
        </property>
        <property>
          <name>mapred.reducer.new-api</name>
          <value>true</value>
        </property>
        <property>
          <name>mapreduce.map.class</name>
          <value>WordCount$Map</value>
        </property>
        <property>
          <name>mapreduce.combine.class</name>
          <value>WordCount$Reduce</value>
        </property>
        <property>
          <name>mapreduce.reduce.class</name>
          <value>WordCount$Reduce</value>
        </property>
        <property>
          <name>mapred.output.key.class</name>
          <value>org.apache.hadoop.io.Text</value>
        </property>
        <property>
          <name>mapred.output.value.class</name>
          <value>org.apache.hadoop.io.IntWritable</value>
        </property>
        <property>
          <name>mapred.input.dir</name>
          <value>${inputDir}</value>
        </property>
        <property>
          <name>mapred.output.dir</name>
          <value>${wordCountOutputDir}</value>
        </property>
      </configuration>
    </map-reduce>
    <ok to="sort-node"/>
    <error to="fail"/>
  </action>
  <action name="sort-node">
    <map-reduce>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <prepare>
        <delete path="${outputDir}"/>
      </prepare>
      <configuration>
        <property>
          <name>mapred.mapper.new-api</name>
          <value>true</value>
        </property>
        <property>
          <name>mapreduce.map.class</name>
          <value>SwapKeyValue$Swap</value>
        </property>
        <property>
          <name>mapred.output.key.comparator.class</name>
          <value>org.apache.hadoop.io.LongWritable$DecreasingComparator</value>
        </property>
        <property>
          <name>mapred.mapoutput.key.class</name>
          <value>org.apache.hadoop.io.LongWritable</value>
        </property>
        <property>
          <name>mapred.mapoutput.value.class</name>
          <value>org.apache.hadoop.io.Text</value>
        </property>
        <property>
          <name>mapred.reducer.class</name>
          <value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
        </property>
        <property>
          <name>mapred.input.dir</name>
          <value>${wordCountOutputDir}</value>
        </property>
        <property>
          <name>mapred.output.dir</name>
          <value>${outputDir}</value>
        </property>
      </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name="end"/>
</workflow-app>
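The sort-node action above references a SwapKeyValue$Swap mapper that is not shown in this post. A minimal sketch of what such a mapper is assumed to do: read each word<TAB>count line produced by the word count job and emit the count as a LongWritable key, so that the LongWritable.DecreasingComparator configured in the workflow orders the words by frequency.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: default package so the class name matches SwapKeyValue$Swap in workflow.xml.
public class SwapKeyValue {

    // Turns "word<TAB>count" lines into (count, word) pairs.
    public static class Swap extends Mapper<LongWritable, Text, LongWritable, Text> {
        private final LongWritable count = new LongWritable();
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (fields.length == 2) {
                word.set(fields[0]);
                count.set(Long.parseLong(fields[1]));
                context.write(count, word);
            }
        }
    }
}

The reduce side is the old-API IdentityReducer configured in the workflow, so the sorted (count, word) pairs are written out unchanged.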
Define the job.properties
nameNode=hdfs://localhost:8020
jobTracker=localhost:8032
queueName=default
baseDir=${nameNode}/user/${user.name}/oozie-mr-wordcount-sort
oozie.wf.application.path=${baseDir}
oozie.libpath=${baseDir}/lib
inputDir=${nameNode}/user/${user.name}/input
wordCountOutputDir=${nameNode}/user/${user.name}/wordCountOutputDir
outputDir=${nameNode}/user/${user.name}/output_sort
Copy the jars containing the WordCount and SwapKeyValue classes into the lib directory, then upload the project directory to HDFS as before.
Submit and run the Oozie job
$ oozie job -config job.properties -run
Create an Oozie Job to Convert CSV to ORC
Create the following script, scripts/create-stocks-table.hql
CREATE EXTERNAL TABLE stocks (
  date String,
  open Double,
  high Double,
  low Double,
  close Double,
  volume Double,
  adjClose Double,
  symbol String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/cloudera/stocks';
Create the following script, scripts/create-stocks-orc-table.hql
CREATE TABLE stocks_orc (
  date String,
  open Double,
  high Double,
  low Double,
  close Double,
  volume Double,
  adjClose Double,
  symbol String)
STORED AS ORC;
Create the following script, scripts/load-stocks-orc-table.hql
insert into table stocks_orc select * from stocks;
Create the following workflow file, workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.4" name="Data-conversion-Workflow">
  <start to="fork_node"/>
  <!-- Fork runs the two table-creation actions in parallel; a fork is always followed by a join. -->
  <fork name="fork_node">
    <path start="Create_External_Table"/>
    <path start="Create_orc_Table"/>
  </fork>
  <!-- Step 1 -->
  <action name="Create_External_Table">
    <hive xmlns="uri:oozie:hive-action:0.4">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <script>${baseDir}/scripts/create-stocks-table.hql</script>
    </hive>
    <ok to="join_node"/>
    <error to="kill_job"/>
  </action>
  <!-- Step 2 -->
  <action name="Create_orc_Table">
    <hive xmlns="uri:oozie:hive-action:0.4">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <script>${baseDir}/scripts/create-stocks-orc-table.hql</script>
    </hive>
    <ok to="join_node"/>
    <error to="kill_job"/>
  </action>
  <join name="join_node" to="Insert_into_Table"/>
  <!-- Step 3 -->
  <action name="Insert_into_Table">
    <hive xmlns="uri:oozie:hive-action:0.4">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <script>${baseDir}/scripts/load-stocks-orc-table.hql</script>
    </hive>
    <ok to="end"/>
    <error to="kill_job"/>
  </action>
  <kill name="kill_job">
    <message>Job failed</message>
  </kill>
  <end name="end"/>
</workflow-app>
Create job.properties
nameNode=hdfs://localhost:8020
jobTracker=localhost:8032
queueName=default
baseDir=${nameNode}/user/${user.name}/oozie-stocks
oozie.wf.application.path=${baseDir}
oozie.use.system.libpath=true
oozie.libpath=${nameNode}/user/oozie/share/lib
Validate the directory structure
.
├── job.properties
├── scripts
│   ├── create-stocks-orc-table.hql
│   ├── create-stocks-table.hql
│   └── load-stocks-orc-table.hql
└── workflow.xml
Validate the workflow.xml
$ oozie validate workflow.xml
Run the Oozie job
$ oozie job -config job.properties -run
Check status of the job
$ oozie job -info <job id, e.g. 0000000-161002075917748-oozie-oozi-W>
Check log of the job
$ oozie job -log <job id, e.g. 0000001-161002075917748-oozie-oozi-W>
Kill Oozie job
$ oozie job -kill <job id, e.g. 0000000-161002075917748-oozie-oozi-W>
Rerun the Oozie job
$ oozie job -rerun <job id, e.g. 0000000-161002075917748-oozie-oozi-W> -D oozie.wf.rerun.failnodes=true