Concept definitions
- Action: a concrete, executable task (e.g. a MapReduce, Hive, or Pig job, or a shell command)
- Workflow: a directed acyclic graph (DAG) of actions
- Workflow Definition: the static definition of a workflow
- Workflow Job: a runnable instance of a workflow definition
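Tying the four concepts together: a workflow definition is just an XML DAG of control and action nodes. A minimal sketch (node names and the HDFS path are illustrative; each node type is covered one by one below):
<workflow-app name="minimal-wf" xmlns="uri:oozie:workflow:0.1">
<!-- entry point: control flows start -> cleanup -> end -->
<start to="cleanup"/>
<action name="cleanup">
<fs>
<delete path='hdfs://foo:8020/usr/tucu/temp-data'/>
</fs>
<ok to="end"/>
<error to="fail"/>
</action>
<!-- any error transition ends the job with an error state -->
<kill name="fail">
<message>cleanup failed</message>
</kill>
<end name="end"/>
</workflow-app>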
Control nodes
Start control node
The start node is the entry point of a workflow job; every workflow definition must have exactly one.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<start to="[NODE-NAME]"/>
...
</workflow-app>
demo
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<start to="firstHadoopJob"/>
...
</workflow-app>
End control node
Reaching the end node means the workflow job completed successfully; every workflow definition must have one.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<end name="[NODE-NAME]"/>
...
</workflow-app>
demo
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<end name="end"/>
</workflow-app>
Kill control node
Transitioning to a kill node terminates the workflow job with an error; the message is recorded in the job log.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<kill name="[NODE-NAME]">
<message>[MESSAGE-TO-LOG]</message>
</kill>
...
</workflow-app>
demo
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<kill name="killBecauseNoInput">
<message>Input unavailable</message>
</kill>
...
</workflow-app>
Decision node
A decision node is a switch/case: the first case whose EL predicate evaluates to true is followed; if none does, the default transition is taken.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<decision name="[NODE-NAME]">
<switch>
<case to="[NODE_NAME]">[PREDICATE]</case>
...
<case to="[NODE_NAME]">[PREDICATE]</case>
<default to="[NODE_NAME]"/>
</switch>
</decision>
...
</workflow-app>
demo
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<decision name="mydecision">
<switch>
<case to="reconsolidatejob">
${fs:fileSize(secondjobOutputDir) gt 10 * GB}
</case>
<case to="rexpandjob">
${fs:fileSize(secondjobOutputDir) lt 100 * MB}
</case>
<case to="recomputejob">
${ hadoop:counters('secondjob')[RECORDS][REDUCE_OUT] lt 1000000 }
</case>
<default to="end"/>
</switch>
</decision>
...
</workflow-app>
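A common real-world predicate simply checks whether the input data exists yet. A sketch (node names are illustrative; fs:exists is a standard Oozie EL function, and the default routes to a kill node like the one in the kill demo above):
<decision name="checkinput">
<switch>
<!-- run the job only when the input directory is already in HDFS -->
<case to="firstjob">${fs:exists('/usr/tucu/input-data')}</case>
<!-- otherwise fail fast through a kill node -->
<default to="killBecauseNoInput"/>
</switch>
</decision>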
Fork/Join control nodes
A fork splits the execution path into several concurrent paths; the matching join waits until all of them have completed.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<fork name="[FORK-NODE-NAME]">
<path start="[NODE-NAME]" />
...
<path start="[NODE-NAME]" />
</fork>
...
<join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
...
</workflow-app>
demo
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<fork name="forking">
<path start="firstparalleljob"/>
<path start="secondparalleljob"/>
</fork>
<action name="firstparallejob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<job-xml>job1.xml</job-xml>
</map-reduce>
<ok to="joining"/>
<error to="kill"/>
</action>
<action name="secondparalleljob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<job-xml>job2.xml</job-xml>
</map-reduce>
<ok to="joining"/>
<error to="kill"/>
</action>
<join name="joining" to="nextaction"/>
...
</workflow-app>
Action nodes
Fs (HDFS) action
Executes a sequence of HDFS operations (delete, mkdir, move, chmod, touchz, chgrp) synchronously, in the order given.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
...
<action name="[NODE-NAME]">
<fs>
<delete path='[PATH]'/>
...
<mkdir path='[PATH]'/>
...
<move source='[SOURCE-PATH]' target='[TARGET-PATH]'/>
...
<chmod path='[PATH]' permissions='[PERMISSIONS]' dir-files='false' />
...
<touchz path='[PATH]' />
...
<chgrp path='[PATH]' group='[GROUP]' dir-files='false' />
</fs>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>
demo
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.5">
...
<action name="hdfscommands">
<fs>
<delete path='hdfs://foo:8020/usr/tucu/temp-data'/>
<mkdir path='archives/${wf:id()}'/>
<move source='${jobInput}' target='archives/${wf:id()}/processed-input'/>
<chmod path='${jobOutput}' permissions='-rwxrw-rw-' dir-files='true'><recursive/></chmod>
<chgrp path='${jobOutput}' group='testgroup' dir-files='true'><recursive/></chgrp>
</fs>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>
Ssh Action
Runs a shell command on a remote host via ssh; the workflow waits for the command to finish.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<ssh>
<host>[USER]@[HOST]</host>
<command>[SHELL]</command>
<args>[ARGUMENTS]</args>
...
<capture-output/>
</ssh>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>
demo
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myssjob">
<ssh>
<host>foo@bar.com</host>
<command>uploaddata</command>
<args>jdbc:derby://bar.com:1527/myDB</args>
<args>hdfs://foobar.com:8020/usr/tucu/myData</args>
</ssh>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>
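When <capture-output/> is used, the remote command must print key=value lines to stdout, and downstream nodes can then read them through the wf:actionData() EL function. A sketch (the node names and the echoed key/value are made up for illustration):
<action name="getdate">
<ssh>
<host>foo@bar.com</host>
<command>echo</command>
<!-- the command's stdout, in key=value form, is captured -->
<args>rundate=2024-01-01</args>
<capture-output/>
</ssh>
<ok to="usedate"/>
<error to="errorcleanup"/>
</action>
<!-- later, e.g. inside another action's configuration: -->
<!-- ${wf:actionData('getdate')['rundate']} -->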
Sub-workflow Action
Runs a child workflow job; the parent job waits for the child to complete. <propagate-configuration/> passes the parent's configuration down to the child.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<sub-workflow>
<app-path>[WF-APPLICATION-PATH]</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
</sub-workflow>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>
demo
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="a">
<sub-workflow>
<app-path>child-wf</app-path>
<configuration>
<property>
<name>input.dir</name>
<value>${wf:id()}/second-mr-output</value>
</property>
</configuration>
</sub-workflow>
<ok to="end"/>
<error to="kill"/>
</action>
...
</workflow-app>
Java Action
Runs the main() method of the given class as a single-task map-only job on the Hadoop cluster; an exception thrown by main() marks the action as failed.
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<java>
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[JOB-XML]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<main-class>[MAIN-CLASS]</main-class>
<java-opts>[JAVA-STARTUP-OPTS]</java-opts>
<arg>[ARGUMENT]</arg>
...
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
<capture-output />
</java>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>
demo
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirstjavajob">
<java>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>org.apache.oozie.MyFirstMainClass</main-class>
<java-opts>-Dblah</java-opts>
<arg>argument1</arg>
<arg>argument2</arg>
</java>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>
Map-Reduce Action
Starts a Hadoop map-reduce job; regular, streaming, and pipes jobs are supported (streaming and pipes are mutually exclusive).
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
...
<action name="[NODE-NAME]">
<map-reduce>
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<streaming>
<mapper>[MAPPER-PROCESS]</mapper>
<reducer>[REDUCER-PROCESS]</reducer>
<record-reader>[RECORD-READER-CLASS]</record-reader>
<record-reader-mapping>[NAME=VALUE]</record-reader-mapping>
...
<env>[NAME=VALUE]</env>
...
</streaming>
<!-- Either streaming or pipes can be specified for an action, not both -->
<pipes>
<map>[MAPPER]</map>
<reduce>[REDUCER]</reduce>
<inputformat>[INPUTFORMAT]</inputformat>
<partitioner>[PARTITIONER]</partitioner>
<writer>[OUTPUTFORMAT]</writer>
<program>[EXECUTABLE]</program>
</pipes>
<job-xml>[JOB-XML-FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<config-class>[CONFIG-CLASS]</config-class>
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
</map-reduce>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>
demo
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirstHadoopJob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="hdfs://foo:8020/usr/tucu/output-data"/>
</prepare>
<job-xml>/myfirstjob.xml</job-xml>
<configuration>
<property>
<name>mapred.input.dir</name>
<value>/usr/tucu/input-data</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>/usr/tucu/output-data</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>${firstJobReducers}</value>
</property>
<property>
<name>oozie.action.external.stats.write</name>
<value>true</value>
</property>
</configuration>
</map-reduce>
<ok to="myNextAction"/>
<error to="errorCleanup"/>
</action>
...
</workflow-app>
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="firstjob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${output}"/>
</prepare>
<streaming>
<mapper>/bin/bash testarchive/bin/mapper.sh testfile</mapper>
<reducer>/bin/bash testarchive/bin/reducer.sh</reducer>
</streaming>
<configuration>
<property>
<name>mapred.input.dir</name>
<value>${input}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${output}</value>
</property>
<property>
<name>stream.num.map.output.key.fields</name>
<value>3</value>
</property>
</configuration>
<file>/users/blabla/testfile.sh#testfile</file>
<archive>/users/blabla/testarchive.jar#testarchive</archive>
</map-reduce>
<ok to="end"/>
<error to="kill"/>
</action>
...
</workflow-app>
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="firstjob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${output}"/>
</prepare>
<pipes>
<program>bin/wordcount-simple#wordcount-simple</program>
</pipes>
<configuration>
<property>
<name>mapred.input.dir</name>
<value>${input}</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>${output}</value>
</property>
</configuration>
<archive>/users/blabla/testarchive.jar#testarchive</archive>
</map-reduce>
<ok to="end"/>
<error to="kill"/>
</action>
...
</workflow-app>
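All of the ${input}/${output} style variables in the demos above are workflow parameters, resolved from the job configuration at submission time. From workflow schema 0.4 onwards a workflow can declare them up front, optionally with defaults; a sketch:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.4">
<parameters>
<!-- no default value: must be supplied when the job is submitted -->
<property>
<name>input</name>
</property>
<!-- used as a fallback when the submitter does not set it -->
<property>
<name>output</name>
<value>hdfs://bar:8020/usr/tucu/output-data</value>
</property>
</parameters>
<start to="firstjob"/>
...
</workflow-app>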