airflow官方文档:https://www.kancloud.cn/apachecn/airflow-doc-zh/1944253
个人觉得airflow写得挺好的一篇文章:https://www.cnblogs.com/woshimrf/p/airflow-first-dag.html
最近在做大数据测试,涉及到airflow,一直听说这个专有名词,但是却从来不知道是什么,带着这个疑问,随着工作慢慢深入,对airflow在网上去进行百度了一下,发现就是一个调度框架,但是有个DAG,DAG就是python脚本,根据写好的python脚本进行DAG任务执行,在airflow这个平台上进行调度执行。
1.什么是数据调度系统
数据中台是一个很高大上的一个概念,对这个概念基本从来没有搞明白过。无意中看到一篇文章这么描述的:数据中台就是收集各个零散的数据,进行标准化、服务化,提供统一的数据服务。那这些数据的整理和处理,就需要有个东西来进行统一调度,即调度系统。
数据调度系统
:将不同的异构数据互相同步,可以按照规划去执行数据处理和任务调度,airflow就根据这个需求产生了,即airflow就是一个任务调度平台。
2.调度系统的相关概念
1.任务定义
2.任务实例
3.执行日期
4.任务依赖
5.任务补录backfill
6.任务重跑
- 1.任务定义
任务定义:定义一个任务的具体内容,比如打印当前时间:{{ds}}----{{今天的时间戳}} - 2.任务实例
任务实例:任务设定了运行时间,每次运行时会生成一个实例,如dag-task-executiondate标记一个任务实例。 - 3.执行日期
执行日期:任务实例运行所代表的任务时间,即execute-date或bizdate,类似hive表的分区。任务时间代表的是任务要处理的数据时间,所以一般都是T-1的时间,即昨天以前的时间所产生的数据。
调度系统中的ds(execution date)通常是过去的一个周期,即本周期执行上周期的任务 - 4.任务依赖
任务依赖:最简单的任务模型etl(Extract & Transformation &Loading,即数据抽取、转换、加载)最少分3步。这3个任务之间有先后顺序,必须前一个执行完毕之后,后一个才可以执行,即存在依赖关系。 - 5.任务补录backfill
airflow里有个功能backfill,可以执行过去时间的任务,类似操作叫补录或补数。 - 6.任务重跑
在airflow里,通过点击任务实例的clear按钮,删除这个任务实例,然后调度系统会再次创建并执行这个实例
3.DAG对象详解
Airflow最核心的一个就是DAG,DAG就是一个python脚本,一个将DAG的结构指定为代码的配置文件。
DAG 对象; 我们将需要它来实例化一个 DAG
from airflow import DAGOperators; 我们需要利用这个对象去执行流程!
from airflow.operators.bash_operator import BashOperatorDAG的默认参数,每一个DAG都有的配置
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # dag属于哪个用户
'depends_on_past': False,
'start_date': datetime(2015, 6, 1), #dag开始的时间
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}实例化一个DAG
实例化DAG对象来进行任务:
1.dag_id=tutorial,唯一的字符串
2.传递 定义好的默认参数字典default_args
3.DAG定义schedule_interval,设置调度间隔为每天一次。schedule_interval为每天调度时间。
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))TASK任务
在实例化operator(执行器)时会生成任务,从一个operator(执行器)实例化出来的对象的过程,被称为一个构造方法。其中task_id充当任务的唯一标识符。
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
任务参数优先级:
1.明确传递参数
2.default_args字典中存在的值
3.operator的默认值(如果存在)
4.命令行元数据验证常用命令
airflow list_dags
#打印出所有正在活跃状态的DAGS
airflow list_tasks tutorial
#打印出tutorial DAG中所有的任务
airflow list_tasks tutorial --tree
#打印出tutorial DAG的任务层次结构
5.时程表的基本格式
* * * * * program
分 时 日 月 周 命令
第1列表示分钟1~59 每分钟用*或者 */1表示
第2列表示小时1~23(0表示0点)
第3列表示日期1~31
第4列表示月份1~12
第5列标识号星期0~6(0表示星期天)
第6列要运行的命令
当第1列 为 * 时表示每分钟都要执行 program,第2列为 * 时表示每小时都要执行程式,其余类推
当第1列为 a-b 时表示从第 a 分钟到第 b 分钟这段时间内要执行,第2列为 a-b 时表示从第 a 到第 b 小时都要执行,其余类推
当第1列为 */n 时表示每 n 分钟个时间间隔执行一次,第2列 为 */n 表示每 n 小时个时间间隔执行一次,其余类推
当第1列为 a, b, c,... 时表示第 a, b, c,... 分钟要执行,第2列 为 a, b, c,... 时表示第 a, b, c...个小时要执行,其余类推
举例说明:着重看空格
30 21 * * * /usr/lighttpd restart
:每晚9点30重启lighttpd
45 4 1,10,22 * * /usr/lighttpd restart
:每月1、10、22号的4点45分重启lighttpd
10 1 * * 6,0 /usr/lighttpd restart
:每周六、日的1点10分重启lighttpd
0,30 18-23 * * * /usr/lighttpd restart
:每天18-23点之间每隔30分钟重启lighttpd
* */1 * * * /usr/lighttpd restart
:每隔一小时重启lighttpd