DAG:主要是用于数据处理、调度和数据压缩。
Airflow中文文档:https://airflow.apachecn.org/#/
在知乎中看到一篇讲airflow中很好的文章:https://zhuanlan.zhihu.com/p/336458279
确实其中有几个概念需要先了解清楚,做一件事情,一定要了解他的基本概念,再去实践,清晰的了解自己在做什么很重要。
1.Airflow中的几个概念
DAG:有向无环图(Directed Acyclic Graph),是将所有需要运行的Task按照依赖组合起来,描述的是所有Task执行的顺序和依赖关系。
Operator:用于定义具体任务的算子,如BashOpertar对应的bash命令,PythonOperator对应的python命令。
Task:具体某一时刻要执行的任务实例,是Operator的实例化
Task Instance:Task的进一步实例化,单次Task有失败的可能,存在重复调度,每次task运行就是不同的task instance,并有不同的执行状态,包括:running、success、failed、skipped、up for retry等。
Airflow常用的几个相关概念
2.Dag实例中的几个相关概念
官方文档中的DAG实例,但是其中具体想要实现的问题,可能是对于我们初学者要去着重留意的。
实例代码和实例相关概念查看下面具体的代码项的内容。
2.1官方-DAG实例
"""
Airflow 教程代码位于:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
parameters = [{'label':'跑批时间','name':'batch_date'},{'label':'跑批方式(inc:增量 all:全量)','name':'batch_type'},]
dag = DAG(
'tutorial',
default_args=default_args,
schedule_interval=timedelta(days=1),
external_params=parameters
)
run_this_first = DummyOperator(
task_id='run_this_first ',
dag=dag,
)
run_this_last = DummyOperator(
task_id='run_this_last ',
dag=dag,
)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
2.2 DAG中实例中几个相关概念
在官网中,写的都是很理论性的语术,我们在实际工作中不一定能对得上或者不容易理解,所以需要特别注意的点我整理了一下。
1.default_args字段参数的相关概念
2.创建dag中的相关概念
3.DumpyOperator空算子不做任何操作,开始和结束使用
4.Operator执行命令的算子
5.设置可以在web页面传递的参数内容
- 1.default_args 是dag的默认配置,每一个dag都有的配置
owner:dag属于哪个用户
start_date:dag的开始时间
depends_on_past:关注整个DAG过去执行的情况,一旦某一次失败了,后面就不执行了。
default_args = {
@owner:dag属于哪个用户
'owner': 'airflow',
@depends_on_past:dag级别的触发规则,为True时,只有上一个dagrun被成功执行,下一个dagrun才能被执行。
'depends_on_past': False,
@start_date:dag的开始时间
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
- 2.创建dag
schedule_interval:每天调度的时间
max_active_runs:每次活动的dag
dag = DAG(
dag_id= 'tutorial',
default_args=default_args,
schedule_interval=timedelta(days=1),
max_active_runs=1
)
- 3.一般还有空的算子,不做任何操作,一般开始和结束使用
@一般还有空的算子,不做任何操作,一般开始和结束使用
run_this_first = DummyOperator(
task_id='run_this_first ',
dag=dag,
)
run_this_last = DummyOperator(
task_id='run_this_last ',
dag=dag,
)
- 4.Operator执行bash命令的算子
templated_command :针对bash操作,用户可以动态生成脚本命令
bash_command:需要执行的命令,如果这个地方写的是shell脚本路径,一定要在脚本名称后边加空格
@针对bash操作,用户可以动态生成脚本命令
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator(
task_id='templated',
@需要执行的命令,如果这个地方写的是shell脚本路径,一定要在脚本名称后边加空格
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
- 5.Airflow页面输入参数parameters
Airflow除了schedule定时执行任务以外,还有一些特定的参数需要输入,执行特定的任务。比如:重跑某个特定的任务,跑特定某个时间的任务等。
比如这里配置的参数配置,在dag定义时通过参数external_params=parameters传入。
dag = DAG(
dag_id='tutorial',
default_args=default_args,
schedule_interval=timedelta(days=1),
@通过变量external_params把参数传入进去
external_params=parameters
)
@DAG中可以进行参数设置
parameters = [{'label':'数据日期(yyyy-MM-dd)','name':'batch_date'},{'label':'跑批时间(yyyy-MM-dd)','name':'batch_type'},]
web页面传递参数内容