应用场景
ETL 差不多是数据处理的基础,要求非常稳定,容错率高,而且能够很好的监控,其全称是 Extract,Transform,Load, 一般情况下是将乱七八糟的数据进行预处理,然后放到储存空间上。
一般过程上,在数据进入后需要人工的去将数据的按流程处理一遍,调用各种工具。这个过程有些机械化,所以可以考虑使用脚本或者其它工具进行控制。
airflow 是能进行数据pipeline的管理,甚至是可以当做更高级的cron job 来使用。它是用python写的,能进行工作流的调度,提供更可靠的流程,而且它还有自带的UI。
下载与安装
安装:
pip install airflow[all]
初始化:
airflow initdb
启动:
airflow webserve
访问8080可以看到:
airflow的重要概念:DAG
DAG是directed asyclic graph,在很多机器学习里有应用,也就是所谓的有向非循环。但是在airflow里你可以看做是一个小的工程,小的流程,因为每个小的工程里可以有很多“有向”的task,最终达到某种目的。
在官网中的介绍里说dag的特点:
- Scheduled: each job should run at a certain scheduled interval
- Mission critical: if some of the jobs aren’t running, we are in trouble
- Evolving: as the company and the data team matures, so does the data processing
- Heterogenous: the stack for modern analytics is changing quickly, and most companies run multiple systems that need to be glued together
加入自己的DAG
airflow会在默认文件夹下生成airflow文件夹,然后你只要在里面新建一个文件dag就可以了。
然后创建一个自己的dag文件,写好之后,只要将这个dag放入之前建立好的dag文件夹
运行自己的DAG
我们使用官网给出的样板:
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
为了验证其准确性,可以在本地的python开发环境下进行一次运行。
官网给出的测试方法是:
python ~/airflow/dags/tutorial.py
对DAG进行查看:
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
- 查看所有DAG
- 查看某DAG中的tasks
list_tasks 后填自己的DAG名,模板给的是tutorial
- 查看tasks间的依赖关系:
- 测试task
airflow test tutorial templated 2015-06-01
测试所有task无问题后,运行DAG
首先启动airflow调度:
airflow scheduler
你之前创建的DAG就从local dag被调度到了平台中。
运行DAG:
将最前方的滑至On
点击运行
在点击查看图
结果如下:
然后点击task,再点击log就可以查看task的日志输出:
样例结果: