Celery 部署小记

Celery 部署小记

参考版本: 4.0.2

概念

旧版中文翻译

最新版英文文档

以下摘自官方文档的翻译:

Celery - 分布式任务队列

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具

它是一个专注于实时处理的任务队列,同时也支持任务调度

CeleryPython的一个“开箱即用”的任务队列模块,易用且提供与多语言对接的集成方案

角色列表:

  1. task(任务)
  2. broker(中间人)
  3. worker(消费者)
  4. result backend(结果后端),可选
  5. celerybeat(任务调度器),用于定时任务

使用场景

  • 处理高功耗、高延时的并发实时操作。可进入celery任务队列,由workers去执行,属于应用层的任务调度
  • 可以在应用层(通过任务调度器celerybeat进程)执行定时任务

实时任务的建立&执行的过程:

  1. 创建task文件,定义一些任务方法(@app.task),这些是任务实际执行的代码
  2. 在配置文件中定义brokerresult backend的实体(用于存储信息,可以是RabbitMQ, Redis, 数据库等),以及一些workers执行时需要遵循的规定
  3. 启动workers进程,它会加载配置、绑定task文件中的任务方法,然后会监控broker中的每个请求数据包
  4. 客户端将任务方法名、参数列表等参数包装好,投递到broker中,然后不等待执行结果返回,就继续往下执行主程序
  5. workers进程检测到broker中有任务需要执行,故从中取得数据传递到相应的方法执行,执行完将结果存储到result backend中(如果有设置结果后端)
  6. 在执行每个任务的时候,workers中还维护了该任务的执行状态、执行结果、错误信息等数据项,便于客户端随时调用getStatus等方法来查询结果

上述的workers进程泛指一个监控中间人、执行实际任务的消费者、存数据至结果后端、维护任务信息的进程集合,实际的实现可能是多个子进程或多个子线程共同完成

定时任务的过程: 通过启动额外的进程celerybeat(任务调度器),每当有到时间执行的任务,就通知workers执行,其他过程与实时任务大致相同

快速开始

参考: http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#first-steps

环境:ubuntu 16.04 virtualenv python3

【1】 创建mytasks.py文件,键入如下代码:

from celery import Celery
import celeryconfig

app = Celery('mytasks')
# 分离配置到celeryconfig文件
app.config_from_object(celeryconfig)


@app.task
def add(x, y):
    return x + y

【2】 创建celeryconfig.py配置文件,键入如下代码:

broker_url = 'redis://localhost'
result_backend = 'redis://localhost'
# result_backend = 'db+mysql://{user}:{pass}@{host}:{port}/{database}'
include = ['mytasks']
task_serializer = 'json'
result_serializer = 'json'
timezone = 'Asia/Shanghai'
enable_utc = False

除了基本的broker, result backend之外,还有一些其他配置,规定workers的执行

【3】 在项目目录下执行celery -A mytasks worker --loglevel=info (如提示缺少redis类库,使用pip安装即可)

(env) tyruschin@tyruschin-B85M-DS3H-A:~/Desktop/celerytask$ celery -A mytasks worker --loglevel=info

 -------------- celery@tyruschin-B85M-DS3H-A v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.4.0-87-generic-x86_64-with-debian-stretch-sid 2017-09-06 11:54:49
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         mytasks:0x7fbbf5bbedd8
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . mytasks.add

[2017-09-06 11:54:49,511: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-09-06 11:54:49,520: INFO/MainProcess] mingle: searching for neighbors
[2017-09-06 11:54:50,538: INFO/MainProcess] mingle: all alone
[2017-09-06 11:54:50,546: INFO/MainProcess] celery@tyruschin-B85M-DS3H-A ready.
[2017-09-06 11:54:52,887: INFO/MainProcess] Events of group {task} enabled by remote.

可以看到,进程启动的时候,加载了[config],建立了队列[queues],绑定了任务列表[tasks],然后执行了主进程MainProcess

【4】 在另一终端创建客户端程序client.py,键入如下代码:

import mytasks
mytasks.add.delay(1, 3)

python client.py,此时查看celery进程执行的终端,多了两行log:

[2017-09-06 12:03:01,874: INFO/MainProcess] Received task: mytasks.add[bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07]  
[2017-09-06 12:03:01,884: INFO/ForkPoolWorker-2] Task mytasks.add[bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07] succeeded in 0.007293057162314653s: 4

可以知道主进程MainProcess接收到任务(task idbf3ea8c0-a053-48fb-ad14-25f0f1cc8c07),从进程池中选择一个worker,即ForkPoolWorker-2来执行任务,得到结果为4,经过了约0.007s

【5】 打开redis-cli查看result backendtask idredis key相对应,具体如下:

127.0.0.1:6379> keys *
...
6) "celery-task-meta-bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07"
127.0.0.1:6379> get celery-task-meta-bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07
"{\"children\": [], \"result\": 4, \"status\": \"SUCCESS\", \"task_id\": \"bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07\", \"traceback\": null}"

使用的细节

celery的配置

4.0版本之后,配置建议采用小写,与旧版有些变化,文档摸我

一些用到的配置:

  • broker_url: 中间人配置(默认使用RabbitMQ)
  • result_backend: 结果后端(默认不保存结果)
  • include: list结构,通常用于task实体方法与配置不在同一个文件的情况,与Pythonimport类似,且支持用parent_mod.module_name作为值来引入子模块

简单证明调用任务.delay()是不等待结果直接返回的(异步非阻塞)

之前使用到了两个文件client.pymytasks.py,修改它们:

# client.py
import mytasks

res = mytasks.add.delay(1, 3)
print("继续执行,不会阻塞,返回值为", res)

获取了任务返回的结果,并输出了一行文字

# mytasks.py
from celery import Celery
import celeryconfig
import time

app = Celery('mytasks')
app.config_from_object(celeryconfig)


@app.task
def add(x, y):
    time.sleep(10)
    return x + y

在任务方法体内,先模拟一个高延时的情况,10秒后再处理返回结果

重启celery,使其重新绑定任务方法

(env) tyruschin@tyruschin-B85M-DS3H-A:~/Desktop/celerytask$ python client.py 
继续执行,不会阻塞,返回值为 564d1adb-9c87-4a29-9c9c-a8a1ba116b0b

执行client.py发现文字马上输出了,且返回值是task id而不是计算结果(此时结果还没算出来)

上述步骤证明了非阻塞

[2017-09-06 16:09:33,936: INFO/MainProcess] Received task: mytasks.add[564d1adb-9c87-4a29-9c9c-a8a1ba116b0b]  
[2017-09-06 16:09:43,947: INFO/ForkPoolWorker-2] Task mytasks.add[564d1adb-9c87-4a29-9c9c-a8a1ba116b0b] succeeded in 10.00852725515142s: 4

注意到,在celery进程中,过了约10.009s才返回计算结果,期间客户端可以通过获取状态来检出结果(此处略去)

上述两个步骤证明了异步

使用监控管理程序

最简单的实践是flower,它可以监控任务的执行过程、统计相关信息、控制和修改workers等(但是flower进程断开,相关数据会丢失)

中文文档
最新英文文档

# install
pip install flower

# execute
flower -A mytasks --port=5555

特别注意:必须配置result backend,方可启动flower,且flower中的数据与result backend不关联

其他的一些

1. 自定义状态

...
# 增加引入这两个模块
from celery.exceptions import Ignore
from celery import current_task

@app.task
def test():
    ...
    # 改变当前状态
    current_task.update_state(state='WRONG_CODE', meta=result)
    # 使当前状态成为最终的状态
    raise Ignore()
    # 如果return的话,状态会转变成success
    # return result

根据我的测试,celery默认只有两种最终状态,即successfailure,如果要增加最终状态,则必须更新状态之后抛出Ignore异常

update_state中的meta参数为记录到结果后端的元数据,记录的格式以配置result_serializer为准,4.0版本之后默认为json,之前为pickle

此处有一个坑:

flower不认为Ignore是一个结束状态,所以一直处于“处理中”的状态,故自定义状态不会被flower记录到,只能记录在result backend

2. 使用MySQL作为结果后端

result_backend = 'db+mysql://{user}:{pass}@{host}:{port}/{database}

数据库需要事先建立,在第一次跑的时候,会默认生成两张表celery_taskmetacelery_tasksetmeta,以上的案例只会涉及到第一张表

此处有两个坑:

之前提到配置result_serializer,发现即使修改为json(并且4.0默认就是),MySQL中仍然使用了pickle来序列化。配置timezoneenable_utcMySQL的存储中也同样是失效的

正因上面的坑,有些客户端没有做MySQL结果的获取适配(如:celery-php),导致结果后端可以写入但不能读取

可能的解决方案:自定义数据表来存储相应的结果到MySQL

3. 其他客户端

参考链接

支持node, PHP

相关资料和参考

什么是“自带电池”("batteries included")

怎样理解阻塞非阻塞与同步异步的区别?

celery最佳实践

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容