airflow

下载安装airflow

使用pip 安装

pip install apache-airflow

将默认的数据库sqlite 改为mysql

如果想用 mysql 作为 数据库,则 执行以下命令

  1. pip install 'apache-airflow[mysql]'
  2. 下载 mysql(已经有了则跳过)
  3. 更改airflow.cfg 文件,更改以下这两个配置,executor 改为local
executor = LocalExecutor
 # their website
sql_alchemy_conn = mysql://root:PASSWORD@localhost/Airflow

Airflow 是提前建好的数据库。

  1. 执行airflow initdb 命令。如果报错 Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
    则需要在mysql 的配置文件my.cnf ,mysqld 下增加explicit_defaults_for_timestamp = 1。然后重启mysqld 。如果是 brew 安装的mysql 。执行
$ brew services list
$ brew services restart SERVICE_NAME

quick start

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

# start the scheduler
airflow scheduler

# visit localhost:8080 in the browser and enable the example dag in the home page

Example

官方文档的例子

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

实例化一个DAG

tutorial 是 dag_id. 唯一标识你的DAG。

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

Tasks

当实例化 operators 时,就会生成 Tasks。 An object instantiated from an operator is called a constructor. 参数task_id 是 作为task 的唯一标识。

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

一个task 必须包含或者继承task_idowner, 不然Airflow 会引发异常。

Templating with Jinja

Airflow 提供了一些内置的 参数和宏。

建立依赖

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

当发现有环,或者引用多次,airflow就会引起异常。

Testing

运行脚本

将上面的代码放到一个文件 tutorial.py 在DAG 文件夹下,这个文件夹在airflow.cfg 配置好了。默认是~/airflow/dags

python ~/airflow/dags/tutorial.py

命令行元数据验证

# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree

Testing

airflow test tutorial print_date 2015-06-01

Backfill

执行backfill 会真正执行 dag, 在UI 上可以看到执行状态。

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

airflow Guides

Config

配置文件 airflow.cfg. 也可以用过环境变量 $AIRFLOW__{SECTION}__{KEY}
比如
数据库的连接,在airflow.cfg 如下

[core]
sql_alchemy_conn = my_conn_string

也可以新建一个环境变量,像

AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_string

也可以 在key 后面添加_cmd 后面跟bash 命令,比如

[core]
sql_alchemy_conn_cmd = bash_command_to_run

支持_cmd的有

  • sql_alchemy_conn in [core] section

  • fernet_key in [core] section

  • broker_url in [celery] section

  • result_backend in [celery] section

  • password in [atlas] section

  • smtp_password in [smtp] section

  • bind_password in [ldap] section

  • git_password in [kubernetes] section

配置选项的优先级顺序:

  1. 环境变量
  2. 在airflow.cfg的配置
  3. 在airflow.cfg 的命令
  4. Airflow 默认的内置配置
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 在快速启动部分中设置很简单,构建生产级环境需要更多的工作,下面来了解一下。 1. 设置配置选项 第一次运行Airf...
    路小漫阅读 9,807评论 0 3
  • 本文将介绍 Airflow 这一款优秀的调度工具。主要包括 Airflow 的服务构成、Airflow 的 Web...
    a7f00a9019ae阅读 63,941评论 6 42
  • 跑步坚持第二天
    九分伤阅读 197评论 0 0
  • 对不起 那就这样吧 早已放弃期待 只是 这一天来的有点快 原来也偷的半日浮生 该庆幸 曾一起
    yyfly阅读 138评论 0 0
  • 我是文科专业,能选择的理想的职业屈指可数。开始选的是法律,可能差个几分未被录取,调剂到一个医学院校的心理专业,当然...
    InspironI阅读 241评论 0 0

友情链接更多精彩内容