airflow DAG配置文件

更多airflow资料,可查看:airflow从入门到精通学习笔记系列

DAG概念

DAG(有向无环图),在airflow中定义一个有依赖的作业执行集合,包含有一组特定的作业任务,每个任务都是一系列具体的操作命令。

  • Task为DAG中具体的作业任务,任务一般是一个具体的操作,如执行某条shell命令、执行某个python脚本等;
  • DAG中包含有多个任务Task及Task之间的执行依赖关系、调度时间;

官方样例

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

配置说明

  • DAG类定义了一个tutorial名称的DAG;
  • DAG类的schedule_interval参数定义了调度策略为每天执行一次;
  • t1、t2、t3定义了三个Bash执行器的Task;
  • t1的bash_command参数指定这个任务的具体操作,即执行date命令;
  • t3任务的使用了jinja传入参数;
  • set_upstream定义了依赖关系,即t2、t3依赖于t1,需等待t1执行完发触发执行;

常用的配置文件管理命令

  • 配置文件的存放路径:$AIRFLOW_HOME/dags
[root@node0 data]# ls $AIRFLOW_HOME/dags
__pycache__  tutorial.py
  • 查看已存在的FDAG配置
[root@node0 data]# airflow list_dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
tutorial
  • 查看指定DAG配置中的子任务Task
[root@node0 data]# airflow list_tasks tutorial
print_date
sleep
templated

更多airflow资料,可查看:airflow从入门到精通学习笔记系列

如发现文中有错误,望留言指明,万分感激;
如对此文章内容感兴趣,想进一步探讨,可以留言交流;
如想转发此文章,请留言协商一下,切勿不指明出处的转发,尊重原创;
如阅读过程中有收获,并想感谢一下,欢迎打赏;
----小林帮

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 本文将介绍 Airflow 这一款优秀的调度工具。主要包括 Airflow 的服务构成、Airflow 的 Web...
    a7f00a9019ae阅读 63,142评论 6 42
  • 在快速启动部分中设置很简单,构建生产级环境需要更多的工作,下面来了解一下。 1. 设置配置选项 第一次运行Airf...
    路小漫阅读 9,679评论 0 3
  • 我们使用 Airflow 作为任务调度引擎, 那么就需要有一个 DAG 的定义文件, 每次修改 DAG 定义, 提...
    haitaoyao阅读 4,033评论 0 7
  • 简介 airflow是airbnb家的基于DAG(有向无环图)的任务管理系统, 最简单的理解就是一个高级版的cro...
    lao男孩阅读 49,417评论 1 21
  • 本月主题公路小说,给我找个恰当的理由出行,体验在路途上,带上《格列佛游记》。 《格列佛游记》可以说是一部政治寓言小...
    山果花开阅读 358评论 2 5