Downloading and installing Airflow
Install with pip:
pip install apache-airflow
Switching the default database from SQLite to MySQL
To use MySQL as the database, run the following command:
pip install 'apache-airflow[mysql]'
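- Install MySQL (skip this step if it is already installed).
- Edit airflow.cfg and change the following two settings: set executor to LocalExecutor and point sql_alchemy_conn at the MySQL database.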
executor = LocalExecutor
sql_alchemy_conn = mysql://root:PASSWORD@localhost/Airflow
Here, Airflow is the name of a database that was created in MySQL beforehand.
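The connection string has the form mysql://user:password@host/database. If the password contains characters such as @ or /, they have to be percent-encoded before they go into the URL. A minimal sketch of assembling such a string with the standard library; the helper name build_mysql_conn and the sample credentials are hypothetical and not part of Airflow:

```python
from urllib.parse import quote_plus


def build_mysql_conn(user, password, host, database):
    """Assemble a mysql:// URL; hypothetical helper, not part of Airflow."""
    # Percent-encode the credentials so characters like '@' or '/'
    # do not break URL parsing.
    return "mysql://{}:{}@{}/{}".format(
        quote_plus(user), quote_plus(password), host, database
    )


if __name__ == "__main__":
    # Sample values only; substitute your own credentials.
    print(build_mysql_conn("root", "p@ss/word", "localhost", "Airflow"))
```

The resulting string is what would be assigned to sql_alchemy_conn.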
- Run the command
airflow initdb
If it fails with the error "Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql", add explicit_defaults_for_timestamp = 1 under the [mysqld] section of the MySQL configuration file my.cnf, then restart mysqld. If MySQL was installed with brew, run:
$ brew services list
$ brew services restart SERVICE_NAME
Quick start
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler
# visit localhost:8080 in the browser and enable the example dag in the home page
Example
The example from the official documentation:
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
Instantiating a DAG
tutorial is the dag_id, which uniquely identifies your DAG.
dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
Tasks
Tasks are generated when operators are instantiated. An object instantiated from an operator is called a task. The task_id argument acts as the unique identifier of the task.
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
A task must include or inherit the arguments task_id and owner; otherwise Airflow raises an exception.
Templating with Jinja
Airflow provides a set of built-in parameters and macros that can be used in templates.
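In the tutorial template, {{ ds }} expands to the execution date as a YYYY-MM-DD string, and {{ macros.ds_add(ds, 7) }} to the date seven days later. The sketch below re-implements that date arithmetic with the standard library, purely for illustration. It is not Airflow's own code, and the function name ds_add_sketch is made up for this example.

```python
from datetime import datetime, timedelta


def ds_add_sketch(ds, days):
    """Return the YYYY-MM-DD date string that lies `days` days after `ds`."""
    parsed = datetime.strptime(ds, "%Y-%m-%d")
    return (parsed + timedelta(days=days)).strftime("%Y-%m-%d")


if __name__ == "__main__":
    ds = "2015-06-01"  # example execution date
    print(ds)
    print(ds_add_sketch(ds, 7))
```

A negative value for days moves the date backwards.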
Setting up dependencies
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
Airflow raises an exception when it finds a cycle in the DAG or when a dependency is referenced more than once.
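To make the cycle rule concrete, here is a small sketch that detects a cycle in a task dependency graph with a depth-first search. It only illustrates the idea and is not taken from Airflow's source; the has_cycle function and the dictionary layout (task name mapped to its downstream task names) are assumptions of this example.

```python
def has_cycle(downstream):
    """Return True if the graph {task: [downstream tasks]} contains a cycle."""
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    state = {}

    def visit(task):
        state[task] = GREY
        for nxt in downstream.get(task, []):
            colour = state.get(nxt, WHITE)
            if colour == GREY:  # back edge: nxt is already on the current path
                return True
            if colour == WHITE and visit(nxt):
                return True
        state[task] = BLACK
        return False

    return any(
        state.get(task, WHITE) == WHITE and visit(task)
        for task in list(downstream)
    )


if __name__ == "__main__":
    print(has_cycle({"t1": ["t2", "t3"]}))          # mirrors t1 >> [t2, t3]
    print(has_cycle({"t1": ["t2"], "t2": ["t1"]}))  # mirrors t1 >> t2 >> t1
```

A task is marked grey while the search is still below it, so reaching a grey task again means the path has looped back on itself.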
Testing
Running the script
Save the code above in a file named tutorial.py inside the DAGs folder, which is configured in airflow.cfg. The default location is ~/airflow/dags.
python ~/airflow/dags/tutorial.py
Command-line metadata validation
# print the list of active DAGs
airflow list_dags
# prints the list of tasks in the "tutorial" DAG
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
Testing
airflow test tutorial print_date 2015-06-01
Backfill
Running a backfill actually executes the DAG, and the run status can be followed in the UI.
# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07
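With the tutorial's daily schedule_interval, the range given by -s 2015-06-01 and -e 2015-06-07 corresponds to one run per day. The sketch below lists those dates, assuming that both the start and the end date are included; the helper daily_dates is hypothetical and only illustrates the date range, it does not call Airflow.

```python
from datetime import date, timedelta


def daily_dates(start, end):
    """List the dates from start to end, one per day, endpoints included."""
    days = (end - start).days
    return [start + timedelta(days=i) for i in range(days + 1)]


if __name__ == "__main__":
    for d in daily_dates(date(2015, 6, 1), date(2015, 6, 7)):
        print(d.isoformat())
```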
Airflow Guides
Config
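Settings live in the configuration file airflow.cfg. They can also be set through environment variables named $AIRFLOW__{SECTION}__{KEY} (note the double underscores).
For example, the database connection is set in airflow.cfg as follows: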
[core]
sql_alchemy_conn = my_conn_string
Alternatively, define an environment variable such as:
AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_string
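The naming rule can be written down as a short sketch: upper-case the section and the key, and join them with double underscores after the AIRFLOW prefix. The function airflow_env_name is a hypothetical helper for illustration, not an Airflow API.

```python
import os


def airflow_env_name(section, key):
    """Build the AIRFLOW__{SECTION}__{KEY} variable name (double underscores)."""
    return "AIRFLOW__{}__{}".format(section.upper(), key.upper())


if __name__ == "__main__":
    name = airflow_env_name("core", "sql_alchemy_conn")
    os.environ[name] = "my_conn_string"  # placeholder value from the text
    print(name, "=", os.environ[name])
```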
You can also append _cmd to the key and supply a bash command whose output is used as the value, for example:
[core]
sql_alchemy_conn_cmd = bash_command_to_run
The following options support the _cmd suffix:
- sql_alchemy_conn in the [core] section
- fernet_key in the [core] section
- broker_url in the [celery] section
- result_backend in the [celery] section
- password in the [atlas] section
- smtp_password in the [smtp] section
- bind_password in the [ldap] section
- git_password in the [kubernetes] section
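Configuration options are resolved in the following order of precedence:
- environment variable
- value in airflow.cfg
- command (_cmd) in airflow.cfg
- Airflow's built-in default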
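The precedence order can be sketched as a small lookup function. This is an illustration of the order listed above under simplifying assumptions, not Airflow's actual configuration code; resolve_option, its defaults dictionary, and the sample values are all hypothetical.

```python
import configparser
import os
import subprocess


def resolve_option(cfg, section, key, defaults, environ=None):
    """Resolve one option following the precedence order (illustrative only)."""
    environ = os.environ if environ is None else environ
    env_name = "AIRFLOW__{}__{}".format(section.upper(), key.upper())
    # 1. environment variable
    if env_name in environ:
        return environ[env_name]
    # 2. plain value in the config file
    if cfg.has_option(section, key):
        return cfg.get(section, key)
    # 3. <key>_cmd in the config file: run the command and use its output
    if cfg.has_option(section, key + "_cmd"):
        out = subprocess.check_output(cfg.get(section, key + "_cmd"), shell=True)
        return out.decode().strip()
    # 4. built-in default
    return defaults[(section, key)]


if __name__ == "__main__":
    cfg = configparser.ConfigParser()
    cfg.read_string("[core]\nsql_alchemy_conn = value_from_cfg\n")
    defaults = {("core", "sql_alchemy_conn"): "value_from_default"}
    # No environment variable is passed in, so the config file value is used.
    print(resolve_option(cfg, "core", "sql_alchemy_conn", defaults, environ={}))
```

Passing environ explicitly keeps the function easy to try out without touching the real process environment.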