Airflow和DAG实例中的相关概念

DAG:主要是用于数据处理、调度和数据压缩。
Airflow中文文档:https://airflow.apachecn.org/#/
在知乎中看到一篇讲airflow中很好的文章:https://zhuanlan.zhihu.com/p/336458279
确实其中有几个概念需要先了解清楚,做一件事情,一定要了解他的基本概念,再去实践,清晰的了解自己在做什么很重要。

1.Airflow中的几个概念

DAG:有向无环图(Directed Acyclic Graph),是将所有需要运行的Task按照依赖组合起来,描述的是所有Task执行的顺序和依赖关系。
Operator:用于定义具体任务的算子,如BashOpertar对应的bash命令,PythonOperator对应的python命令。
Task:具体某一时刻要执行的任务实例,是Operator的实例化
Task Instance:Task的进一步实例化,单次Task有失败的可能,存在重复调度,每次task运行就是不同的task instance,并有不同的执行状态,包括:running、success、failed、skipped、up for retry等。


Airflow常用的几个相关概念

2.Dag实例中的几个相关概念

官方文档中的DAG实例,但是其中具体想要实现的问题,可能是对于我们初学者要去着重留意的。
实例代码和实例相关概念查看下面具体的代码项的内容。

2.1官方-DAG实例

 """
Airflow 教程代码位于:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

parameters = [{'label':'跑批时间','name':'batch_date'},{'label':'跑批方式(inc:增量 all:全量)','name':'batch_type'},]

dag = DAG(
    'tutorial', 
    default_args=default_args, 
    schedule_interval=timedelta(days=1),
    external_params=parameters
)

run_this_first = DummyOperator(
    task_id='run_this_first ',
    dag=dag,
)

run_this_last = DummyOperator(
    task_id='run_this_last ',
    dag=dag,
)


# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    { % f or i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
    { % e ndfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

2.2 DAG中实例中几个相关概念

在官网中,写的都是很理论性的语术,我们在实际工作中不一定能对得上或者不容易理解,所以需要特别注意的点我整理了一下。

1.default_args字段参数的相关概念
2.创建dag中的相关概念
3.DumpyOperator空算子不做任何操作,开始和结束使用
4.Operator执行命令的算子
5.设置可以在web页面传递的参数内容

  • 1.default_args 是dag的默认配置,每一个dag都有的配置
    owner:dag属于哪个用户
    start_date:dag的开始时间
    depends_on_past:关注整个DAG过去执行的情况,一旦某一次失败了,后面就不执行了。
default_args = {
    @owner:dag属于哪个用户
    'owner': 'airflow',
    @depends_on_past:dag级别的触发规则,为True时,只有上一个dagrun被成功执行,下一个dagrun才能被执行。
    'depends_on_past': False,
    @start_date:dag的开始时间
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
  • 2.创建dag
    schedule_interval:每天调度的时间
    max_active_runs:每次活动的dag
dag = DAG(
   dag_id= 'tutorial', 
   default_args=default_args, 
   schedule_interval=timedelta(days=1),
max_active_runs=1
)
  • 3.一般还有空的算子,不做任何操作,一般开始和结束使用
@一般还有空的算子,不做任何操作,一般开始和结束使用
run_this_first = DummyOperator(
    task_id='run_this_first ',
    dag=dag,
)

run_this_last = DummyOperator(
    task_id='run_this_last ',
    dag=dag,
)
  • 4.Operator执行bash命令的算子
    templated_command :针对bash操作,用户可以动态生成脚本命令
    bash_command:需要执行的命令,如果这个地方写的是shell脚本路径,一定要在脚本名称后边加空格
@针对bash操作,用户可以动态生成脚本命令
templated_command = """
    { % f or i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
    { % e ndfor %}
"""

t3 = BashOperator(
    task_id='templated',
    @需要执行的命令,如果这个地方写的是shell脚本路径,一定要在脚本名称后边加空格
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
  • 5.Airflow页面输入参数parameters
    Airflow除了schedule定时执行任务以外,还有一些特定的参数需要输入,执行特定的任务。比如:重跑某个特定的任务,跑特定某个时间的任务等。
    比如这里配置的参数配置,在dag定义时通过参数external_params=parameters传入。
dag = DAG(
    dag_id='tutorial', 
    default_args=default_args, 
    schedule_interval=timedelta(days=1),
    @通过变量external_params把参数传入进去
    external_params=parameters
)
@DAG中可以进行参数设置
parameters = [{'label':'数据日期(yyyy-MM-dd)','name':'batch_date'},{'label':'跑批时间(yyyy-MM-dd)','name':'batch_type'},]
web页面传递参数内容
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容