airflow 动态创建task

airflow 动态创建task

通过http接口获取一个列表结果,遍历列表值,每条记录动态创建一个task

实现方式

动态创建task需要写两个dag实现,auto_rebuild_cube通过http的task获取到需要遍历的列表,提取name到xcom中。
第二个dag文件auto_cube(dag_id为build_cube)通过 XCom.get_one 方法,指定上游dag的dag_id(auto_rebuild_cube)和execution_date来读取该列表。由于execution_date必须指定,这里直接用pendulum.now('Asia/Shanghai')取当前时间,并配合include_prior_dates=True,取到当前时间之前最近一次写入的值。

文件:auto_rebuild_cube

# -*- coding: utf-8 -*-
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.http_operator import SimpleHttpOperator

from airflow import DAG
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.settings import json

import common
from common import china_days_ago

# Module-level list intended to hold cube names.
result = []

# DAG that runs at minute 9 of every hour and collects the list to iterate over.
dag = DAG(
    dag_id='auto_rebuild_cube',
    default_args=common.default_args,
    start_date=china_days_ago(1),
    description='获取遍历列表',
    schedule_interval="9 * * * *",
)

# GET kylin/api/cubes and push the response to XCom so a downstream task can
# parse it.
# NOTE(review): the Basic-auth credentials are hard-coded in the header;
# consider storing them on the Airflow HTTP connection instead.
get_all_cubes = SimpleHttpOperator(
    task_id="get_all_cubes",
    endpoint='kylin/api/cubes',
    headers={"Content-Type": "application/json", "Authorization": "Basic QURNSU46S1lMSU4="},
    data={"limit": "100"},
    method='GET',
    xcom_push=True,
    http_conn_id=common.global_kylin_http_id,
    # Attach to the DAG explicitly, like every other operator in this file,
    # instead of relying on the dependency wiring to infer it.
    dag=dag,
)


def multitasking_task(**kwargs):
    """Extract cube names from the ``get_all_cubes`` response and publish them.

    Pulls the XCom value of the ``get_all_cubes`` task, parses it as JSON,
    collects the ``name`` of every entry and pushes the resulting list to
    XCom under the key ``cubeNames``.

    The list is built locally on every call, so repeated calls within the
    same process never accumulate names from earlier calls.

    :param kwargs: Airflow task context; must contain ``task_instance``.
    """
    task_instance = kwargs['task_instance']
    response_text = task_instance.xcom_pull(task_ids="get_all_cubes")
    cube_names = [cube['name'] for cube in json.loads(response_text)]
    task_instance.xcom_push(key="cubeNames", value=cube_names)


# Parses the HTTP response and publishes the cube names to XCom.
multi_task = PythonOperator(
    dag=dag,
    task_id='multi_task',
    provide_context=True,
    python_callable=multitasking_task,
)

# Echoes the value stored under the "cubeNames" XCom key.
push_task = BashOperator(
    dag=dag,
    task_id='push_task',
    bash_command="echo {{ task_instance.xcom_pull(key='cubeNames') }} ",
)

# Hands over to the "build_cube" DAG; whether a run is actually triggered is
# decided by common.conditionally_trigger using the params below.
trigger_build_cube = TriggerDagRunOperator(
    dag=dag,
    task_id='trigger_build_cube',
    trigger_dag_id="build_cube",
    python_callable=common.conditionally_trigger,
    params={'condition_param': True, 'message': '获取列表成功,即将开始动态创建task'},
)

latest_only = LatestOnlyOperator(dag=dag, task_id='latest_only')

# Linear pipeline:
# latest_only -> get_all_cubes -> multi_task -> push_task -> trigger_build_cube
latest_only.set_downstream(get_all_cubes)
get_all_cubes.set_downstream(multi_task)
multi_task.set_downstream(push_task)
push_task.set_downstream(trigger_build_cube)

文件:auto_cube

import json

from airflow import DAG
from airflow.models import XCom
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.http_operator import SimpleHttpOperator
from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum

import common
from common import china_days_ago

# DAG without a schedule of its own (schedule_interval=None).
dag = DAG(
    dag_id='build_cube',
    schedule_interval=None,
    start_date=china_days_ago(1),
    default_args=common.default_args,
)


def get_data():
    """Return the cube names most recently published by ``auto_rebuild_cube``.

    Looks up the ``cubeNames`` XCom of DAG ``auto_rebuild_cube``, using the
    current Asia/Shanghai time as ``execution_date`` together with
    ``include_prior_dates=True``.

    :return: the stored cube names, or an empty list when no XCom value is
        found, so callers can always iterate over the result.
    """
    execution_date = pendulum.now('Asia/Shanghai')
    print("the execution_date is {}".format(execution_date))
    cube_names = XCom.get_one(
        dag_id='auto_rebuild_cube',
        key='cubeNames',
        execution_date=execution_date,
        include_prior_dates=True,
    )
    print("cubeNames is {}".format(cube_names))
    # A missing XCom yields None; normalise it to an empty list so iterating
    # over the result at DAG parse time cannot raise TypeError.
    return cube_names or []


def multitasking_task(data):
    """Create the HTTP task that rebuilds a single cube.

    :param data: cube name; used in both the task id and the endpoint path.
    :return: a ``SimpleHttpOperator`` issuing
        ``PUT kylin/api/cubes/<data>/rebuild`` with build type ``BUILD``.
    """
    # NOTE(review): the Basic-auth credentials are hard-coded in the header;
    # consider storing them on the Airflow HTTP connection instead.
    return SimpleHttpOperator(
        task_id="rebuild_cube_{}".format(data),
        endpoint='kylin/api/cubes/{}/rebuild'.format(data),
        headers={"Content-Type": "application/json", "Authorization": "Basic QURNSU46S1lMSU4="},
        data=json.dumps({"buildType": "BUILD"}),
        method='PUT',
        http_conn_id=common.global_kylin_http_id,
        # Attach to the DAG explicitly, like the start/end operators, instead
        # of relying on the dependency wiring to infer it.
        dag=dag,
    )


# Fixed entry and exit points that every dynamically created task hangs between.
start = DummyOperator(
    task_id="start",
    dag=dag
)

end = DummyOperator(
    task_id="end",
    dag=dag
)

# One rebuild task per cube name: start -> rebuild_cube_<name> -> end.
# `or []` guards against get_data() returning None (no XCom found), which
# would otherwise raise TypeError while the DAG file is being parsed.
for data in get_data() or []:
    start >> multitasking_task(data) >> end

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容