DAG在执行之前,往往存在很多依赖,需要按顺序进行执行下去。Airflow的Sensor(传感器)可用于保持在一段时间间隔内处于执行中,当满足条件时执行成功,当超时时执行失败。
1.DAG之间的依赖(DAG2需要在DAG1执行成功后在执行)
DAG之间的依赖关系可以通过编写ExternalTaskSensor()
去指定依赖哪个dag的id和task。
dag1_check_task=ExternalTaskSensor(
task_id="dag1_check_task", #指定该dag的名称,在airflow中的列表名称显示的就是这个名字
external_dag_id='dag1', #指定依赖具体dag的id
external_task_id=None, #指定依赖dag的哪一个task任务
allowed_states=['success'], #列出允许的states,default是success
execution_delta=timedelta(hours=8), #与执行的external任务的时间差,即往前推8个小时内有一个成功的dag1的记录
dag=dag
)
具体代码如下:
import airflow
from airflow.models import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import timedelta
dag = DAG(
dag_id= dag2,
)
dag1_check_task=ExternalTaskSensor(
@指定该dag的名称,在airflow列表页面显示的就是这个任务的id名称
task_id="dag1_check_task",
@指定依赖哪一个dag的id
external_dag_id='dag1',
@指定依赖dag的哪一个task任务
external_task_id=None,
@列出允许的states,default是success
allowed_states=['success'],
@与执行的external任务的时间差,即往前推8个小时内有一个成功的dag1的记录
execution_delta=timedelta(hours=8),
dag=dag
)
dg2_task_2=BashOperator(...)
start >> dag1_check_task >> dag2_task_2 >> end