Python 通过 Celery 框架实现分布式任务队列

Celery 是一个简单、灵活且可靠的分布式消息处理系统,主要用来作为任务队列对海量消息数据进行实时的处理,在多个程序线程或者主机之间传递和分发工作任务。同时也支持计划任务等需求。

一、环境配置

Celery 框架自身并不对传入的消息进行存储,因此在使用前需要先安装第三方的 Message Broker。如 RabbitMQRedis 等。

安装 RabbitMQ

对于 Linux 系统,执行以下命令:

$ sudo apt-get install rabbitmq-server    # 安装 RabbitMQ
$ sudo rabbitmqctl add_user myuser mypassword    # 添加用户 myuser/mypassword
$ sudo rabbitmqctl add_vhost myvhost    # 添加 vhost
$ sudo rabbitmqctl set_user_tags myuser mytag
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"    # 为用户 myuser 设置访问 myvhost 的权限

通过 Docker 安装的步骤如下:

$ docker pull rabbitmq:3.8-management    # 拉取 docker 镜像(包含 web 管理)
# 启动 rabbitmq 容器
$ docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname myRabbit \
-e RABBITMQ_DEFAULT_VHOST=myvhost \
-e RABBITMQ_DEFAULT_USER=myuser \
-e RABBITMQ_DEFAULT_PASS=mypassword rabbitmq:3.8-management
安装 Redis

$ sudo apt-get install redis-server

安装 Celery

$ pip install celery

二、创建 Celery 应用

Celery 应用是该框架所能提供的所有功能(如管理 tasks 和 workers 等)的入口,须确保它可以被其他模块导入。
以下是一段简单的 Celery app 代码 tasks.py

# tasks.py
from celery import Celery

app = Celery('tasks',
             broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',
             backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

使用 RabbitMQ 作为 broker 接收和发送任务消息,使用 Redis 作为 backend 存储计算结果。

运行 Celery worker 服务

$ celery -A tasks worker --loglevel=info

$ celery -A tasks worker --loglevel=info

 -------------- celery@skitarniu-ubuntu18 v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Linux-4.15.0-60-generic-x86_64-with-debian-buster-sid 2019-11-01 07:21:34
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f4f30b84a90
- ** ---------- .> transport:   amqp://myuser:**@localhost:5672/myvhost
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2019-11-01 07:21:35,316: INFO/MainProcess] Connected to amqp://myuser:**@127.0.0.1:5672/myvhost
[2019-11-01 07:21:35,367: INFO/MainProcess] mingle: searching for neighbors
[2019-11-01 07:21:36,535: INFO/MainProcess] mingle: all alone
[2019-11-01 07:21:36,782: INFO/MainProcess] celery@skitarniu-ubuntu18 ready.
任务测试

进入 Python Shell,执行以下命令发布任务并获取结果:

>>> from tasks import add
>>> result = add.delay(4, 4)
>>> result
<AsyncResult: 6f435bc7-f194-469c-837f-54d77f880ace>
>>> result.ready()
True
>>> result.get()
8
>>> result.traceback
>>>

delay() 方法用于发布任务消息,它是 apply_async() 方法的简写,即以异步的方式将任务需求提交给前面启动好的 worker 去处理。delay() 方法返回一个 AsyncResult 对象。
result.ready() 方法可以用来检查提交的任务是否已经完成,返回布尔值。

result.get() 方法则用于获取执行完成后的结果。如任务未完成,则程序会一直等待直到有结果返回。因此该方法是阻塞的,并不常用。可以传入 timeout 参数指定等待的时间上限。
result.get(timeout=1),尝试获取任务执行后的结果,等待 1 秒。若 1 秒之后结果仍未返回,抛出 celery.exceptions.TimeoutError: The operation timed out. 异常。

如果任务执行过程中有抛出异常,则使用 get() 方法获取结果时会重新抛出该异常导致程序中断。可以通过修改 propagate 参数避免此情况:
result.get(propagate=False)
result.traceback 则用于获取任务的 traceback 信息。

三、Calling Tasks

Celery 定义了一些可供 task 实例调用的通用的 Calling API,包括三个方法和一些标准的执行选项:

  • apply_async(args[, kwargs[, ...]]):发送任务消息给 worker
  • delay(*args, **kwargs):发送任务消息的简写形式,不支持执行选项
  • calling (__call__):即在本地进程中直接执行任务函数,不通过 worker 异步执行

以下是一些常见的调用示例:

  • T.delay(arg, kwarg=value)
  • T.apply_async((arg,), {'kwarg': value})
  • T.apply_async(countdown=10)
    10 秒之后开始执行某个任务
  • T.apply_async(eta=now + timedelta(seconds=10))
    10 秒之后开始执行某个任务
  • T.apply_async(countdown=60, expires=120)
    预计 1 分钟后开始执行,但 2 分钟后还未执行则失效
  • T.apply_async(expires=now + timedelta(days=2))
    2 天后失效

通过 countdown 设置任务的延迟执行:

>>> from tasks import add
>>> result = add.apply_async((2, 3))
>>> result.get()
5
>>> delay_result = add.apply_async((2, 3), countdown=15)
>>> delay_result.ready()
False
>>> delay_result.ready()
False
>>> delay_result.ready()
False
>>> delay_result.ready()
True
>>> delay_result.get()
5

还可以通过 eta(estimated time of arrival) 设置延迟执行的时间:

>>> from datetime import datetime, timedelta
>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 3), eta=tomorrow)
<AsyncResult: c7dc6d7f-8b87-49d1-8077-73d7f046d709>

此时 worker 在命令行的日志输出如下:

[2019-11-06 05:16:21,362: INFO/MainProcess] Received task: tasks.add[c7dc6d7f-8b87-49d1-8077-73d7f046d709]
ETA:[2019-11-07 05:16:06.652736+00:00]

四、计划任务

Celery 允许像使用 crontab 那样按计划地定时执行某个任务。参考代码如下:

# tasks.py
from celery import Celery

app = Celery('tasks',
             broker='pyamqp://myuser:mypassword@localhost:5672/myvhost',
             backend='redis://localhost:6379/1')

app.conf.beat_schedule = {
    'add-every-60-seconds': {
        'task': 'tasks.add',
        'schedule': 60.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

@app.task
def add(x, y):
    print(x + y)

运行 celery -A tasks worker -B 启动 worker 服务。
-B 选项表示 beat,即 celery beat 服务,负责执行计划任务。

输出如下(每隔一分钟执行一次):

$ celery -A tasks worker -B
...
[2019-11-06 05:41:34,057: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:42:33,998: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:43:34,056: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:44:34,105: WARNING/ForkPoolWorker-3] 32
[2019-11-06 05:45:34,157: WARNING/ForkPoolWorker-3] 32
...

同时 Celery 也支持更复杂的 crontab 类型的时间规划:

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Crontab 表达式支持的语法如下:

Example Meaning
crontab() 每分钟执行一次
crontab(minute=0, hour=0) 每天半夜 0 点执行
crontab(minute=0, hour='*/3') 每隔 3 小时执行一次(从 0 时开始)
crontab(minute=0, hour='0,3,6,9,12,15,18,21') 同上一条
crontab(day_of_week='sunday') 只在周日执行,每隔一分钟执行一次
crontab(minute='*', hour='*', day_of_week='sun') 同上一条
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') 只在周四、周五的 3、17、22 时执行,每隔 10 分钟执行一次
crontab(minute=0, hour='*/2,*/3') 只在能被 2 或者 3 整除的整点执行
crontab(minute=0, hour='*/3,8-17') 在能被 3 整除的整点,和 8-17 点之间的整点执行
crontab(0, 0, day_of_month='2') 在每个月的第二天的 0 时执行
crontab(0, 0, day_of_month='11', month_of_year='5') 在每年的 5 月 11 号 0 点执行

参考资料

Celery 4.3.0 documentation

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容