Apache Oozie is a workflow scheduler for Hadoop: http://incubator.apache.org/oozie/.
You may have noticed that Hive has its own internal workflow system. Hive converts a query into one or more stages, such as a MapReduce stage or a move-task stage. If a stage fails, Hive cleans up the process and reports the error. If a stage succeeds, Hive executes subsequent stages until the entire job is done. Also, multiple Hive statements can be placed inside an HQL file, and Hive will execute each query in sequence until the file is completely processed.
Hive's system of workflow management is excellent for single jobs or jobs that run one after the next. Some workflows need more than this. For example, a user may want a process in which step one is a custom MapReduce job, step two processes the output of step one using Hive, and step three uses distcp to copy the output of step two to a remote cluster. These kinds of workflows are candidates for management as Oozie Workflows.
Oozie Workflow jobs are directed acyclic graphs (DAGs) of actions. Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability. An important feature of Oozie is that the state of the workflow is detached from the client that launches the job. This detached (fire-and-forget) job launching is useful: normally a Hive job is attached to the console that submitted it, and if that console dies, the job is left half complete.
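As a rough illustration of a Coordinator job, the sketch below would run a workflow once per day. The coordinator name, window dates, and application path are assumptions for the example, not values from this chapter's application:

<coordinator-app xmlns="uri:oozie:coordinator:0.1" name="daily-wf"
                 frequency="${coord:days(1)}"
                 start="2012-04-26T00:00Z" end="2012-12-31T00:00Z"
                 timezone="UTC">
  <action>
    <workflow>
      <!-- HDFS path of the workflow application to run each day -->
      <app-path>${nameNode}/user/root/oozie/test/main</app-path>
    </workflow>
  </action>
</coordinator-app>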
Oozie has several prebuilt actions. Some are listed below with their descriptions:

MapReduce: The user supplies the MapperClass and the ReducerClass, and sets conf variables.
Shell: A shell command with arguments is run as an action.
Java: A Java class with a main method is launched with optional arguments.
Pig: A Pig script is run.
Hive: A Hive HQL query is run (a minimal sketch of this action follows the list).
DistCp: Run a distcp command to copy data to or from another HDFS cluster.
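For comparison with the Thrift-based approach described next, a built-in Hive action looks roughly like the following sketch. The action name, schema version, script name, and transition targets are assumptions for illustration:

<action name="hive-node">
  <hive xmlns="uri:oozie:hive-action:0.2">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <!-- HQL file shipped inside the workflow directory -->
    <script>script.q</script>
  </hive>
  <ok to="end"/>
  <error to="fail"/>
</action>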
The built-in Hive action works well, but it has some drawbacks. It uses Hive as a fat client: most of the Hive distribution, including JARs and configuration files, needs to be copied into the workflow directory. When Oozie launches an action, it launches it from a random TaskTracker node, so there may be a problem reaching the metastore if your metastore is set up to only allow access from specific hosts. Also, since Hive can leave artifacts like the hive-history file or some /tmp entries if a job fails, make sure to clean up across your pool of TaskTrackers.
The fat-client challenges of Hive have been solved (mostly) by using the Hive Thrift Service (see Chapter 16). The HiveServiceBAction (Hive Service "plan B" Action) leverages the Hive Thrift Service to launch jobs. This has the benefit of funneling all Hive operations to a predefined set of nodes running the Hive service:
$ cd ~
$ git clone git://github.com/edwardcapriolo/hive_test.git
$ cd hive_test
$ mvn wagon:download-single
$ mvn exec:exec
$ mvn install
$ cd ~
$ git clone git://github.com/edwardcapriolo/m6d_oozie.git
$ mvn install
A workflow is created by setting up a specific directory hierarchy with required JAR files, a job.properties file and a workflow.xml file. This hierarchy has to be stored in HDFS, but it is best to assemble the folder locally and then copy it to HDFS:
$ mkdir myapp
$ mkdir myapp/lib
$ cp $HIVE_HOME/lib/*.jar myapp/lib/
$ cp m6d_oozie-1.0.0.jar myapp/lib/
$ cp hive_test-4.0.0.jar myapp/lib/
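Once the directory is assembled, it can be copied into HDFS with the standard filesystem shell. The target path here is an assumption chosen to match the application path in the job.properties file below:

$ hadoop fs -put myapp /user/root/oozie/test/main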
The job.properties file sets the name of the filesystem and the JobTracker. Additional properties can also be set here to be used as Hadoop job configuration properties.
The job.properties file:
nameNode=hdfs://rs01.hadoop.pvt:34310
jobTracker=rjt.hadoop.pvt:34311
queueName=default
oozie.libpath=/user/root/oozie/test/lib
oozie.wf.application.path=${nameNode}/user/root/oozie/test/main
The workflow.xml is the file where actions are defined:
<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-wf">
  <start to="create-node"/>
  <!-- The create-node action defines a table if it does not already exist -->
  <action name="create-node">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapred.job.queue.name</name>
          <value>${queueName}</value>
        </property>
      </configuration>
      <main-class>com.m6d.oozie.HiveServiceBAction</main-class>
      <arg>rhiveservice.hadoop.pvt</arg>
      <arg>10000</arg>
      <arg>CREATE TABLE IF NOT EXISTS zz_zz_abc (a int, b int)</arg>
    </java>
    <!-- on success, proceed to the query_node action -->
    <ok to="query_node"/>
    <!-- on failure, end the job unsuccessfully -->
    <error to="fail"/>
  </action>
  <!-- populate the contents of the table with an insert overwrite query -->
  <action name="query_node">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapred.job.queue.name</name>
          <value>${queueName}</value>
        </property>
      </configuration>
      <main-class>com.m6d.oozie.HiveServiceBAction</main-class>
      <arg>rhiveservice.hadoop.pvt</arg>
      <arg>10000</arg>
      <arg>INSERT OVERWRITE TABLE zz_zz_abc SELECT dma_code, site_id FROM BCO
        WHERE dt=20120426 AND offer=4159 LIMIT 10</arg>
    </java>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>Java failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name="end"/>
</workflow-app>
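With the application directory and job.properties in place, the workflow can be submitted with the Oozie command-line client. The server URL below is an assumption (port 11000 is the Oozie default):

$ oozie job -oozie http://localhost:11000/oozie -config job.properties -run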
The Oozie web console is helpful for troubleshooting jobs. Oozie launches each action inside a map task and captures all the input and output. Oozie does a good job presenting this information as well as providing links to job status pages found on the Hadoop JobTracker web console.
A workflow based on completely static queries is useful but not overly practical. Most of the use cases for Oozie run a series of processes against files for today or this week. In the previous workflow, you may have noticed the kill tag and the interpolated variable inside of it:
<kill name="fail">
  <message>Java failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
Oozie provides an EL (Expression Language) to access variables. Key/value pairs defined in job.properties can be referenced this way; for example, the queueName property appears in the workflow above as ${queueName}.
Oozie also has a <capture-output/> tag that can be placed inside an action. Output captured this way can be emailed with an error or sent to another process. Oozie sets a Java property in each action that can be used as a filename to write output to. The code below shows how this property is accessed:
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES =
    "oozie.action.output.properties";

public static void main(String args[]) throws Exception {
  String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
}
Your application can output data to that location.
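As a minimal sketch of what that looks like end to end, the action below writes a single key for Oozie to capture. The class name and the key/value pair are illustrative assumptions, not part of the chapter's example:

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Properties;

public class WriteOozieOutput {
  private static final String OOZIE_ACTION_OUTPUT_PROPERTIES =
      "oozie.action.output.properties";

  public static void main(String args[]) throws Exception {
    // Oozie points this property at a file it reads after the action exits
    String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
    Properties props = new Properties();
    props.setProperty("x", "20120522"); // illustrative key and value
    OutputStream os = new FileOutputStream(new File(oozieProp));
    props.store(os, ""); // Java properties format, which Oozie parses
    os.close();
  }
}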
We have discussed both capturing output and Oozie variables; using them together provides what you need for daily workflows.
Looking at our previous example, we see that we are selecting data from a hardcoded day: FROM BCO WHERE dt=20120426. We would like to run this workflow every day, so we need to substitute the hardcoded dt=20120426 with the current date:
<action name="create_table">
  <java>
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <configuration>
      <property>
        <name>mapred.job.queue.name</name>
        <value>${queueName}</value>
      </property>
    </configuration>
    <main-class>test.RunShellProp</main-class>
    <arg>/bin/date</arg>
    <arg>+x=%Y%m%d</arg>
    <capture-output/>
  </java>
  <ok to="run_query"/>
  <error to="fail"/>
</action>
This will produce output like:
$ date +x=%Y%m%d
x=20120522
You can then access this output later in the process:
<arg>You said ${wf:actionData('create_table')['x']}</arg>
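Putting the pieces together, the hardcoded date in the earlier query node can then be replaced with the captured value. This rewritten <arg> is a sketch, not part of the original workflow:

<arg>INSERT OVERWRITE TABLE zz_zz_abc SELECT dma_code, site_id FROM BCO
  WHERE dt=${wf:actionData('create_table')['x']} AND offer=4159 LIMIT 10</arg>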
There are many more things you can do with Oozie, including integrating Hive jobs with jobs implemented with other tools, such as Pig, Java MapReduce, etc. See the Oozie website for more details.