原生的 Azkaban 提供了强大的任务调度功能,但是面对复杂的业务要求,原生的Azkaban还是不能完全满足我们的需求。在对比Oozie、Airflow等流行的任务调度框架后,我们决定对Azkaban进行二次开发,以满足我们各种各样的功能要求。
一、日期自定义替换
在实际的业务场景中,很多数据都是按天甚至按小时来分区的,比如
/data/logstat/2017/05/10/input/xxx.log
在进行MR计算的时候,就需要传递日期作为路径,DE版改造后的功能支持在参数里写如下的配置
input.path= /data/logstat/${DD:(YYYY)-(MM)-(DD-1)}/input/*
例如当前的日期是 2017-05-10,则 Azkaban 在解析的时候就会把路径解析为:
input.path= /data/logstat/2017/05/09/input/*
二、hadoopMR任务提交方式优化
MR任务包含Map、Reduce和一个main类,负责把map和reduce配置成一个完整的任务,包括输入输出格式,输入输出路径等。但是每次都需要编写main类有些繁琐,DE版改造的 Azkaban 支持把main里的代码以配置的方式写到参数里,示例如下:
type=hadoopJava
job.extend=true
force.output.overwrite=true
mapred.mapper.new-api=true
mapred.reducer.new-api=true
classpath=./lib/*,${azkaban.home}/lib/*,${azkaban.home}/extlib/*
tmpjars=file://${azkaban.home}/extlib/ad-tracker-mr2-1.0.0-SNAPSHOT-jar-with-dependencies.jar
input.path=/user/tracking/event_logserver/${DD:(YYYY)/(MM)/(DD-1)}/*/input/self-event*
output.path=/user/tracking/event_logserver/${DD:(YYYY)/(MM)/(DD)}/00/UserCustomEvent
calculate.date=${DD:(YYYY)-(MM)-(DD)}
job.class=com.dataeye.tracker.mr.mapred.customEvent.CustomEventMapper
mapreduce.map.class=com.dataeye.tracker.mr.mapred.customEvent.CustomEventMapper
mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.customEvent.CustomEventReducer
mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.key.class=org.apache.hadoop.io.Text
mapred.output.value.class=org.apache.hadoop.io.NullWritable
mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
通过以上配置,用户只需要编写 CustomEventMapper 和 CustomEventReducer两个类即可提交到Azkaban中运行。开启这个功能的开关的参数是
job.extend=true
需要特别关注 tmpjars 这个参数,用户打包后的jar文件,一般也会包含所依赖的jar包,这样就会导致上传到azkaban中的文件比较大,最后导致上传时间过程或者失败。所以一般会打包成两个jar文件,一个是用户的java源码文件,另一个是所有依赖的jar文件的汇总文件,把汇总文件上传到Azkaban机器的指定目录后,使用tmpjars 参数指向该文件即可(注意要带上 "file://")。
三、hive支持参数传递
在hive任务中,经常存在把数据导入到指定表中,而且是按分区的。很多业务场景下,都是按天分区,这样,动态指定hive的导入分区就变得非常重要。DE版改造的Azkaban支持在hive中传递日期参数。
1、在hive.job文件中编写以下参数:
yesterday=${DD:(YYYY)-(MM)-(DD-1)}
2、在hive.sql文件中就可以这样使用 yesterday 参数
INSERT OVERWRITE TABLE
adt_user_active_match PARTITION (day_p='${yesterday}')
SELECT appid,country,province,city,uid
FROM adt_logstat_active where adType=1;
四、任务支持冻结与激活
原生的Azkaban在提交定时任务后,就会立即开始执行,但是很多时候,我们只需要先把任务的定时时间准备好而已,还没计划执行;或者某个定时任务不想再继续了,但是又不想删掉,只是为了以后方面查看。这是就需要把定时任务进行冻结或者激活。
任务刚提交的时候,状态是冻结的,需要点击Active按钮进行激活,Azkaban才能进行调度
激活后的任务可以通过点击DeActive进行冻结
五、支持实例参数修改
原生的Azkaban只支持修改模版参数,但是在实际开发中,任务执行失败后,需要不断修改参数进行尝试。这样修改实例参数就变得非常重要。
DE 版改造的Azkaban在失败的任务点击右键,可以看到Open Job to Edit功能,这里就可以进入修改该失败节点的参数,再次运行的时候使用的就是新的参数值了。
六、支持单个节点重复执行
如上第五点所述的,如果任务的实例参数修改完成后,需要再次执行该任务,这时只需要右键点击该任务,点击 "Execute Job"按钮即可。
七、支持失败任务重复执行
如果整个flow失败了,但是其中有些job执行成功,有些job执行失败,这时想重新把失败的job节点运行,就用到了重复执行任务的功能,如下:
点击retry按钮后,系统会生成新的execid,但是在新的任务中,azkaban会跳过已经执行成功的节点,只会执行上一次执行失败了的节点,如下: