Scheduling Jobs Using Oozie

Apache Oozie

  • Oozie is a workflow scheduler system to manage Apache Hadoop jobs.
  • Oozie Workflow jobs are Directed Acyclic Graphs (DAGs) of actions.
  • Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability.
  • Oozie is integrated with the rest of the Hadoop stack, supporting several types of Hadoop jobs out of the box (such as Java map-reduce, Streaming map-reduce, Pig, Hive, Sqoop and Distcp) as well as system-specific jobs (such as Java programs and shell scripts).
  • Oozie is a scalable, reliable and extensible system.

This blog describes additional benefits of using Oozie in detail. Here are the most important:

  • Oozie has built-in Hadoop actions, making Hadoop workflow development, maintenance, and troubleshooting easy.
  • Oozie is well-integrated with Hadoop security and Kerberos.
  • With the Oozie UI it’s easier to drill down to specific errors in the data nodes. Other systems require significantly more work to correlate YARN jobs with the workflow actions.
  • Oozie has been proven to scale in some of the world’s largest clusters.
  • Oozie coordinator supports data dependency, so it allows triggering actions whenever files arrive in HDFS.

Oozie Commands

https://oozie.apache.org/docs/3.1.3-incubating/DG_CommandLineTool.html

Check status of Oozie

$ sudo service oozie status

To make it convenient to use this utility, set the environment variable OOZIE_URL to point to the URL of the Oozie server. Then you can skip the -oozie option.

$ export OOZIE_URL=http://localhost:11000/oozie/

Check the Oozie version

$ oozie admin -version

Check the Oozie system status. If system mode = NORMAL, go ahead with the rest of the instructions; otherwise, restart the Oozie service.

$ oozie admin -status
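When scripting this check, a small guard around the command's output is handy. A sketch, assuming the usual "System mode : NORMAL" line that `oozie admin -status` prints (the `status_line` variable below is a stand-in for the real command output):

```shell
# Guard: proceed only when Oozie reports NORMAL system mode.
status_line='System mode : NORMAL'   # stand-in for: $(oozie admin -status)
case "$status_line" in
  *NORMAL*) mode=ok ;;       # safe to continue
  *)        mode=restart ;;  # restart the Oozie service first
esac
echo "$mode"
```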

Run a MapReduce Job using Oozie

Create an Oozie project folder oozie-mr-wordcount

$ mkdir oozie-mr-wordcount
$ cd oozie-mr-wordcount

Create a simple word count project for MapReduce. Find the details here. Build a jar of the project and copy it into the lib directory of the Oozie project.

$ mkdir lib
$ cp ~/workspace/WordCount/target/WordCount-0.0.1-SNAPSHOT.jar lib/
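The word count the jar computes is easy to sanity-check on a small sample without Hadoop at all; the same counting, expressed as a plain shell pipeline (illustrative only, not the MapReduce job itself):

```shell
# Word count as a shell pipeline: tokenize, sort, count, then emit "word<TAB>count".
counts=$(printf 'a b a\nb a\n' | tr -s ' ' '\n' | sort | uniq -c | awk '{print $2"\t"$1}')
echo "$counts"
# a	3
# b	2
```

Running the Oozie job on the same sample input should produce matching counts.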

Inspect the contents of the jar to confirm the class names it provides

$ jar tf lib/WordCount-0.0.1-SNAPSHOT.jar

Create an Oozie workflow file with the following content. Make sure the class names match those in the jar tf output above.

$ vi workflow.xml
<?xml version="1.0"?>
<workflow-app xmlns="uri:oozie:workflow:0.1" name="hadoop-example-wordcount">
    <start to="wordcount-node"/>
    <action name="wordcount-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.map.class</name>
                    <value>WordCount$Map</value>
                </property>
                <property>
                    <name>mapreduce.combine.class</name>
                    <value>WordCount$Reduce</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>WordCount$Reduce</value>
                </property>
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Validate the syntax of workflow.xml

$ oozie validate workflow.xml 

Create a job.properties file

$ vi job.properties
nameNode=hdfs://localhost:8020
jobTracker=localhost:8032
queueName=default
baseDir=${nameNode}/user/${user.name}/oozie-mr-wordcount
oozie.wf.application.path=${baseDir}
oozie.libpath=${baseDir}/lib
inputDir=${nameNode}/user/${user.name}/input
outputDir=${nameNode}/user/${user.name}/output
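The ${...} references above are resolved by Oozie's EL when the job is submitted. As a rough illustration of how they expand, here is a shell stand-in (shell variables are only an analogy for EL, and cloudera is a hypothetical value of ${user.name}):

```shell
# Illustrative only: mimic Oozie's EL expansion of job.properties references.
nameNode=hdfs://localhost:8020
user_name=cloudera                   # hypothetical stand-in for ${user.name}
baseDir="${nameNode}/user/${user_name}/oozie-mr-wordcount"
inputDir="${nameNode}/user/${user_name}/input"
echo "$baseDir"
echo "$inputDir"
```

So oozie.wf.application.path and oozie.libpath end up pointing at the HDFS copy of this project directory and its lib subdirectory.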

Upload the project directory to HDFS; the destination must match baseDir in job.properties:

$ hadoop fs -put -f . oozie-mr-wordcount

Create the input path and put a sample file there to process. Note that the input path was already specified as inputDir in the job.properties file.

$ hadoop fs -mkdir input
$ hadoop fs -put /etc/passwd input
$ hadoop fs -ls input/passwd

Submit and run Oozie Job

$ oozie job -config job.properties -run
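The run command prints the workflow id as a line of the form "job: <id>"; when scripting, that id can be captured for the -info and -log calls that follow. A sketch (the `run_output` variable stands in for the real CLI output, and the id is just an example):

```shell
# Extract the workflow id from the "job: <id>" line that `oozie job ... -run` prints.
run_output='job: 0000014-161015194146192-oozie-oozi-W'   # stand-in for real CLI output
jobid=${run_output#job: }                                # strip the "job: " prefix
echo "$jobid"
```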

Check job info

$ oozie job -info <job id e.g. 0000014-161015194146192-oozie-oozi-W>

Check Oozie log for the job

$ oozie job -log <job id e.g. 0000014-161015194146192-oozie-oozi-W>

Chain Two Actions

Define a workflow

<workflow-app xmlns="uri:oozie:workflow:0.1" name="sorted-word-count">
    <start to="wordcount-node"/>
    <action name="wordcount-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${wordCountOutputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.map.class</name>
                    <value>WordCount$Map</value>
                </property>
                <property>
                    <name>mapreduce.combine.class</name>
                    <value>WordCount$Reduce</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>WordCount$Reduce</value>
                </property>
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${wordCountOutputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="sort-node"/>
        <error to="fail"/>
    </action>
    <action name="sort-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.map.class</name>
                    <value>SwapKeyValue$Swap</value>
                </property>
                <property>
                    <name>mapred.output.key.comparator.class</name>
                    <value>org.apache.hadoop.io.LongWritable$DecreasingComparator</value>
                </property>
                <property>
                    <name>mapred.mapoutput.key.class</name>
                    <value>org.apache.hadoop.io.LongWritable</value>
                </property>
                <property>
                    <name>mapred.mapoutput.value.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${wordCountOutputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
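The sort-node above swaps each (word, count) pair so the count becomes the key, then sorts with a decreasing comparator. On plain text output the equivalent result comes from a numeric reverse sort on the count column, which is a quick way to verify the chained job (illustrative sample data):

```shell
# Equivalent of the sort stage on "word<TAB>count" lines: numeric reverse sort on column 2.
sorted=$(printf 'a\t3\nb\t5\nc\t1\n' | sort -k2,2nr)
echo "$sorted"
# b	5
# a	3
# c	1
```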

Define the job.properties

nameNode=hdfs://localhost:8020
jobTracker=localhost:8032
queueName=default
baseDir=${nameNode}/user/${user.name}/oozie-mr-wordcount-sort
oozie.wf.application.path=${baseDir}
oozie.libpath=${baseDir}/lib
inputDir=${nameNode}/user/${user.name}/input
wordCountOutputDir=${nameNode}/user/${user.name}/wordCountOutputDir
outputDir=${nameNode}/user/${user.name}/output_sort

Copy the jars for WordCount and SwapKeyValue into the lib directory, and upload the project directory to HDFS as before.

Submit and run Oozie Job

$ oozie job -config job.properties -run

Create an Oozie Job to Convert CSV to ORC

Create the following script, scripts/create-stocks-table.hql

CREATE EXTERNAL TABLE stocks(
    date String,
    open Double,
    high Double,
    low Double,
    close Double,
    volume Double,
    adjClose Double,
    symbol String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/cloudera/stocks';
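Since the external table reads the CSV files in place, it helps to sanity-check the data first. A small sketch that confirms each row has the 8 columns the table declares (the sample row is hypothetical):

```shell
# Check that every CSV row has exactly the 8 columns of the stocks schema:
# date,open,high,low,close,volume,adjClose,symbol
result=$(printf '2016-01-04,10.1,12.0,9.5,11.2,1000,11.2,AAPL\n' |
  awk -F',' 'NF != 8 { bad++ } END { print (bad ? "bad rows: " bad : "all rows ok") }')
echo "$result"
```

On a real file you would pipe `hadoop fs -cat /user/cloudera/stocks/*` into the same awk check.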

Create the following script, scripts/create-stocks-orc-table.hql

CREATE TABLE stocks_orc(
    date String,
    open Double,
    high Double,
    low Double,
    close Double,
    volume Double,
    adjClose Double,
    symbol String
    )
STORED AS ORC;

Create the following script, scripts/load-stocks-orc-table.hql

INSERT INTO TABLE stocks_orc SELECT * FROM stocks;

Create the following workflow file, workflow.xml

<workflow-app xmlns = "uri:oozie:workflow:0.4" name = "Data-conversion-Workflow">
    <start to = "fork_node" />
   <!-- Fork will allow to run the commands in parallel. Fork is followed by join. -->        
   <fork name = "fork_node">
      <path start = "Create_External_Table"/>
      <path start = "Create_orc_Table"/>
   </fork>
   <!-- Step 1 -->
   <action name = "Create_External_Table">
      <hive xmlns = "uri:oozie:hive-action:0.4">
         <job-tracker>${jobTracker}</job-tracker>
         <name-node>${nameNode}</name-node>
         <script>${baseDir}/scripts/create-stocks-table.hql</script>
      </hive>
      <ok to = "join_node" />
      <error to = "kill_job" />
   </action>
   <!-- Step 2 -->
   <action name = "Create_orc_Table">
      <hive xmlns = "uri:oozie:hive-action:0.4">
         <job-tracker>${jobTracker}</job-tracker>
         <name-node>${nameNode}</name-node>
         <script>${baseDir}/scripts/create-stocks-orc-table.hql</script>
      </hive>
      <ok to = "join_node" />
      <error to = "kill_job" />
   </action>
   <join name = "join_node" to = "Insert_into_Table"/>
   
   <!-- Step 3 -->
   <action name = "Insert_into_Table">
      <hive xmlns = "uri:oozie:hive-action:0.4">
         <job-tracker>${jobTracker}</job-tracker>
         <name-node>${nameNode}</name-node>
         <script>${baseDir}/scripts/load-stocks-orc-table.hql</script>
      </hive>
      <ok to = "end" />
      <error to = "kill_job" />
   </action>
   
   <kill name = "kill_job">
      <message>Job failed</message>
   </kill>
   <end name = "end" />
</workflow-app>

Create job.properties

nameNode=hdfs://localhost:8020
jobTracker=localhost:8032
queueName=default
baseDir=${nameNode}/user/${user.name}/oozie-stocks
oozie.wf.application.path=${baseDir}
oozie.use.system.libpath=true
oozie.libpath=${nameNode}/user/oozie/share/lib

Validate the directory structure

.
├── job.properties
├── scripts
│   ├── create-stocks-orc-table.hql
│   ├── create-stocks-table.hql
│   └── load-stocks-orc-table.hql
└── workflow.xml

Validate the workflow.xml

$ oozie validate workflow.xml

Run the Oozie job

$ oozie job -config job.properties -run

Check status of the job

$ oozie job -info <job id, e.g. 0000000-161002075917748-oozie-oozi-W>

Check log of the job

$ oozie job -log <job id, e.g. 0000001-161002075917748-oozie-oozi-W>

Kill Oozie job

$ oozie job -kill <job id, e.g. 0000000-161002075917748-oozie-oozi-W>

Rerun the Oozie job, re-executing only the failed nodes

$ oozie job -rerun 0000000-161002075917748-oozie-oozi-W -D oozie.wf.rerun.failnodes=true