在最近的工作中,涉及到很多的数据流程,需要去维护和监控,耗费了不少的人力,但是效果却不理想。数据接入作为工作中基础的一环,数据接入的可靠性和稳定性,对后期业务的影响很大。为了加强对数据任务的管理,提升数据质量,同时减少开发同学在工作中,流程维护上的投入成本,我们准备引入流程管理平台。基于现实情况,参考业界和公司内部的产品情况,经过一段时间的调研,在众多的产品中我们选择了Airflow。
参考官方文档的部署方案,Airflow平台迅速的完成了搭建,搭建过程还算比较顺利。调整airflow.cfg的中时区配置,修改如下:
调整以后,虽然数据库中所有任务的执行时间、运行时间和执行时间,都为本地运行时间,但是run_id和WEBUI上的时间展示问题,仍然都为标准的UTC时间。
在查阅啦相关文档以后,发现平台是为了更好的去适应冬令时和夏令时的问题。BUT,在我们国家不存在这个问题,为了更方便、直观的去查阅任务的运行时间和调度时间,避免每次人工大脑加八小时的痛苦,我们决定从根本上,去解决这个问题。
首先,在WEBUI的Taskinstance页面,进入到前端调试模式,获取后端是直接将数据和页面作为响应,直接发回来的;
进入到www/view.py下,找到相关路由方法,查看在Model下的TaskInstance类中,所有时间相关的字段定义都为UtcDateTime;如下所示:
找到了转换时间的类,在源码中可以看到UtcDateTime类中,时区的默认参数为标准UTC时间;根据setting.py中的代码,我们对此处进行修改,让其从配置文件中获取,代码修改如下:
做了这个修改以后,运行以后发现UI上的数据同数据库内的数据,仍然不一致;查看了相关代码后,发现针对数据库的连接,airflow内一直有心跳,去监控连接的状态,这一块设定啦数据的timezone为标准的UTC时间,我们修改如下:
调整以后,重启airflow, 任务的启动时间、结束时间都正常,数据库内的也正常;BUT,当人工触发任务的时候,任务的RUN_ID仍为标准UTC时间,并且在触发过程中,会抛出页面异常,如下:
跟着界面暴露的trigger接口定位进去,根据打印的日志,定位问题是发生在当程序往mysql dag_run表插入数据后,在刷新状态时,出错导致的;
有两种可能,会导致抛出异常;一是session异常为空,二是在数据库内,查询不到刚刚插入的数据记录;通过日志,排掉session为空的情况;确定是第二种情况;通过代码梳理,看到self.execution_date取的是timezone.utcnow(),所以在这里需要转换成配置文件内的时区;
再重新启动后,发现webUI上的时间OK啦,都为设置的本地时间;
数据库内的时间也为本地时间,问题解决啦~
后期,针对Airflow内,DAG管理和任务的回填机制,准备做一些优化,方便同学的使用~