Airflow 是 Airbnb 公司开源的任务调度系统, 通过使用 Python 开发 DAG, 非常方便的调度计算任务. 介绍一下在 Airflow 提供的 Operator 不满足需求的场景下, 如何自己开发 Operator.
0x00 DAG 的最基本执行单元: Operator
在 Airflow 的一个 DAG 中, 最基本的执行单元是 Operator
. 例如如下示例 DAG 中, 使用的都是 BashOperator
, 执行一个 bash 脚本.
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('tutorial', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
Airflow 实现了很多 Operator(参见 Airflow 源代码), 涵盖了常用的功能, 例如执行 Hive 查询, 执行 bash 脚本等. 有几种特殊的 Operator:
-
XXXSensor
用作其他外界条件的 sensor, 实现也很简单, 在 Operator 的execute
方法中进行 long poll, 直到poke
方法返回 True 则完成执行.
# BaseSensorOperator 部分源码
def poke(self, context):
'''
Function that the sensors defined while deriving this class should
override.
'''
raise AirflowException('Override me.')
def execute(self, context):
started_at = datetime.now()
while not self.poke(context):
sleep(self.poke_interval)
if (datetime.now() - started_at).seconds > self.timeout:
raise AirflowSensorTimeout('Snap. Time is OUT.')
logging.info("Success criteria met. Exiting.")
-
PythonOperator
用来执行 Python 函数, 这也是使用 Python 代码来定义 DAG 的好处 -
BranchPythonOperator
用来支持分支, 通过函数返回要执行的分支
Airflow Operator 相关 class 继承关系如下:
.
└── BaseOperator
├── BaseSensorOperator
│ └── ...Sensor
├── PythonOperator
│ ├── BranchPythonOperator
│ └── ShortCircuitOperator
└── ...Operator
0x01 Operator 开发
如果官方的 Operator 都不满足需求, 那么我们就要来开发一个 Operator. 开发 Operator 也很简单, 直接继承 BaseOperator
并实现 execute
方法即可.
from airflow.models import BaseOperator
class DemoOperator(BaseOperator):
def __init__(*args, **kwargs):
super(DemoOperator, self).__init__(*args, **kwargs)
def execute(self, context):
print "hello"
除了 execute
方法必须实现外, 还有一个 hook 方法:
-
pre_execute
: 在execute
方法前调用, 实现点儿准备逻辑 -
post_execute
: 在execute
方法完成后调用, cleanup 一下 -
on_kill
: 在 task 被 kill 的时候执行.
Operator 获取模板变量
Aiflow 是支持 Templating with Jinja 功能的, 具体来说就是 Operator 中支持模板语言 Jinja, 写一些 for 循环, 或者通过 {{param}}
语法替换一些变量等(例如 {{ds}}
被替换成执行任务的日期)
# 官方示例的 jinja 语句
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{% endfor %}
那么, 自己开发的 Operator 中如何使用这个功能呢?
其实也很简单, 在自己的 Operator
中添加属性 template_fields = (attributes_to_be_rendered_with_jinja)
. 在任务被执行前, Airflow 会自动渲染 template_fields
中的属性再执行任务.
# 节选自 Airflow 中 BashOperator 源码
class BashOperator(BaseOperator):
# 这里定义需要被渲染的属性名称
template_fields = ('bash_command', 'env')
Operator 部署
开发的 Operator
代码作为一个 Python 的 Package, 使用 distutil 打包安装到 Airflow 对应的服务器上即可.
0x02 Operator 跟其他系统交互
Airflow 考虑到为了跟外界环境隔离, 提出了 Connection
概念: 将配置信息放到 Airflow Server 本身配置, 在 DAG 中使用 connection_id
来引用. 例如, 我们配置一个 HiveServer 的 Connection
, 使用 liulishuo_hiveserver1
作为 connection_id
, 这样同一个 DAG 文件就可以在测试环境和生成环境调用对应环境中的 HiveServer 服务了. 总结来说, 这就是架构设计模式中的 External Configuration Store Pattern 的标准实现.
那么如果自己开发的 Operator
如何调用这些 Connection
呢? 这里 Airflow 又引入了一个 Hook 的概念:
Hooks are interfaces to external platforms and databases like Hive, S3, MySQL, Postgres, HDFS, and Pig. Hooks implement a common interface when possible, and act as a building block for operators. They also use the airflow.models.Connection model to retrieve hostnames and authentication information. Hooks keep authentication code and information out of pipelines, centralized in the metadata database.
我个人觉得这个概念的引入有点儿臃肿了, 没有任何意义. 开发者在 Airflow Web 中看到的也是 Connection
的配置, 那么开发 DAG 时第一个想到的就是去找 Connection
相关的 class, 再多一个 Hook
的概念有点儿绕.
那么 Operator
中想使用对应的 Connection
, 直接根据 connection_id
创建对应的 Hook
就好了(讲真, 真绕), 例如, 想使用 HiveServer2
的 Connection
, 创建一个 HiveServer2Hook
即可.
# Operator 调用 Connection 示例代码
class LiulishuoDemoOperator(BaseOperator):
def __init__(self, hive_server2_connection_id, *args, **kwargs):
super(LiulishuoDemoOperator, self).__init__(*args, **kwargs)
self.hive_server2_connection_id = hive_server2_connection_id
def execute(self, context):
hive_server2 = HiveServer2Hook(self.hive_server2_connection_id)
hive_serve2.get_records('SELECT * FROM testdb.table1 LIMIT 20')
# ....
HiveServer2Hook
设计有还有一个贴心之处就是, 在创建 HiveServer2Hook
时根本不涉及真正连接 HiveServer2 的逻辑, 只有真正调用其get_records
等方法时才会真正去连接 HiveServer2, 这样就给单元测试 mock 带来很大的方便, 毕竟在 CI 环境中构建一个隔离的专门用于跑自己的 test-case 的 HiveServer2 也不是那么容易的.
def test_operator_with_mock(self):
with mock.patch.object(HiveServer2Hook, 'get_records') as mock_get_records:
# 这里设置 mock 的返回值
mock_get_records.return_value = [['Location: ', 's3://test-bucket/valid_table']]
hive_server_id = 'test-hive-server'
# 这里测试对应的 Operator 代码
0x03 总结
Airflow 通过精简的抽象, 将 DAG 开发简化到了会写 Python 基本就没问题的程度, 还是值得点赞的. 自己开发一个 Operator
也是很简单, 不过自己开发 Operator
也仅仅是技术选型的其中一个方案而已, 复杂逻辑也可以通过暴露一个 Restful API 的形式, 使用 Airflow 提供的 SimpleHttpOperator
调用对应的服务执行任务.