前言
公司的DataX已经用了2年多了,性能以及基本功能上没有太大问题。但是有一个问题一直困扰着我,就是DataX的错误告警。DataX的日志问题,一直令人头疼。随着job的逐渐增多,一个调度程序或者脚本打印出来的日志实在太多,假如中途有哪个job执行报错,根本无法排查,虽然DataX自身会记录每个job执行的日志,在log目录下,命名规则:脚本名称+执行时间,但是文件数目过多排查起来还是挺困难的。
设计思路
有了需求,就可以开始设计,给DataX添加告警功能有两种办法,修改源码和捕获日志,当时为了不侵入源码,采用了捕获日志的方式。
代码改动
修改DataX启动类,即bin/datax.py,将datax.py打印出来的日志放入pipe(管道)缓存中,根据最后执行返回的code判断任务成功还是失败,失败则需要调用告警功能。具体修改后的代码如下:
if __name__ == "__main__":
printCopyright()
parser = getOptionParser()
options, args = parser.parse_args(sys.argv[1:])
if options.reader is not None and options.writer is not None:
generateJobConfigTemplate(options.reader,options.writer)
sys.exit(RET_STATE['OK'])
if len(args) != 1:
parser.print_help()
sys.exit(RET_STATE['FAIL'])
startCommand = buildStartCommand(options, args)
# print startCommand
child_process = subprocess.Popen(startCommand, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
register_signal()
start=time.time()
errmsg=[]
while child_process.poll() == None:
line=child_process.stdout.readline()
print(line), # 使用,是为了使打印出来的日志不换行
errmsg.append(line)
if time.time() - start >= 1800: # 超时时间30分钟
# 可以添加一些超时的信息
break
if child_process.returncode != 0:
# 执行具体的告警操作,可以从启动参数中获取到具体的脚本名称
# 解析errmsg或者直接输出
sys.exit(child_process.returncode)
总结
- 上述代码只是在datax.py里面新增一些退出前的操作,并不影响主要逻辑,可以放心修改。
- 这里面有个比较坑的是,即使异常信息,但是在stderr(标准错误)中还是无法捕获。因此,所有的信息都只能从stdout(标准输出)中读取,然后解析或者直接输出到告警模块。
- 我个人在项目中只是简单的加了个邮件告警,内容为该次job任务的所有日志信息,方便从日志中分析是什么原因造成的错误。
- 另外,使用DataX采集阿里云ADB(AnalyticDB 3.0)有极小的概率会出现死循环的现象(就是一直在读取写入重复的数据),使用mysql或者RDS不会有类似情况发生,可能是我的DataX版本过低,毕竟2年前还没有ADB 3.0呢,可能最新版的已经修复了这个问题。