...
Follow the instructions in the Hadoop MapReduce Tutorial to run a simple MapReduce application (wordcount). This involves invoking the hadoop command to submit the MapReduce job with various command-line options.
Running a MapReduce application using Oozie
...
Hadoop Map-Reduce Job Submission
Code Block:
$ hadoop jar /usr/ninja/wordcount.jar org.myorg.WordCount -Dmapred.job.queue.name=queue_name /usr/ninja/wordcount/input /usr/ninja/wordcount/output
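Once the job finishes, the word counts can be inspected directly from the output directory; a sketch (the part file name may vary with the number of reducers):

```shell
$ hadoop fs -ls /usr/ninja/wordcount/output
$ hadoop fs -cat /usr/ninja/wordcount/output/part-00000
```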
...
This file is present locally on the node from which the job is submitted (either a local machine or the gateway node). Its main purpose is to specify the essential parameters needed for running the workflow. One mandatory property is oozie.wf.application.path, which points to the HDFS location where workflow.xml exists. In addition, definitions for the variables used in workflow.xml (e.g. ${jobTracker}, ${inputDir}, ...) can be added here. For some specific versions of Hadoop, additional authentication parameters might also need to be defined. For our example, this file looks like below:
Code Block:
# For a remote cluster, use e.g. hdfs://abc.xyz.yahoo.com:8020
nameNode=hdfs://localhost:9000
# For a remote cluster, use e.g. abc.xyz.yahoo.com:50300
jobTracker=localhost:9001
queueName=default
examplesRoot=map-reduce
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}
inputDir=input-data
outputDir=map-reduce
...
This file defines the workflow for the particular job as a set of actions. For our example, this file looks like below:
Code Block:
<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.2">
<start to='wordcount'/>
<action name='wordcount'>
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapred.mapper.class</name>
<value>org.myorg.WordCount.Map</value>
</property>
<property>
<name>mapred.reducer.class</name>
<value>org.myorg.WordCount.Reduce</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>${inputDir}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${outputDir}</value>
</property>
</configuration>
</map-reduce>
<ok to='end'/>
        <error to='kill'/>
</action>
    <kill name='kill'>
        <message>${wf:errorCode("wordcount")}</message>
    </kill>
<end name='end'/>
</workflow-app>
...
1. Follow the instructions on Quick Start to set up Oozie with Hadoop and ensure that the Oozie service is started.
2. Create a directory in your home to store all your workflow components (properties, workflow XML and the libraries). Inside this workflow directory, create a sub-directory called lib/.
Code Block:
$ cd ~
$ mkdir map-reduce
$ mkdir map-reduce/lib
...
Tip: The following Oozie command line option can be used to perform XML schema validation on the workflow XML file and return errors if any:
Code Block:
$ oozie validate ~/map-reduce/workflow.xml
4. Your job.properties file will look like the following:
Code Block:
$ cat ~/map-reduce/job.properties
nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
queueName=default
examplesRoot=map-reduce
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}
inputDir=input-data
outputDir=map-reduce
5. Copy the wordcount.jar file into the ~/map-reduce/lib directory. Your workflow directory should now contain:
Code Block:
job.properties
workflow.xml
lib/
lib/wordcount.jar
...
6. Copy the workflow directory to HDFS. Note that if a directory by this name already exists in HDFS, it may need to be deleted prior to copying.
Code Block:
$ hadoop fs -put ~/map-reduce map-reduce
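If an earlier copy of the directory already exists in HDFS, remove it first; a sketch using the older `-rmr` option of this Hadoop generation:

```shell
$ hadoop fs -rmr map-reduce
$ hadoop fs -put ~/map-reduce map-reduce
```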
7. Run the following Oozie command to submit your workflow. Once the workflow is submitted, Oozie server returns the workflow ID which can be used for monitoring and debugging purposes.
Code Block:
$ oozie job -oozie http://localhost:4080/oozie/ -config ~/map-reduce/job.properties -run
...
...
job: 14-20090525161321-oozie-ninj
...
8. Check status of the submitted MapReduce workflow job. The following command displays a detailed breakdown of the workflow job submission.
Code Block:
$ oozie job -info 14-20090525161321-oozie-ninj -oozie http://localhost:4080/oozie/
...
...
.---------------------------------------------------------------------------------------------------------------------------------------------------------------
Workflow Name : wordcount-wf
App Path : hdfs://localhost:9000/user/ninja/map-reduce
Status : SUCCEEDED
Run : 0
User : ninja
Group : users
Created : 2011-09-21 05:01 +0000
Started : 2011-09-21 05:01 +0000
Ended : 2011-09-21 05:01 +0000
Actions
.----------------------------------------------------------------------------------------------------------------------------------------------------------------
Action Name Type Status Transition External Id External Status Error Code Start Time End Time
.----------------------------------------------------------------------------------------------------------------------------------------------------------------
wordcount map-reduce OK end job_200904281535_0254 SUCCEEDED - 2011-09-21 05:01 +0000 2011-09-21 05:01 +0000
.----------------------------------------------------------------------------------------------------------------------------------------------------------------
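Beyond `-info`, the job's logs can be fetched through the same CLI, which is often the quickest way to debug a failed action; a sketch reusing the job ID from above:

```shell
$ oozie job -log 14-20090525161321-oozie-ninj -oozie http://localhost:4080/oozie/
```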
...
in job.properties:
Code Block:
VAR=job-properties
in config-default.xml:
Code Block:
<property><name>VAR</name><value>config-default-xml</value></property>
in workflow.xml:
Code Block:
<property><name>VARIABLE</name><value>${VAR}</value></property>
...
in job.properties:
Code Block:
variable4=job-properties
in config-default.xml:
Code Block:
<property><name>variable4</name><value>config-default-xml</value></property>
in workflow.xml:
Code Block:
<job-xml>mr3-job.xml</job-xml>
... ...
<property><name>variable4</name><value>grideng</value></property>
in mr3-job.xml:
Code Block:
<property><name>mapred.job.queue.name</name><value>${variable4}</value></property>
...
in workflow.xml:
Code Block:
<job-xml>mr3-job.xml</job-xml>
... ...
<property><name>mapred.job.queue.name</name><value>grideng</value></property>
in mr3-job.xml:
Code Block:
<property><name>mapred.job.queue.name</name><value>bogus</value></property>
...
in job.properties:
Code Block:
nameNode=hdfs://abc.xyz.yahoo.com:8020
outputDir=output-allactions
in workflow.xml:
Code Block:
<case to="end">${fs:exists(concat(concat(concat(concat(concat(nameNode,"/user/"),wf:user()),"/"),wf:conf("outputDir")),"/streaming/part-00000")) and (fs:fileSize(concat(concat(concat(concat(concat(nameNode,"/user/"),wf:user()),"/"),wf:conf("outputDir")),"/streaming/part-00000")) gt 0) == "true"}</case>
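The `<case>` element above lives inside a decision node. For readability, the same check can be sketched as a full decision node, assuming an illustrative `outputPath` property that holds the fully qualified HDFS path to the part file:

```xml
<decision name="check-output">
    <switch>
        <!-- outputPath is an illustrative property, e.g.
             ${nameNode}/user/${user.name}/${outputDir}/streaming/part-00000 -->
        <case to="end">${fs:exists(outputPath) and fs:fileSize(outputPath) gt 0}</case>
        <default to="fail"/>
    </switch>
</decision>
```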
...
- change mapred.mapper.class to mapreduce.map.class
- change mapred.reducer.class to mapreduce.reduce.class
- add mapred.output.key.class
- add mapred.output.value.class

and include the following properties in the MR action configuration:

Code Block:
<property>
    <name>mapred.reducer.new-api</name>
    <value>true</value>
</property>
<property>
    <name>mapred.mapper.new-api</name>
    <value>true</value>
</property>
The changes to be made in the workflow.xml file are highlighted below:
Code Block:
<map-reduce xmlns="uri:oozie:workflow:0.2">
<job-tracker>abc.xyz.yahoo.com:50300</job-tracker>
<name-node>hdfs://abc.xyz.yahoo.com:8020</name-node>
<prepare>
<delete path="hdfs://abc.xyz.yahoo.com:8020/user/ninja/yoozie_test/output-mr20-fail" />
</prepare>
<configuration>
<!-- BEGIN: SNIPPET TO ADD IN ORDER TO MAKE USE OF HADOOP 20 API -->
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<!-- END: SNIPPET -->
<property>
<name>mapreduce.map.class</name>
<value>org.myorg.WordCount$TokenizerMapper</value>
</property>
<property>
<name>mapreduce.reduce.class</name>
<value>org.myorg.WordCount$IntSumReducer</value>
</property>
<property>
<name>mapred.output.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapred.output.value.class</name>
<value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mapred.map.tasks</name>
<value>1</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>/user/ninja/yoozie_test/input-data</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>/user/ninja/yoozie_test/output-mr20/mapRed20</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>grideng</value>
</property>
<property>
<name>mapreduce.job.acl-view-job</name>
<value>*</value>
</property>
<property>
<name>oozie.launcher.mapreduce.job.acl-view-job</name>
<value>*</value>
</property>
</configuration>
</map-reduce>
...
The changes made in the workflow.xml are highlighted below:
Code Block:
<workflow-app xmlns='uri:oozie:workflow:0.2' name='java-wf'>
<start to='mr1' />
<action name='mr1'>
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.mapper.class</name>
<value>org.myorg.SampleMapper</value>
</property>
<property>
<name>mapred.reducer.class</name>
<value>org.myorg.SampleReducer</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>${inputDir}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${outputDir}/streaming-output</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx1024M</value>
</property>
</configuration>
</map-reduce>
<ok to="java1" />
<error to="fail" />
</action>
<action name='java1'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>org.myorg.MyTest</main-class>
<!-- BEGIN: SNIPPET TO ADD TO ACCESS HADOOP COUNTERS DEFINED IN PREVIOUS ACTIONS -->
<arg>${hadoop:counters("mr1")["COMMON"]["COMMON.ERROR_ACCESS_DH_FILES"]}</arg>
<!-- END: SNIPPET TO ADD -->
<capture-output/>
</java>
        <ok to="end" />
<error to="fail" />
</action>
    <kill name="fail">
        <message>${wf:errorCode(wf:lastErrorNode())}</message>
    </kill>
<end name='end' />
</workflow-app>
...
A property mapred.child.java.opts can be defined in workflow.xml as below:
Code Block:
<workflow-app xmlns='uri:oozie:workflow:0.2' name='streaming-wf'>
<start to='streaming1' />
<action name='streaming1'>
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<streaming>
<mapper>/bin/cat</mapper>
<reducer>/usr/bin/wc</reducer>
</streaming>
<configuration>
<property>
<name>mapred.input.dir</name>
<value>${inputDir}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${outputDir}/streaming-output</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<!-- BEGIN: SNIPPET TO ADD TO INCREASE MEMORY FOR THE HADOOP JOB-->
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx1024M</value>
</property>
<!-- END: SNIPPET TO ADD -->
</configuration>
</map-reduce>
<ok to="end" />
<error to="fail" />
</action>
    <kill name="fail">
        <message>${wf:errorCode("streaming1")}</message>
    </kill>
<end name='end' />
</workflow-app>
...
In order to define and use a custom input format in the map-reduce action, the property mapred.input.format.class needs to be included in the workflow.xml as highlighted below:
Code Block:
<workflow-app xmlns='uri:oozie:workflow:0.1' name='streaming-wf'>
<start to='streaming1' />
<action name='streaming1'>
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<streaming>
<mapper>/bin/cat</mapper>
<reducer>/usr/bin/wc</reducer>
</streaming>
<configuration>
<property>
<name>mapred.input.dir</name>
<value>${inputDir}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${outputDir}/streaming-output</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<!-- BEGIN: SNIPPET TO ADD TO DEFINE A CUSTOM INPUT FORMAT -->
<property>
<name>mapred.input.format.class</name>
<value>com.yahoo.ymail.antispam.featurelibrary.TextInputFormat</value>
</property>
<!-- END: SNIPPET TO ADD -->
</configuration>
</map-reduce>
<ok to="end" />
<error to="fail" />
</action>
    <kill name="fail">
        <message>${wf:errorCode("streaming1")}</message>
    </kill>
<end name='end' />
</workflow-app>
Note: the jar file containing the custom input format class must be placed in the workflow's lib/ directory.
CASE-6: HOW TO SPECIFY SYMBOLIC LINKS FOR FILES AND ARCHIVES
MapReduce applications can specify symbolic names for the files and archives passed through the -files and -archives options by using #, as below:
Code Block:
$ hadoop jar hadoop-examples.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output
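Inside the launched tasks, the names after `#` show up as symlinks in the task's working directory, so a script can open the files by their short names; an illustrative snippet from within a task:

```shell
# inside the task's working directory (illustrative)
cat dict1        # resolves to dir1/dict.txt via the distributed-cache symlink
ls tgzdir/       # the archive mytar.tgz is unpacked under this link
```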
...
Oozie supports this through <file> and <archive> tags that can be defined in the workflow.xml as below:
Code Block:
<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.2">
<start to='wordcount'/>
<action name='wordcount'>
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="hdfs://abc.xyz.yahoo.com:8020/user/ninja/test/output" />
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapred.mapper.class</name>
<value>org.myorg.WordCount.Map</value>
</property>
<property>
<name>mapred.reducer.class</name>
<value>org.myorg.WordCount.Reduce</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>${inputDir}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${outputDir}</value>
</property>
</configuration>
<!-- BEGIN: SNIPPET TO ADD TO DEFINE FILE/ARCHIVE TAGS -->
<file>testdir1/dict.txt#dict1</file>
<archive>testtar.tgz#tgzdir</archive>
<!-- END: SNIPPET TO ADD -->
</map-reduce>
<ok to='end'/>
        <error to='kill'/>
</action>
    <kill name='kill'>
        <message>${wf:errorCode("wordcount")}</message>
    </kill>
<end name='end'/>
</workflow-app>
...
Hadoop Streaming allows the user to create and run MapReduce jobs with any executable or script as the mapper and/or the reducer, instead of providing them as conventional Java classes. The command-line way of launching such a streaming job is as follows:
Code Block:
$ hadoop jar lib/hadoop-streaming-0.20.1.3006291003.jar -D mapred.job.queue.name=unfunded -input /user/ninja/input-data -output /user/ninja/output-dir -mapper /bin/cat -reducer /usr/bin/wc
In order to accomplish the same using Oozie, the following <streaming> element needs to be included inside the <map-reduce> action.
Code Block:
<streaming>
<mapper>/bin/cat</mapper>
<reducer>/usr/bin/wc</reducer>
</streaming>
The complete workflow.xml file looks like below with the highlighted addition:
Code Block:
<workflow-app xmlns='uri:oozie:workflow:0.2' name='streaming-wf'>
<start to='streaming1' />
<action name='streaming1'>
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<!-- BEGIN: SNIPPET TO ADD FOR HADOOP STREAMING ACTION -->
<streaming>
<mapper>/bin/cat</mapper>
<reducer>/usr/bin/wc</reducer>
</streaming>
<!-- END: SNIPPET TO ADD -->
<configuration>
<property>
<name>mapred.input.dir</name>
<value>${inputDir}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${outputDir}/streaming-output</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx1024M</value>
</property>
</configuration>
</map-reduce>
<ok to="end" />
<error to="fail" />
</action>
    <kill name="fail">
        <message>${wf:errorCode("streaming1")}</message>
    </kill>
<end name='end' />
</workflow-app>
...