Airflow 动态创建Task

import airflow
import MySQLdb
from impala.util import as_pandas
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator

# todo 优化脚本

def get_datas():
    """Fetch every row of the source table as a list of dicts.

    Opens a MySQL connection, runs the query, converts the cursor's result
    set to a DataFrame via ``as_pandas``, replaces missing values with ``0``
    and returns the rows in ``records`` orientation.

    The cursor and the connection are always closed before returning, even
    when the query or the conversion raises, so repeated calls do not leak
    database connections.

    Returns:
        list[dict]: one dict per row, keyed by the DataFrame's column labels.
    """
    db = MySQLdb.connect(host="xxx.xx.xx.xx", port=4313, user="xxxx", password="xxxx",
                         database="xxxx", charset='utf8')
    try:
        cur = db.cursor()
        try:
            cur.execute(
                "select * from xxx ")
            # Materialize the rows while the cursor is still open.
            return as_pandas(cur).fillna(0).to_dict(orient='records')
        finally:
            cur.close()
    finally:
        db.close()


def multitasking_task(xx, dag):
    """Create the BashOperator that handles a single record.

    Args:
        xx: mapping for one record; only its ``'id'`` entry is read.
        dag: the DAG the new operator is attached to.

    Returns:
        A BashOperator whose task id embeds the record id and whose command
        runs the handler script with the templated ``{{ ds }}`` value and the
        record id as arguments.
    """
    record_id = xx['id']
    command_parts = (
        "python3",
        "/data/airflow/dag_scripts/xxxx.py",
        "{{ ds }}",  # left as a literal template string in the command
        "{}".format(record_id),
    )
    operator = BashOperator(
        task_id='handle_task_with_id_{}'.format(record_id),
        bash_command=" ".join(command_parts),
        dag=dag,
    )
    return operator


def running():
    """Assemble and return the DAG.

    The DAG has a ``start`` and an ``end`` DummyOperator; for every record
    returned by ``get_datas()`` one task built by ``multitasking_task`` is
    placed between them (``start >> task >> end``).

    Returns:
        The populated DAG object.
    """
    dag = DAG(
        dag_id='xx',
        default_args={
            'owner': 'xx',
            'start_date': airflow.utils.dates.days_ago(1),
            'email': ['xx@xx.cn'],
            'email_on_failure': False,
            'email_on_retry': False,
        },
        schedule_interval='10 05 * * *',
    )

    # Boundary markers, created in the order start -> end.
    first, last = (
        DummyOperator(task_id=marker, dag=dag) for marker in ("start", "end")
    )

    # TODO: handle the case of too many tasks; DAGs could be created dynamically instead.
    for record in get_datas():
        per_record_task = multitasking_task(record, dag)
        first >> per_record_task
        per_record_task >> last

    return dag


# Build the DAG when this module is imported and bind it to the module-level
# name `dag`. NOTE(review): presumably this is what lets Airflow's DAG loader
# discover it — confirm against the deployment's Airflow version.
dag = running()

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容