Airflow中DAG的用法

airflow官方文档:https://www.kancloud.cn/apachecn/airflow-doc-zh/1944253
个人觉得airflow写得挺好的一篇文章:https://www.cnblogs.com/woshimrf/p/airflow-first-dag.html
最近在做大数据测试,涉及到airflow,一直听说这个专有名词,但是却从来不知道是什么,带着这个疑问,随着工作慢慢深入,对airflow在网上去进行百度了一下,发现就是一个调度框架,但是有个DAG,DAG就是python脚本,根据写好的python脚本进行DAG任务执行,在airflow这个平台上进行调度执行。

1.什么是数据调度系统

数据中台是一个很高大上的一个概念,对这个概念基本从来没有搞明白过。无意中看到一篇文章这么描述的:数据中台就是收集各个零散的数据,进行标准化、服务化,提供统一的数据服务。那这些数据的整理和处理,就需要有个东西来进行统一调度,即调度系统。
数据调度系统:将不同的异构数据互相同步,可以按照规划去执行数据处理和任务调度,airflow就根据这个需求产生了,即airflow就是一个任务调度平台。

2.调度系统的相关概念

1.任务定义
2.任务实例
3.执行日期
4.任务依赖
5.任务补录backfill
6.任务重跑

  • 1.任务定义
    任务定义:定义一个任务的具体内容,比如打印当前时间:{{ds}}----{{今天的时间戳}}
  • 2.任务实例
    任务实例:任务设定了运行时间,每次运行时会生成一个实例,如dag-task-executiondate标记一个任务实例。
  • 3.执行日期
    执行日期:任务实例运行所代表的任务时间,即execute-date或bizdate,类似hive表的分区。任务时间代表的是任务要处理的数据时间,所以一般都是T-1的时间,即昨天以前的时间所产生的数据。
    调度系统中的ds(execution date)通常是过去的一个周期,即本周期执行上周期的任务
  • 4.任务依赖
    任务依赖:最简单的任务模型etl(Extract & Transformation &Loading,即数据抽取、转换、加载)最少分3步。这3个任务之间有先后顺序,必须前一个执行完毕之后,后一个才可以执行,即存在依赖关系。
  • 5.任务补录backfill
    airflow里有个功能backfill,可以执行过去时间的任务,类似操作叫补录或补数。
  • 6.任务重跑
    在airflow里,通过点击任务实例的clear按钮,删除这个任务实例,然后调度系统会再次创建并执行这个实例

3.DAG对象详解

DAG实例

Airflow最核心的一个就是DAG,DAG就是一个python脚本,一个将DAG的结构指定为代码的配置文件。

  • DAG 对象; 我们将需要它来实例化一个 DAG
    from airflow import DAG

  • Operators; 我们需要利用这个对象去执行流程!
    from airflow.operators.bash_operator import BashOperator

  • DAG的默认参数,每一个DAG都有的配置
    from datetime import datetime, timedelta
    default_args = {
    'owner': 'airflow', # dag属于哪个用户
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1), #dag开始的时间
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    }

  • 实例化一个DAG
    实例化DAG对象来进行任务:
    1.dag_id=tutorial,唯一的字符串
    2.传递 定义好的默认参数字典default_args
    3.DAG定义schedule_interval,设置调度间隔为每天一次。schedule_interval为每天调度时间。
    dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

  • TASK任务
    在实例化operator(执行器)时会生成任务,从一个operator(执行器)实例化出来的对象的过程,被称为一个构造方法。其中task_id充当任务的唯一标识符。
    t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
    任务参数优先级:
    1.明确传递参数
    2.default_args字典中存在的值
    3.operator的默认值(如果存在)

4.命令行元数据验证常用命令

airflow list_dags #打印出所有正在活跃状态的DAGS
airflow list_tasks tutorial #打印出tutorial DAG中所有的任务
airflow list_tasks tutorial --tree #打印出tutorial DAG的任务层次结构

5.时程表的基本格式

*  *  *  *  *  program
分  时  日  月  周  命令
第1列表示分钟1~59 每分钟用*或者 */1表示
第2列表示小时1~23(0表示0点)
第3列表示日期1~31
第4列表示月份1~12
第5列标识号星期0~6(0表示星期天)
第6列要运行的命令

当第1列 为 * 时表示每分钟都要执行 program,第2列为 * 时表示每小时都要执行程式,其余类推
当第1列为 a-b 时表示从第 a 分钟到第 b 分钟这段时间内要执行,第2列为 a-b 时表示从第 a 到第 b 小时都要执行,其余类推
当第1列为 */n 时表示每 n 分钟个时间间隔执行一次,第2列 为 */n 表示每 n 小时个时间间隔执行一次,其余类推
当第1列为 a, b, c,... 时表示第 a, b, c,... 分钟要执行,第2列 为 a, b, c,... 时表示第 a, b, c...个小时要执行,其余类推

举例说明:着重看空格
30 21 * * * /usr/lighttpd restart:每晚9点30重启lighttpd
45 4 1,10,22 * * /usr/lighttpd restart:每月1、10、22号的4点45分重启lighttpd
10 1 * * 6,0 /usr/lighttpd restart :每周六、日的1点10分重启lighttpd
0,30 18-23 * * * /usr/lighttpd restart:每天18-23点之间每隔30分钟重启lighttpd
* */1 * * * /usr/lighttpd restart:每隔一小时重启lighttpd

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容