Airflow 动态指定DAG运行队列

Airflow版本 2.0.1
根据官网提示,airflow 支持hook函数,可以在DAG运行开始之前 根据我们触发的参数 指定队列
https://github.com/apache/airflow/discussions/14987
新建 airflow_local_settings.py 文件, 放在${AIRFLOW_HOME}/config/airflow_local_settings.py
无需重启scheduler 即可加载。

def task_instance_mutation_hook(task_instance):
    dag_run = task_instance.get_dagrun()
    conf = dag_run.conf
    # print(conf) 
    # conf为 trigger_dag 的 --conf  参数
    if conf['key4']:
        if 'change_queue' in conf:
            return
        else:
            task_instance.queue = conf['key4']
            conf["change_queue"] = 'true'

上面代码表示,在DAG 刚刚要运行的时候,会执行上面的hook函数 拿到conf 根据conf中的 key4 重新执行DAG的queue。

DAG触发参数为

 airflow dags trigger 'dag_test_bash' -r `uuidgen` --conf '{"key4":"test_param"}'

传入的参数 key4 的value 会赋值到 task_instance.queue 也就是在DAG运行前 为其动态分配了队列,实际生产中 可以用该种方式去路由DAG到指定ip的机器去运行,我们可以在每台机器起自己IP+业务前缀的worker,然后在这里指定,如指定queue为 prod_10.0.0.1的queue。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 什么是Airflow? Airflow 是一个使用 python 语言编写的 data pipeline 调度和监...
    什锦甜阅读 18,167评论 0 14
  • 1.Scheduler的启动和停止命令 1.1 Scheduler启动命令 对于Airflow的Scheduler...
    Mr_Wuuuuuuu阅读 2,080评论 0 1
  • 在快速启动部分中设置很简单,构建生产级环境需要更多的工作,下面来了解一下。 1. 设置配置选项 第一次运行Airf...
    路小漫阅读 9,727评论 0 3
  • 简介 airflow是airbnb家的基于DAG(有向无环图)的任务管理系统, 最简单的理解就是一个高级版的cro...
    lao男孩阅读 49,487评论 1 21
  • 今天感恩节哎,感谢一直在我身边的亲朋好友。感恩相遇!感恩不离不弃。 中午开了第一次的党会,身份的转变要...
    迷月闪星情阅读 10,628评论 0 11