Chapter 20. Hive Integration with Oozie

Apache Oozie is a workflow scheduler for Hadoop: http://incubator.apache.org/oozie/.

You may have noticed that Hive has its own internal workflow system. Hive converts a query into one or more stages, such as a MapReduce stage or a move-task stage. If a stage fails, Hive cleans up the process and reports the errors. If a stage succeeds, Hive executes subsequent stages until the entire job is done. Also, multiple Hive statements can be placed inside an HQL file, and Hive will execute each query in sequence until the file is completely processed.

Hive’s system of workflow management is excellent for single jobs or jobs that run one after the next. Some workflows need more than this. For example, a user may want a process in which step one is a custom MapReduce job, step two processes the output of step one using Hive, and step three uses distcp to copy the output of step two to a remote cluster. These kinds of workflows are candidates for management as Oozie Workflows.

Oozie Workflow jobs are Directed Acyclic Graphs (DAGs) of actions. Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability. An important feature of Oozie is that the state of the workflow is detached from the client that launches the job. This detached (fire-and-forget) job launching is useful; normally a Hive job is attached to the console that submitted it, and if that console dies, the job is left half complete.

Oozie Actions

Oozie has several prebuilt actions. Some are listed below with brief descriptions:

MapReduce

The user supplies the MapperClass and the ReducerClass and sets configuration variables

Shell

A shell command with arguments is run as an action

Java action

A Java class with a main method is launched with optional arguments

Pig

A Pig script is run

Hive

A Hive HQL query is run (a sample action declaration appears after this list)

DistCp

A distcp command is run to copy data to or from another HDFS cluster
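For reference, this is roughly how the built-in Hive action is declared in a workflow.xml. This is a minimal sketch assuming the uri:oozie:hive-action:0.2 schema; the script name and parameter are illustrative:

<action name="hive-node">
    <hive xmlns="uri:oozie:hive-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <script>myscript.q</script>
        <param>DT=20120426</param>
    </hive>
    <ok to="end"/>
    <error to="fail"/>
</action>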

Hive Thrift Service Action

The built-in Hive action works well, but it has some drawbacks. It uses Hive as a fat client: most of the Hive distribution, including JARs and configuration files, needs to be copied into the workflow directory. When Oozie launches an action, it launches from a random TaskTracker node. If your metastore is set up to allow access only from specific hosts, there may be a problem reaching it. Also, Hive can leave behind artifacts like the hive-history file or some /tmp entries if a job fails, so make sure to clean up across your pool of TaskTrackers.

The fat-client challenges of Hive have been solved (mostly) by using the Hive Thrift Service (see Chapter 16). The HiveServiceBAction (Hive Service “plan B” Action) leverages the Hive Thrift Service to launch jobs. This has the benefit of funneling all Hive operations to a predefined set of nodes running the Hive Thrift Service. Build and install the supporting projects from source:

$ cd ~
$ git clone git://github.com/edwardcapriolo/hive_test.git
$ cd hive_test
$ mvn wagon:download-single
$ mvn exec:exec
$ mvn install

$ cd ~
$ git clone git://github.com/edwardcapriolo/m6d_oozie.git
$ cd m6d_oozie
$ mvn install
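
Internally, an action like this amounts to a Thrift client call against the Hive service. The following Java sketch shows the idea, using the HiveServer1 Thrift client from Chapter 16; it is not the actual HiveServiceBAction source, and the class name and argument handling are illustrative:

import org.apache.hadoop.hive.service.HiveClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;

// Sketch: connect to a Hive Thrift Service and run the query passed
// as arguments (host, port, query), mirroring the <arg> elements
// used in the workflow below.
public class HiveThriftQueryRunner {
  public static void main(String[] args) throws Exception {
    String host = args[0];
    int port = Integer.parseInt(args[1]);
    String query = args[2];

    TSocket transport = new TSocket(host, port);
    transport.open();
    HiveClient client = new HiveClient(new TBinaryProtocol(transport));
    client.execute(query);  // run the HQL statement on the service
    transport.close();
  }
}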

A Two-Query Workflow

A workflow is created by setting up a specific directory hierarchy with required JAR files, a job.properties file and a workflow.xml file. This hierarchy has to be stored in HDFS, but it is best to assemble the folder locally and then copy it to HDFS:

$ mkdir myapp
$ mkdir myapp/lib
$ cp $HIVE_HOME/lib/*.jar myapp/lib/
$ cp m6d_oozie-1.0.0.jar myapp/lib/
$ cp hive_test-4.0.0.jar myapp/lib/

The job.properties file sets the filesystem (NameNode) and the JobTracker address. Additional properties can also be set here to be used as Hadoop job configuration properties:

The job.properties file:

nameNode=hdfs://rs01.hadoop.pvt:34310
jobTracker=rjt.hadoop.pvt:34311
queueName=default
oozie.libpath=/user/root/oozie/test/lib
oozie.wf.application.path=${nameNode}/user/root/oozie/test/main

The workflow.xml is the file where actions are defined:

<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-wf">
    <start to="create-node"/>
    <!--The create-node action defines a table if it does not
    already exist-->
    <action name="create-node">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>com.m6d.oozie.HiveServiceBAction</main-class>
            <arg>rhiveservice.hadoop.pvt</arg>
            <arg>10000</arg>
            <arg>CREATE TABLE IF NOT EXISTS zz_zz_abc (a int, b int)</arg>
        </java>
        <!-- on success proceed to the query_node action -->
        <ok to="query_node"/>
        <!-- on failure end the job unsuccessfully -->
        <error to="fail"/>
    </action>

    <!-- populate the contents of the table with an
    insert overwrite query -->
    <action name="query_node">
       <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>com.m6d.oozie.HiveServiceBAction</main-class>
            <arg>rhiveservice.hadoop.pvt</arg>
            <arg>10000</arg>
            <arg>INSERT OVERWRITE TABLE zz_zz_abc SELECT dma_code,site_id
        FROM BCO WHERE dt=20120426 AND offer=4159 LIMIT 10</arg>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Java failed, error message
        [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
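
Once the application directory is assembled, copy it to HDFS so that it matches the paths declared in job.properties, then submit the workflow with the Oozie command-line client. The HDFS layout and the Oozie server URL below are illustrative:

$ hadoop fs -put myapp/lib /user/root/oozie/test/lib
$ hadoop fs -mkdir /user/root/oozie/test/main
$ hadoop fs -put workflow.xml /user/root/oozie/test/main/workflow.xml
$ oozie job -oozie http://oozieserver:11000/oozie -config job.properties -run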

Oozie Web Console

The Oozie web console is helpful for troubleshooting jobs. Oozie launches each action inside a map task and captures all the input and output. Oozie does a good job presenting this information as well as providing links to job status pages found on the Hadoop JobTracker web console.

Here is a screenshot of the Oozie web console.


Variables in Workflows

A workflow based on completely static queries is useful but not overly practical. Most use cases for Oozie run a series of processes against files for today or this week. In the previous workflow, you may have noticed the kill element and the interpolated variable inside it:

<kill name="fail">
  <message>Java failed, error message
    [${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>

Oozie provides an Expression Language (EL) to access variables. Key-value pairs defined in job.properties can be referenced this way.
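
For example, a property defined in job.properties can be referenced directly inside an action. The workingDay property below is a hypothetical addition to the earlier job.properties, not part of the original example:

workingDay=20120426

It can then be interpolated into a query argument:

<arg>INSERT OVERWRITE TABLE zz_zz_abc SELECT dma_code,site_id
FROM BCO WHERE dt=${workingDay} AND offer=4159 LIMIT 10</arg>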

Capturing Output

Oozie also has a <capture-output/> element that can be placed inside an action. Captured output can be emailed with an error report or passed to another process. Oozie sets a Java system property in each action whose value is the name of a file the action can write its output to. The code below shows how this property is accessed:

private static final String OOZIE_ACTION_OUTPUT_PROPERTIES =
    "oozie.action.output.properties";

public static void main(String args[]) throws Exception {
  // the value of this property is the path of a file to write output to
  String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
}

Your application can then write its output to that location, as shown in the sketch below.
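
Here is a minimal Java sketch of an action that writes a key/value pair to that file; Oozie expects the file to be in java.util.Properties format, and the key name x and its value are illustrative:

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Properties;

public class CaptureOutputExample {
  private static final String OOZIE_ACTION_OUTPUT_PROPERTIES =
      "oozie.action.output.properties";

  public static void main(String[] args) throws Exception {
    // Oozie sets this system property to the path of the file the
    // action should write its captured output to
    String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
    Properties props = new Properties();
    props.setProperty("x", "20120522");  // illustrative key and value
    OutputStream os = new FileOutputStream(new File(oozieProp));
    props.store(os, "action output");
    os.close();
  }
}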

Capturing Output to Variables

We have discussed both capturing output and Oozie variables; using them together provides what you need for daily workflows.

Looking at our previous example, we see that we are selecting data from a hardcoded day: FROM BCO WHERE dt=20120426. We would like to run this workflow every day, so we need to substitute the hardcoded dt=20120426 with the current date. The following action runs /bin/date and captures its output:

<action name="create_table">
    <java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <main-class>test.RunShellProp</main-class>
        <arg>/bin/date</arg>
        <arg>+x=%Y%m%d</arg>
        <capture-output />
    </java>
    <ok to="run_query"/>
    <error to="fail"/>
</action>

This will produce output like:

$ date +x=%Y%m%d
x=20120522

You can then access this output later in the process:

<arg>You said ${wf:actionData('create_table')['x']}</arg>
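
Putting capture-output and the expression language together, the run_query action that follows create_table could substitute the captured date into the earlier INSERT statement. This is a sketch of the relevant argument only:

<arg>INSERT OVERWRITE TABLE zz_zz_abc SELECT dma_code,site_id
FROM BCO WHERE dt=${wf:actionData('create_table')['x']}
AND offer=4159 LIMIT 10</arg>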

There are many more things you can do with Oozie, including integrating Hive jobs with jobs implemented using other tools, such as Pig or Java MapReduce. See the Oozie website for more details.
