airflow 使用之Operators

Operators 简介

Operators 允许生成特定类型的任务,这些任务在实例化时成为 DAG 中的任务节点。所有的 Operator 均派生自 BaseOperator,并以这种方式继承许多属性和方法。
Operator 主要有三种类型:

  • 执行一项操作或在远程机器上执行一项操作。
  • 将数据从一个系统移动到另一个系统
  • 类似传感器,是一种特定类型 Operator,它将持续运行,直到满足某种条件。例如在 HDFS 或 S3 中等待特定文件到达,在 Hive 中出现特定的分区或一天中的特定时间,继承自 BaseSensorOperator。

BaseOperator 简介

所有的 Operator 都是从 BaseOperator 派生而来,并通过继承获得更多功能。这也是引擎的核心,所以有必要花些时间来理解 BaseOperator 的参数,以了解 Operator 基本特性。

先看一下构造函数的原型:

class airflow.models.BaseOperator(task_id, owner='Airflow', email=None, email_on_retry=True, email_on_failure=True, retries=0, retry_delay=datetime.timedelta(0, 300), retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, schedule_interval=None, depends_on_past=False, wait_for_downstream=False, dag=None, params=None, default_args=None, adhoc=False, priority_weight=1, weight_rule=u'downstream', queue='default', pool=None, sla=None, execution_timeout=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, trigger_rule=u'all_success', resources=None, run_as_user=None, task_concurrency=None, executor_config=None, inlets=None, outlets=None, *args, **kwargs)


需要注意的是参数 start_date。start_date 决定了任务第一次运行的时间,最好的实践是设置 start_date 在 schedule_interval 的附近。比如每天跑的任务开始日期设为'2018-09-21 00:00:00',每小时跑的任务设置为 '2018-09-21 05:00:00',airflow 将 start_date 加上 schedule_interval 作为执行日期。需要注意的是任务的依赖需要及时排除,例如任务 A 依赖任务 B,但由于两者 start_date 不同导致执行日期不同,那么任务 A 的依赖永远不会被满足。如果你需要执行一个日常任务,比如每天下午 2 点开始执行,你可以在 DAG中使用 cron 表达式

schedule_interval="0 14 * * *"

BashOperator

官方提供的 DAG 示例-tutorial 就是一个典型的 BashOperator,调用 bash 命令或脚本,传递模板参数就可以参考 tutorial

"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](http://pythonhosted.org/airflow/tutorial.html)
"""
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta


# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',   #这里也可以是一个 bash 脚本文件
    bash_command='date',
    dag=dag)

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""

dag.doc_md = __doc__

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

这里 t1 和 t2 都很容易理解,直接调用的是 bash 命令,其实也可以传入带路径的 bash 脚本, t3 使用了 Jinja 模板,"{% %}" 内部是 for 标签,用于循环操作。"{{ }}" 内部是变量,其中 ds 是执行日期,是 airflow 的宏变量,params.my_param 是自定义变量。根据官方提供的模板,稍加修改即可满足我们的日常工作所需。

PythonOperator

PythonOperator 可以调用 Python 函数,由于 Python 基本可以调用任何类型的任务,如果实在找不到合适的 Operator,将任务转为 Python 函数,再使用 PythonOperator 也是一种选择。下面是官方文档给出的 PythonOperator 使用的样例。

from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

import time
from pprint import pprint

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_python_operator', default_args=args,
    schedule_interval=None)


def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)

    task.set_upstream(run_this)

通过以上代码我们可以看到,任务 task 及依赖关系都是可以动态生成的,这在实际使用中会减少代码编写数量,逻辑也非常清晰,非常方便使用。PythonOperator 与 BashOperator 基本类似,不同的是 python_callable 传入的是 Python 函数,而后者传入的是 bash 指令或脚本。通过 op_kwargs 可以传入任意多个参数。

HiveOperator

hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的 sql 查询功能,可以将 sql 语句转换为 MapReduce 任
务进行运行。在 airflow 中调用 hive 任务,首先需要安装依赖

pip install apache-airflow[hive]

下面是使用示例:

t1 = HiveOperator(
    task_id='simple_query',
    hql='select * from cities',
    dag=dag)

常见的 Operator 还有 DockerOperator,OracleOperator,MysqlOperator,DummyOperator,SimpleHttpOperator 等使用方法类似,不再一一介绍。

如何自定义Operator

如果官方的 Operator 仍不满足需求, 那么我们就自己开发一个 Operator。 开发 Operator 比较简单,继承 BaseOperator 并实现 execute 方法即可:

from airflow.models import BaseOperator

class MyOperator(BaseOperator):

    def __init__(*args, **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
    
    def execute(self, context):
        ###do something here

除了 execute 方法外,还可以实现以下方法:
on_kill: 在 task 被 kill 的时候执行。

airflow 是支持Jinjia模板语言的,那么如何在自定义的 Operator 中加入Jinjia模板语言的支持呢?
其实非常简单,只需要在自定义的Operator类中加入属性

template_fields = (attributes_to_be_rendered_with_jinja)

即可,例如官方的 bash_operator中是这样的:

template_fields = ('bash_command', 'env')

这样,在任务执行之前,airflow 会自动渲染 bash_command 或 env 中的属性再执行任务。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容

  • 本文将介绍 Airflow 这一款优秀的调度工具。主要包括 Airflow 的服务构成、Airflow 的 Web...
    a7f00a9019ae阅读 62,020评论 6 42
  • Airflow 是 Airbnb 公司开源的任务调度系统, 通过使用 Python 开发 DAG, 非常方便的调度...
    haitaoyao阅读 9,775评论 2 7
  • 1. 听说凤凰古城是“艳遇”频率高发的一个地方,作为大龄资深单身狗一枚,朵朵对它倾注了各种憧憬和向往,就想着某天,...
    花子鱼阅读 768评论 1 0
  • 六、无心插柳柳成荫 早晨的阳光直射到窗台上,窗外的小麻雀叽叽喳喳叫了起来。夏满荷像往常一样长长地伸了一个懒腰后,挥...
    蜗牛也是牛6267阅读 361评论 0 3
  • 那是一条吵闹的大街,熙熙攘攘的人群一直川流不息。有一个胖乎乎的小男孩,一边走一边吃着橘子。“小胖,把核都吐手里拿着...
    穗拾阅读 715评论 1 8