Airflow中sensor依赖(DAG依赖链路梳理)

DAG在执行之前,往往存在很多依赖,需要按顺序进行执行下去。Airflow的Sensor(传感器)可用于保持在一段时间间隔内处于执行中,当满足条件时执行成功,当超时时执行失败。

1.DAG之间的依赖(DAG2需要在DAG1执行成功后在执行)

DAG之间的依赖关系可以通过编写ExternalTaskSensor()去指定依赖哪个dag的id和task。
dag1_check_task=ExternalTaskSensor(
task_id="dag1_check_task", #指定该dag的名称,在airflow中的列表名称显示的就是这个名字
external_dag_id='dag1', #指定依赖具体dag的id
external_task_id=None, #指定依赖dag的哪一个task任务
allowed_states=['success'], #列出允许的states,default是success
execution_delta=timedelta(hours=8), #与执行的external任务的时间差,即往前推8个小时内有一个成功的dag1的记录
dag=dag
)
具体代码如下:

import airflow
from airflow.models import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import timedelta

dag = DAG(
   dag_id= dag2, 
)

dag1_check_task=ExternalTaskSensor(
    @指定该dag的名称,在airflow列表页面显示的就是这个任务的id名称
    task_id="dag1_check_task", 
    @指定依赖哪一个dag的id
    external_dag_id='dag1',     
    @指定依赖dag的哪一个task任务
    external_task_id=None, 
    @列出允许的states,default是success
    allowed_states=['success'], 
    @与执行的external任务的时间差,即往前推8个小时内有一个成功的dag1的记录
    execution_delta=timedelta(hours=8), 
    dag=dag
)

dg2_task_2=BashOperator(...)
start >> dag1_check_task >> dag2_task_2 >> end
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容