β

Oozie工作流程定义详解

简单之美 1652 阅读

Oozie工作流程定义是一个DAG(Directed Acyclical Graphs)图,它由控制流节点(Control Flow Nodes)或动作节点(Action Nodes)组成,各个节点又是通过表征转移的箭线(transitions
arrows)互相连通。对于工作流一般对应存在流程定义语言,例如jBPM是jPDL,大多数都是基于XML定义的,Oozie流程定义语言也是基于XML定义的,称为hPDL(Hadoop Process Definition Language)。
下面,我们详细说明工作流定义相关的内容:

工作流生命周期

在Oozie中,工作流的状态可能存在如下几种:

状态 含义说明
PREP 一个工作流Job第一次创建将处于PREP状态,表示工作流Job已经定义,但是没有运行。
RUNNING 当一个已经被创建的工作流Job开始执行的时候,就处于RUNNING状态。它不会达到结束状态,只能因为出错而结束,或者被挂起。
SUSPENDED 一个RUNNING状态的工作流Job会变成SUSPENDED状态,而且它会一直处于该状态,除非这个工作流Job被重新开始执行或者被杀死。
SUCCEEDED 当一个RUNNING状态的工作流Job到达了end节点,它就变成了SUCCEEDED最终完成状态。
KILLED 当一个工作流Job处于被创建后的状态,或者处于RUNNING、SUSPENDED状态时,被杀死,则工作流Job的状态变为KILLED状态。
FAILED 当一个工作流Job不可预期的错误失败而终止,就会变成FAILED状态。

上述各种状态存在相应的转移(工作流程因为某些事件,可能从一个状态跳转到另一个状态),其中合法的状态转移有如下几种,如下表所示:

转移前状态 转移后状态集合
未启动 PREP
PREP RUNNING、KILLED
RUNNING SUSPENDED、SUCCEEDED、KILLED、FAILED
SUSPENDED RUNNING、KILLED

明确上述给出的状态转移空间以后,可以根据实际需要更加灵活地来控制工作流Job的运行。

控制流节点(Control Flow Nodes)

工作流程定义中,控制工作流的开始和结束,以及工作流Job的执行路径的节点,它定义了流程的开始(start节点)和结束(end节点或kill节点),同时提供了一种控制流程执行路径的机制(decision决策节点、fork分支节点、join会签节点)。通过上面提到的各种节点,我们大概应该能够知道它们在工作流中起着怎样的作用。下面,我们看一下不同节点的语法格式:

动作节点(Action Nodes)

工作流程定义中,能够触发一个计算任务(Computation Task)或者处理任务(Processing Task)执行的节点。所有的动作(
Action)都有一些基本的特性,我先首先来看一下:

下面详细介绍Oozie内置支持的动作节点类型,如下所示:

map-reduce动作会在工作流Job中启动一个MapReduce Job任务运行,我们可以详细配置这个MapReduce Job。另外,可以通过map-reduce元素的子元素来配置一些其他的任务,如streaming、pipes、file、archive等等。
下面给出包含这些内容的语法格式说明:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
	...
	<action name="[NODE-NAME]">
		<map-reduce>
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<streaming>
				<mapper>[MAPPER-PROCESS]</mapper>
				<reducer>[REDUCER-PROCESS]</reducer>
				<record-reader>[RECORD-READER-CLASS]</record-reader>
				<record-reader-mapping>[NAME=VALUE]</record-reader-mapping>
				...
				<env>[NAME=VALUE]</env>
				...
			</streaming>
			<!-- Either streaming or pipes can be specified for an action, not both -->
			<pipes>
				<map>[MAPPER]</map>
				<reduce>
					[REDUCER]
				</reducer>
					<inputformat>[INPUTFORMAT]</inputformat>
					<partitioner>[PARTITIONER]</partitioner>
					<writer>[OUTPUTFORMAT]</writer>
					<program>[EXECUTABLE]</program>
			</pipes>
			<job-xml>[JOB-XML-FILE]</job-xml>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<file>[FILE-PATH]</file>
			...
			<archive>[FILE-PATH]</archive>
			...
		</map-reduce>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>

Hive主要是基于类似SQL的HQL语言的,它能够方便地操作HDFS中数据,实现对海量数据的分析工作。HIve动作的语法格式如下所示:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
	...
	<action name="[NODE-NAME]">
		<hive xmlns="uri:oozie:hive-action:0.2">
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<script>[HIVE-SCRIPT]</script>
			<param>[PARAM-VALUE]</param>
			...
		</hive>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>

Sqoop是一个能够在Hadoop和结构化存储系统之间进行数据的导入导出的工具,Sqoop动作的语法格式如下:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
	...
	<action name="[NODE-NAME]">
		<sqoop xmlns="uri:oozie:sqoop-action:0.2">
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<command>[SQOOP-COMMAND]</command>
			<file>[FILE-PATH]</file>
			...
		</sqoop>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>

pig动作可以启动运行pig脚本实现的Job,在工作流定义中配置的语法格式说明如下:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
	...
	<action name="[NODE-NAME]">
		<pig>
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<job-xml>[JOB-XML-FILE]</job-xml>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<script>[PIG-SCRIPT]</script>
			<param>[PARAM-VALUE]</param>
			...
			<param>[PARAM-VALUE]</param>
			<argument>[ARGUMENT-VALUE]</argument>
			...
			<argument>[ARGUMENT-VALUE]</argument>
			<file>[FILE-PATH]</file>
			...
			<archive>[FILE-PATH]</archive>
			...
		</pig>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>

Fs动作主要是基于HDFS的一些基本操作,如删除路径、创建路径、移动文件、设置文件全乡等等。
语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
	...
	<action name="[NODE-NAME]">
		<fs>
			<delete path='[PATH]' />
			...
			<mkdir path='[PATH]' />
			...
			<move source='[SOURCE-PATH]' target='[TARGET-PATH]' />
			...
			<chmod path='[PATH]' permissions='[PERMISSIONS]' dir-files='false' />
			...
			<touchz path='[PATH]' />
		</fs>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>

该动作主要是通过ssh登录到一台主机,能够执行一组shell命令,它在Oozie schema 0.2中已经被删除。
语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
	...
	<action name="[NODE-NAME]">
		<ssh>
			<host>[USER]@[HOST]</host>
			<command>[SHELL]</command>
			<args>[ARGUMENTS]</args>
			...
			<capture-output />
		</ssh>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>

Java动作,是执行一个具有main入口方法的应用程序,在Oozie工作流定义中,会作为一个MapReduce Job执行,这个Job只有一个Map任务。我们需要指定NameNode、JobTracker的信息,还有配置一个Java应用程序的JVM选项参数(java-opts),以及传给主函数(arg)。
语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
	...
	<action name="[NODE-NAME]">
		<java>
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<job-xml>[JOB-XML]</job-xml>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<main-class>[MAIN-CLASS]</main-class>
			<java-opts>[JAVA-STARTUP-OPTS]</java-opts>
			<arg>ARGUMENT</arg>
			...
			<file>[FILE-PATH]</file>
			...
			<archive>[FILE-PATH]</archive>
			...
			<capture-output />
		</java>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>

Sub-workflow动作是一个子流程的动作,主流程执行过程中,遇到子流程节点执行时,会一直等待子流程节点执行完成后,才能继续跳转到下一个要执行的节点。
语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
	...
	<action name="[NODE-NAME]">
		<sub-workflow>
			<app-path>[WF-APPLICATION-PATH]</app-path>
			<propagate-configuration />
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
		</sub-workflow>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>

Shell动作可以执行Shell命令,并通过配置命令所需要的参数。它的语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.4">
	...
	<action name="[NODE-NAME]">
		<shell xmlns="uri:oozie:shell-action:0.2">
			<job-tracker>[JOB-TRACKER]</job-tracker>
			<name-node>[NAME-NODE]</name-node>
			<prepare>
				<delete path="[PATH]" />
				...
				<mkdir path="[PATH]" />
				...
			</prepare>
			<configuration>
				<property>
					<name>[PROPERTY-NAME]</name>
					<value>[PROPERTY-VALUE]</value>
				</property>
				...
			</configuration>
			<exec>[SHELL-COMMAND]</exec>
			<argument>[ARGUMENT-VALUE]</argument>
			<capture-output />
		</shell>
		<ok to="[NODE-NAME]" />
		<error to="[NODE-NAME]" />
	</action>
	...
</workflow-app>

表达式语言函数(Expression Language Functions)

Oozie除了可以使用Properties文件定义一些属性之外,还提供了一些内置的EL函数,能够方便地实现流程的定义和控制,下面我们分组列表说明:

常量名称 含义说明
KB 1KB,类型为long。
MB 1MB,类型为long。
GB 1GB,类型为long。
TB 1TB,类型为long。
PB 1PB,类型为long。
函数声明 含义说明
String firstNotNull(String value1, String value2) 返回value1和value2中不为null的值,若都为null则返回null
String concat(String s1, String s2) 连接字符串s1和s2,如果s1或s2为null值,则使用空字符串替换null值
String replaceAll(String src, String regex, String replacement) 满足正则表达式regex,则使用replace替换src字符串中匹配上的部分
String appendAll(String src, String append, String delimeter) 将src中的分隔符delimeter替换为append
String trim(String s) 去掉字符串两边的空格,如果s为null则返回空字符串
String urlEncode(String s) 对字符串s使用URL UTF-8进行编码
String timestamp() 返回UTC当前时间字符串,格式为YYYY-MM-DDThh:mm:ss.sZ
String toJsonStr(Map) Oozie 3.3支持,将Map转转成一个XML编码的JSON表示形式
String toPropertiesStr(Map) Oozie 3.3支持,将Map转转成一个XML编码的Properties表示形式
String toConfigurationStr(Map) Oozie 3.3支持,将Map转转成一个XML编码的Configuration表示形式
函数声明 含义说明
String wf:id() 返回当前的工作流Job的ID
String wf:name() 返回当前的工作流Job的名称
String wf:appPath() 返回当前的工作流Job的应用路径
String wf:conf(String name) 返回当前的工作流Job的配置属性
String wf:user() 返回启动当前的工作流Job的用户名称
String wf:group() 返回当前的工作流Job的的用户组名称
String wf:callback(String stateVar) 返回当前的工作流Job的当前动作节点的回调URL
String wf:transition(String node) 返回转移节点,该节点是一个工作流动作节点触发的
String wf:lastErrorNode() 返回最后一个以ERROR状态退出的节点名称
String wf:errorCode(String node) 返回指定动作节点执行的错误码,如果没有则返回空
String wf:errorMessage(String message) 返回指定动作节点执行的错误信息,如果没有则返回空
int wf:run() 返回当前工作流Job的运行编号,正常的话返回0,如果执行过re-run则返回非0
Map wf:actionData(String node) 返回当前动作节点完成时输出的信息
int wf:actionExternalId(String node) 返回动作节点的外部ID
int wf:actionTrackerUri(String node) 返回跟踪一个动作节点的URI
int wf:actionExternalStatus(String node) 返回一个动作节点的状态
常量名称 含义说明
RECORDS Hadoop Record计数器组名称
MAP_IN Hadoop Mapper输入Record计数器名称
MAP_OUT Hadoop Mapper输出Record计数器名称
REDUCE_IN Hadoop Reducer输入Record计数器名称
REDUCE_OUT HadoopReducer输出Record计数器名称
GROUPS 1024 * Hadoop Mapper/Reducer输入Record组计数器名称
函数声明 含义说明
Map < String, Map > hadoop:counters(String node) 返回工作流Job某个动作节点的统计计数器信息,例如,MR的动作统计集合内容:
{
“ACTION_TYPE”: “MAP_REDUCE”,
“org.apache.hadoop.mapred.JobInProgress$Counter”: {
“TOTAL_LAUNCHED_REDUCES”: 1,
“TOTAL_LAUNCHED_MAPS”: 1,
“DATA_LOCAL_MAPS”: 1
},
“FileSystemCounters”: {
“FILE_BYTES_READ”: 1746,
“HDFS_BYTES_READ”: 1409,
“FILE_BYTES_WRITTEN”: 3524,
“HDFS_BYTES_WRITTEN”: 1547
},
“org.apache.hadoop.mapred.Task$Counter”: {
“REDUCE_INPUT_GROUPS”: 33,
“COMBINE_OUTPUT_RECORDS”: 0,
“MAP_INPUT_RECORDS”: 33,
“REDUCE_SHUFFLE_BYTES”: 0,
“REDUCE_OUTPUT_RECORDS”: 33,
“SPILLED_RECORDS”: 66,
“MAP_OUTPUT_BYTES”: 1674,
“MAP_INPUT_BYTES”: 1409,
“MAP_OUTPUT_RECORDS”: 33,
“COMBINE_INPUT_RECORDS”: 0,
“REDUCE_INPUT_RECORDS”: 33
}
}
则${hadoop:counters(“mr-node”)["FileSystemCounters"]["FILE_BYTES_READ"]},得到名称为mr-node的动作节点组的FILE_BYTES_READ计数器的值
选项 含义说明
boolean fs:exists(String path) path是否存在
boolean fs:isDir(String path) path是否是目录
long fs:dirSize(String path) 如果path不是目录或者path是一个文件,则返回-1,否则返回该path下所有文件的字节数
long fs:fileSize(String path) 如果path是目录,则返回-1,否则返回该path下所有文件的字节数
long fs:blockSize(String path) 如果path不是文件或者不存在则返回-1,否则返回文件的块大小字节数

参考链接

作者:简单之美
简单之美,难得简单,享受简单的唯美。
原文地址:Oozie工作流程定义详解, 感谢原作者分享。