...

Follow the instructions in the Hadoop MapReduce Tutorial to run a simple MapReduce application (WordCount). This involves invoking the hadoop command to submit the MapReduce job, specifying various command-line options.
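
For context, the wordcount.jar used in the rest of this page is assumed to be built from the classic WordCount example of that tutorial (old org.apache.hadoop.mapred API), roughly as sketched below:

Code Block
java

package org.myorg;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCount {

    // Mapper: emits (word, 1) for every token in the input line.
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    // Reducer: sums the counts for each word.
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

Note that Map and Reduce are static nested classes, so their binary names are org.myorg.WordCount$Map and org.myorg.WordCount$Reduce; this is the form used in the workflow configuration later on.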

Running a MapReduce application using Oozie

...

Hadoop Map-Reduce Job Submission

Code Block

$ hadoop jar /usr/ninja/wordcount.jar org.myorg.WordCount -Dmapred.job.queue.name=queue_name /usr/ninja/wordcount/input /usr/ninja/wordcount/output

...

This file is present locally on the node from which the job is submitted (either a local machine or the gateway node). Its main purpose is to specify the essential parameters needed for running the workflow. One mandatory property is oozie.wf.application.path, which points to the HDFS location where workflow.xml resides. In addition, definitions for the variables used in workflow.xml (e.g. ${jobTracker}, ${inputDir}, ..) can be added here. For some specific versions of Hadoop, additional authentication parameters might also need to be defined. For our example, this file looks like the following:

Code Block

# or use a remote-server URL, e.g. nameNode=hdfs://abc.xyz.yahoo.com:8020
nameNode=hdfs://localhost:9000
# or use a remote-server URL, e.g. jobTracker=abc.xyz.yahoo.com:50300
jobTracker=localhost:9001
queueName=default
examplesRoot=map-reduce

oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}
inputDir=input-data
outputDir=map-reduce

...

This file defines the workflow for the particular job as a set of actions. For our example, this file looks like below:

Code Block
xml

<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.2">
    <start to='wordcount'/>
    <action name='wordcount'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.WordCount$Map</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.WordCount$Reduce</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='end'/>
        <error to='kill'/>
    </action>
    <kill name='kill'>
        <message>${wf:errorCode("wordcount")}</message>
    </kill>
    <end name='end'/>
</workflow-app>

...

1. Follow the instructions on Quick Start to set up Oozie with Hadoop and ensure that the Oozie service is started.
2. Create a directory in your home directory to store all your workflow components (properties, workflow XML, and the libraries). Inside this workflow directory, create a sub-directory called lib/.

Code Block

    $ cd ~
    $ mkdir map-reduce
    $ mkdir map-reduce/lib

...

Tip: The following Oozie command line option can be used to perform XML schema validation on the workflow XML file and return errors if any:

Code Block

$ oozie validate ~/map-reduce/workflow.xml

4. Your job.properties file should look like the following:

Code Block

$ cat ~/map-reduce/job.properties

nameNode=hdfs://localhost:9000
jobTracker=localhost:9001
queueName=default
examplesRoot=map-reduce

oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}
inputDir=input-data
outputDir=map-reduce

5. Copy the wordcount.jar file into the workflow's lib/ directory (~/map-reduce/lib). Your workflow directory should now contain the following:

Code Block

job.properties
workflow.xml
lib/
lib/wordcount.jar

...

6. Copy the workflow directory to HDFS. Note that if a directory with this name already exists in HDFS, it may need to be deleted before copying.

Code Block

$ hadoop fs -put ~/map-reduce map-reduce
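
If an older copy of the directory is already present in HDFS, it can be removed before re-running the put (this assumes the Hadoop 0.20-era fs shell; newer releases use hadoop fs -rm -r):

Code Block

$ hadoop fs -rmr map-reduce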

7. Run the following Oozie command to submit your workflow. Once the workflow is submitted, the Oozie server returns a workflow ID that can be used for monitoring and debugging.

Code Block

$ oozie job -oozie http://localhost:4080/oozie/ -config ~/map-reduce/job.properties -run

...
...
job: 14-20090525161321-oozie-ninj

...

8. Check the status of the submitted MapReduce workflow job. The following command displays a detailed breakdown of the workflow job and its actions.

Code Block

$ oozie job -info 14-20090525161321-oozie-ninj -oozie http://localhost:4080/oozie/

...
...

.---------------------------------------------------------------------------------------------------------------------------------------------------------------
Workflow Name :  wordcount-wf
App Path      :  hdfs://localhost:9000/user/ninja/map-reduce
Status        :  SUCCEEDED
Run           :  0
User          :  ninja
Group         :  users
Created       :  2011-09-21 05:01 +0000
Started       :  2011-09-21 05:01 +0000
Ended         :  2011-09-21 05:01 +0000
Actions
.----------------------------------------------------------------------------------------------------------------------------------------------------------------
Action Name             Type        Status     Transition  External Id            External Status  Error Code    Start Time              End Time
.----------------------------------------------------------------------------------------------------------------------------------------------------------------
wordcount                 map-reduce  OK         end         job_200904281535_0254  SUCCEEDED        -             2011-09-21 05:01 +0000  2011-09-21 05:01 +0000
.----------------------------------------------------------------------------------------------------------------------------------------------------------------

...

in job.properties:

Code Block

         VAR=job-properties

in config-default.xml:

Code Block
xml

         <property><name>VAR</name><value>config-default-xml</value></property>

in workflow.xml:

Code Block
xml

         <property><name>VARIABLE</name><value>${VAR}</value></property>

...

in job.properties:

Code Block

         variable4=job-properties

in config-default.xml:

Code Block
xml

         <property><name>variable4</name><value>config-default-xml</value></property>

in workflow.xml:

Code Block
xml

         <job-xml>mr3-job.xml</job-xml>
         ... ...
         <property><name>variable4</name><value>grideng</value></property>

in mr3-job.xml:

Code Block
xml

          <property><name>mapred.job.queue.name</name><value>${variable4}</value></property>

...

in workflow.xml:

Code Block
xml

         <job-xml>mr3-job.xml</job-xml>
         ... ...
         <property><name>mapred.job.queue.name</name><value>grideng</value></property>

in mr3-job.xml:

Code Block
xml

          <property><name>mapred.job.queue.name</name><value>bogus</value></property>

...

in job.properties:

Code Block

         nameNode=hdfs://abc.xyz.yahoo.com:8020
         outputDir=output-allactions

in workflow.xml:

Code Block
xml

        <case to="end">${fs:exists(concat(concat(concat(concat(concat(nameNode,"/user/"),wf:user()),"/"),wf:conf("outputDir")),"/streaming/part-00000")) and (fs:fileSize(concat(concat(concat(concat(concat(nameNode,"/user/"),wf:user()),"/"),wf:conf("outputDir")),"/streaming/part-00000")) gt 0) == "true"}</case>

...

  1. change mapred.mapper.class to mapreduce.map.class
  2. change mapred.reducer.class to mapreduce.reduce.class
  3. add mapred.output.key.class
  4. add mapred.output.value.class
  5. and include the following properties in the MR action configuration

    Code Block
    xml
    
          <property>
              <name>mapred.reducer.new-api</name>
              <value>true</value>
          </property>
          <property>
              <name>mapred.mapper.new-api</name>
              <value>true</value>
          </property>
    

The changes to be made in workflow.xml file are highlighted below:

Code Block
xml

<map-reduce xmlns="uri:oozie:workflow:0.2">
  <job-tracker>abc.xyz.yahoo.com:50300</job-tracker>
  <name-node>hdfs://abc.xyz.yahoo.com:8020</name-node>
  <prepare>
    <delete path="hdfs://abc.xyz.yahoo.com:8020/user/ninja/yoozie_test/output-mr20-fail" />
  </prepare>
  <configuration>

    <!-- BEGIN: SNIPPET TO ADD IN ORDER TO MAKE USE OF HADOOP 20 API -->
    <property>
      <name>mapred.mapper.new-api</name>
      <value>true</value>
    </property>
    <property>
      <name>mapred.reducer.new-api</name>
      <value>true</value>
    </property>
    <!-- END: SNIPPET -->

    <property>
       <name>mapreduce.map.class</name>
       <value>org.myorg.WordCount$TokenizerMapper</value>
    </property>
    <property>
       <name>mapreduce.reduce.class</name>
       <value>org.myorg.WordCount$IntSumReducer</value>
    </property>
    <property>
       <name>mapred.output.key.class</name>
       <value>org.apache.hadoop.io.Text</value>
    </property>
    <property>
       <name>mapred.output.value.class</name>
       <value>org.apache.hadoop.io.IntWritable</value>
    </property>
    <property>
      <name>mapred.map.tasks</name>
      <value>1</value>
    </property>
    <property>
      <name>mapred.input.dir</name>
      <value>/user/ninja/yoozie_test/input-data</value>
    </property>
    <property>
      <name>mapred.output.dir</name>
      <value>/user/ninja/yoozie_test/output-mr20/mapRed20</value>
    </property>
    <property>
      <name>mapred.job.queue.name</name>
      <value>grideng</value>
    </property>
    <property>
      <name>mapreduce.job.acl-view-job</name>
      <value>*</value>
    </property>
    <property>
      <name>oozie.launcher.mapreduce.job.acl-view-job</name>
      <value>*</value>
    </property>
  </configuration>
</map-reduce>
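
For reference, the mapreduce.map.class and mapreduce.reduce.class values above refer to new-API (org.apache.hadoop.mapreduce) classes. A minimal sketch of such classes, modeled on the standard Hadoop 0.20 WordCount example, is:

Code Block
java

package org.myorg;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    // New-API mapper, referenced above as org.myorg.WordCount$TokenizerMapper.
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // New-API reducer, referenced above as org.myorg.WordCount$IntSumReducer.
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}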

...

The changes made in the workflow.xml are highlighted below:

Code Block
xml

<workflow-app xmlns='uri:oozie:workflow:0.2' name='java-wf'>
    <start to='mr1' />

    <action name='mr1'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.SampleMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.SampleReducer</value>
                </property>

                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}/streaming-output</value>
                </property>
                <property>
                  <name>mapred.job.queue.name</name>
                  <value>${queueName}</value>
                </property>

               <property>
                  <name>mapred.child.java.opts</name>
                  <value>-Xmx1024M</value>
               </property>

            </configuration>
        </map-reduce>
        <ok to="java1" />
        <error to="fail" />
    </action>

    <action name='java1'>
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
               <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>org.myorg.MyTest</main-class>

            <!-- BEGIN: SNIPPET TO ADD TO ACCESS HADOOP COUNTERS DEFINED IN PREVIOUS ACTIONS -->
            <arg>${hadoop:counters("mr1")["COMMON"]["COMMON.ERROR_ACCESS_DH_FILES"]}</arg>
            <!-- END: SNIPPET TO ADD -->

            <capture-output/>
        </java>
        <ok to="pig1" />
        <error to="fail" />
    </action>

    <kill name="fail">
        <value>${wf:errorCode("wordcount")}</value>
    </kill>
    <end name='end' />
</workflow-app>
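
org.myorg.MyTest above is only a placeholder main class. The sketch below (an assumption, not the original class) shows the typical pattern: the counter expression is passed in as the first command-line argument, and because <capture-output/> is declared, Oozie expects the action to write a java.util.Properties file to the path given by the oozie.action.output.properties system property. The property name dhErrorCount is purely illustrative.

Code Block
java

package org.myorg;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Properties;

public class MyTest {
    public static void main(String[] args) throws Exception {
        // args[0] is the counter value injected via ${hadoop:counters("mr1")[...]} in workflow.xml.
        String errorCount = (args.length > 0) ? args[0] : "0";

        // Because <capture-output/> is declared, Oozie expects the action to write
        // a java.util.Properties file to the path named by this system property.
        String outputFile = System.getProperty("oozie.action.output.properties");

        Properties props = new Properties();
        props.setProperty("dhErrorCount", errorCount);   // hypothetical property name

        try (OutputStream os = new FileOutputStream(new File(outputFile))) {
            props.store(os, "output from java1 action");
        }
    }
}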

...

A property mapred.child.java.opts can be defined in workflow.xml as below:

Code Block
xml

<workflow-app xmlns='uri:oozie:workflow:0.2' name='streaming-wf'>
    <start to='streaming1' />
    <action name='streaming1'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <streaming>
                <mapper>/bin/cat</mapper>
                <reducer>/usr/bin/wc</reducer>
            </streaming>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}/streaming-output</value>
                </property>
                <property>
                  <name>mapred.job.queue.name</name>
                  <value>${queueName}</value>
                </property>

                <!-- BEGIN: SNIPPET TO ADD TO INCREASE MEMORY FOR THE HADOOP JOB-->
                <property>
                  <name>mapred.child.java.opts</name>
                  <value>-Xmx1024M</value>
                </property>
                <!-- END: SNIPPET TO ADD -->

            </configuration>
        </map-reduce>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>${wf:errorCode("streaming1")}</message>
    </kill>
    <end name='end' />
</workflow-app>

...

In order to define and use a custom input format in the map-reduce action, the property mapred.input.format.class needs to be included in the workflow.xml as highlighted below:

Code Block
xml

<workflow-app xmlns='uri:oozie:workflow:0.1' name='streaming-wf'>
    <start to='streaming1' />
    <action name='streaming1'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <streaming>
                <mapper>/bin/cat</mapper>
                <reducer>/usr/bin/wc</reducer>
            </streaming>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}/streaming-output</value>
                </property>
                <property>
                  <name>mapred.job.queue.name</name>
                  <value>${queueName}</value>
                </property>

                <!-- BEGIN: SNIPPET TO ADD TO DEFINE A CUSTOM INPUT FORMAT -->
                <property>
                  <name>mapred.input.format.class</name>
                  <value>com.yahoo.ymail.antispam.featurelibrary.TextInputFormat</value>
                </property>
                <!-- END: SNIPPET TO ADD -->

            </configuration>
        </map-reduce>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>${wf:errorCode("streaming1")}</message>
    </kill>
    <end name='end' />
</workflow-app>

NOTE: The jar file containing the custom input format class must be placed in the workflow's lib/ directory.
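
The com.yahoo.ymail.antispam.featurelibrary.TextInputFormat named above is specific to that application. As a rough illustration only (not the actual class), a custom input format for the old mapred API extends FileInputFormat and supplies its own RecordReader; the sketch below simply delegates to LineRecordReader, which is where application-specific parsing would otherwise go:

Code Block
java

package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Minimal custom input format for the old mapred API.
public class MyTextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        // Delegate to the standard line reader; a real implementation would
        // return its own RecordReader with custom parsing logic.
        return new LineRecordReader(job, (FileSplit) split);
    }
}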

CASE-6: HOW TO SPECIFY SYMBOLIC LINKS FOR FILES AND ARCHIVES

MapReduce applications can specify symbolic names for files and archives passed through the -files and -archives options, using the # separator as shown below:

Code Block

$ hadoop jar hadoop-examples.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output
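
Inside the tasks, these symbolic names resolve to links in the task's working directory, so application code can open them by the short name. A minimal sketch (old mapred API, names assumed) of a mapper base class that loads the symlinked dictionary in configure() is:

Code Block
java

package org.myorg;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;

// Fragment of a mapper that reads the file symlinked as "dict1" from the
// task's working directory during configure().
public abstract class DictAwareMapperBase extends MapReduceBase {
    @Override
    public void configure(JobConf job) {
        try (BufferedReader reader = new BufferedReader(new FileReader("dict1"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // load dictionary entries here (application-specific)
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to read symlinked file dict1", e);
        }
    }
}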

...

Oozie supports this through <file> and <archive> elements, which can be defined in the workflow.xml as shown below:

Code Block
xml

<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.2">
    <start to='wordcount'/>
    <action name='wordcount'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="hdfs://abc.xyz.yahoo.com:8020/user/ninja/test/output" />
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.WordCount$Map</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.WordCount$Reduce</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>

            <!-- BEGIN: SNIPPET TO ADD TO DEFINE FILE/ARCHIVE TAGS -->
            <file>testdir1/dict.txt#dict1</file>
            <archive>testtar.tgz#tgzdir</archive>
            <!-- END: SNIPPET TO ADD -->

        </map-reduce>
        <ok to='end'/>
            <error to='kill'/>
    </action>
    <kill name='kill'>
       <message>${wf:errorCode("wordcount")}</message>
    </kill>
    <end name='end'/>
</workflow-app>

...

Hadoop Streaming allows the user to create and run MapReduce jobs with any executable or script as the mapper and/or the reducer (instead of providing them as conventional Java classes). The command-line way of launching such a Hadoop streaming job is as follows:

Code Block

$ hadoop jar lib/hadoop-streaming-0.20.1.3006291003.jar -D mapred.job.queue.name=unfunded -input /user/ninja/input-data -output /user/ninja/output-dir -mapper /bin/cat -reducer /usr/bin/wc

In order to accomplish the same using Oozie, the following <streaming> element needs to be included inside the <map-reduce> action.

Code Block
xml

<streaming>
        <mapper>/bin/cat</mapper>
        <reducer>/usr/bin/wc</reducer>
</streaming>

The complete workflow.xml file looks like the following, with the addition highlighted:

Code Block
xml

<workflow-app xmlns='uri:oozie:workflow:0.2' name='streaming-wf'>
    <start to='streaming1' />
    <action name='streaming1'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>

            <!-- BEGIN: SNIPPET TO ADD FOR HADOOP STREAMING ACTION -->
            <streaming>
                <mapper>/bin/cat</mapper>
                <reducer>/usr/bin/wc</reducer>
            </streaming>
            <!-- END: SNIPPET TO ADD -->

            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}/streaming-output</value>
                </property>
                <property>
                  <name>mapred.job.queue.name</name>
                  <value>${queueName}</value>
                </property>
               <property>
                  <name>mapred.child.java.opts</name>
                  <value>-Xmx1024M</value>
               </property>

            </configuration>
        </map-reduce>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>${wf:errorCode("streaming1")}</message>
    </kill>
    <end name='end' />
</workflow-app>

...