airflow学习2_定义Pipeline的例子

笔记原始文章https://airflow.apachecn.org/#/zh/tutorial

直接上代码,然后把对应代码含义标记在注释中
这个 Airflow 的 Python 脚本实际上只是一个将 DAG 的结构指定为代码的配置文件

#首先,导入需要的库

#导入DAG对象,我们要用它来实例化一个 DAG
from airflow import DAG
#Operators; 我们需要利用这个对象去执行流程!
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

#一些默认参数
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

#实例化一个DAG
# tutorial是dag_id,它用作DAG的唯一标识符
# 将刚才定义的默认参数传给default_args
# 设置调度间隔,这里设置为每天一次
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1、t2 和 t3 是通过实例化 Operators 创建的任务示例
# 在实例化 operator(执行器)时会生成任务。第一个参数task_id充当任务的唯一标识符。

#任务1,
# 注意到bash_command是BaseOperator特有的参数,
# retries是所有的operator 构造函数中都会有的一个参数,不写就是默认值
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

#这里retries用3来覆盖了原本的默认值
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

# 这里用到了jinja,模板中的特殊占位符允许编写类似Python语法的代码。然后传递模板数据以呈现最终文档
# 双花括号,用于替换
templated_command = """
    { % for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    { % end for %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'}, #  hook 允许您将参数或对象的字典传递给您的模板
    dag=dag)  

# 设置依赖关系
t2.set_upstream(t1) # 这意味着 t2 会在 t1 成功执行之后才会执行
t3.set_upstream(t1)

说明一:任务参数的优先规则如下

  • 明确传递参数
  • default_args字典中存在的值
  • operator 的默认值(如果存在)

任务必须包含或继承参数task_id和owner,否则 Airflow 将出现异常。

说明二:依赖关系说明

# 这意味着 t2 会在 t1 成功执行之后才会执行
t1.set_downstream(t2)
# 与下面这种写法相等
t2.set_upstream(t1)

# 位移运算符也可用于链式运算
t1 >> t2
# 位移运算符可以链接多个依赖关系,使其变得简洁
t1 >> t2 >> t3

# 下面的这些操作都具有相同的效果:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容