Celery 部署小记
参考版本: 4.0.2
概念
以下摘自官方文档的翻译:
Celery - 分布式任务队列
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具
它是一个专注于实时处理的任务队列,同时也支持任务调度
Celery是Python的一个“开箱即用”的任务队列模块,易用且提供与多语言对接的集成方案
角色列表:
- task(任务)
- broker(中间人)
- worker(消费者)
- result backend(结果后端),可选
- celerybeat(任务调度器),用于定时任务
使用场景
- 处理高功耗、高延时的并发实时操作。可进入celery任务队列,由workers去执行,属于应用层的任务调度
- 可以在应用层(通过任务调度器
celerybeat进程)执行定时任务
实时任务的建立&执行的过程:
- 创建
task文件,定义一些任务方法(@app.task),这些是任务实际执行的代码 - 在配置文件中定义
broker和result backend的实体(用于存储信息,可以是RabbitMQ,Redis, 数据库等),以及一些workers执行时需要遵循的规定 - 启动
workers进程,它会加载配置、绑定task文件中的任务方法,然后会监控broker中的每个请求数据包 - 客户端将任务方法名、参数列表等参数包装好,投递到
broker中,然后不等待执行结果返回,就继续往下执行主程序 -
workers进程检测到broker中有任务需要执行,故从中取得数据传递到相应的方法执行,执行完将结果存储到result backend中(如果有设置结果后端) - 在执行每个任务的时候,
workers中还维护了该任务的执行状态、执行结果、错误信息等数据项,便于客户端随时调用getStatus等方法来查询结果
上述的
workers进程泛指一个监控中间人、执行实际任务的消费者、存数据至结果后端、维护任务信息的进程集合,实际的实现可能是多个子进程或多个子线程共同完成
定时任务的过程: 通过启动额外的进程celerybeat(任务调度器),每当有到时间执行的任务,就通知workers执行,其他过程与实时任务大致相同
快速开始
参考: http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#first-steps
环境:ubuntu 16.04 virtualenv python3
【1】 创建mytasks.py文件,键入如下代码:
from celery import Celery
import celeryconfig
app = Celery('mytasks')
# 分离配置到celeryconfig文件
app.config_from_object(celeryconfig)
@app.task
def add(x, y):
return x + y
【2】 创建celeryconfig.py配置文件,键入如下代码:
broker_url = 'redis://localhost'
result_backend = 'redis://localhost'
# result_backend = 'db+mysql://{user}:{pass}@{host}:{port}/{database}'
include = ['mytasks']
task_serializer = 'json'
result_serializer = 'json'
timezone = 'Asia/Shanghai'
enable_utc = False
除了基本的broker, result backend之外,还有一些其他配置,规定workers的执行
【3】 在项目目录下执行celery -A mytasks worker --loglevel=info (如提示缺少redis类库,使用pip安装即可)
(env) tyruschin@tyruschin-B85M-DS3H-A:~/Desktop/celerytask$ celery -A mytasks worker --loglevel=info
-------------- celery@tyruschin-B85M-DS3H-A v4.1.0 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-87-generic-x86_64-with-debian-stretch-sid 2017-09-06 11:54:49
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: mytasks:0x7fbbf5bbedd8
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. mytasks.add
[2017-09-06 11:54:49,511: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-09-06 11:54:49,520: INFO/MainProcess] mingle: searching for neighbors
[2017-09-06 11:54:50,538: INFO/MainProcess] mingle: all alone
[2017-09-06 11:54:50,546: INFO/MainProcess] celery@tyruschin-B85M-DS3H-A ready.
[2017-09-06 11:54:52,887: INFO/MainProcess] Events of group {task} enabled by remote.
可以看到,进程启动的时候,加载了[config],建立了队列[queues],绑定了任务列表[tasks],然后执行了主进程MainProcess
【4】 在另一终端创建客户端程序client.py,键入如下代码:
import mytasks
mytasks.add.delay(1, 3)
python client.py,此时查看celery进程执行的终端,多了两行log:
[2017-09-06 12:03:01,874: INFO/MainProcess] Received task: mytasks.add[bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07]
[2017-09-06 12:03:01,884: INFO/ForkPoolWorker-2] Task mytasks.add[bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07] succeeded in 0.007293057162314653s: 4
可以知道主进程MainProcess接收到任务(task id为bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07),从进程池中选择一个worker,即ForkPoolWorker-2来执行任务,得到结果为4,经过了约0.007s
【5】 打开redis-cli查看result backend,task id与redis key相对应,具体如下:
127.0.0.1:6379> keys *
...
6) "celery-task-meta-bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07"
127.0.0.1:6379> get celery-task-meta-bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07
"{\"children\": [], \"result\": 4, \"status\": \"SUCCESS\", \"task_id\": \"bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07\", \"traceback\": null}"
使用的细节
celery的配置
4.0版本之后,配置建议采用小写,与旧版有些变化,文档摸我
一些用到的配置:
- broker_url: 中间人配置(默认使用
RabbitMQ) - result_backend: 结果后端(默认不保存结果)
- include:
list结构,通常用于task实体方法与配置不在同一个文件的情况,与Python的import类似,且支持用parent_mod.module_name作为值来引入子模块
简单证明调用任务.delay()是不等待结果直接返回的(异步非阻塞)
之前使用到了两个文件client.py和mytasks.py,修改它们:
# client.py
import mytasks
res = mytasks.add.delay(1, 3)
print("继续执行,不会阻塞,返回值为", res)
获取了任务返回的结果,并输出了一行文字
# mytasks.py
from celery import Celery
import celeryconfig
import time
app = Celery('mytasks')
app.config_from_object(celeryconfig)
@app.task
def add(x, y):
time.sleep(10)
return x + y
在任务方法体内,先模拟一个高延时的情况,10秒后再处理返回结果
重启celery,使其重新绑定任务方法
(env) tyruschin@tyruschin-B85M-DS3H-A:~/Desktop/celerytask$ python client.py
继续执行,不会阻塞,返回值为 564d1adb-9c87-4a29-9c9c-a8a1ba116b0b
执行client.py发现文字马上输出了,且返回值是task id而不是计算结果(此时结果还没算出来)
上述步骤证明了非阻塞
[2017-09-06 16:09:33,936: INFO/MainProcess] Received task: mytasks.add[564d1adb-9c87-4a29-9c9c-a8a1ba116b0b]
[2017-09-06 16:09:43,947: INFO/ForkPoolWorker-2] Task mytasks.add[564d1adb-9c87-4a29-9c9c-a8a1ba116b0b] succeeded in 10.00852725515142s: 4
注意到,在celery进程中,过了约10.009s才返回计算结果,期间客户端可以通过获取状态来检出结果(此处略去)
上述两个步骤证明了异步
使用监控管理程序
最简单的实践是flower,它可以监控任务的执行过程、统计相关信息、控制和修改workers等(但是flower进程断开,相关数据会丢失)
# install
pip install flower
# execute
flower -A mytasks --port=5555
特别注意:必须配置result backend,方可启动flower,且flower中的数据与result backend不关联
其他的一些
1. 自定义状态
...
# 增加引入这两个模块
from celery.exceptions import Ignore
from celery import current_task
@app.task
def test():
...
# 改变当前状态
current_task.update_state(state='WRONG_CODE', meta=result)
# 使当前状态成为最终的状态
raise Ignore()
# 如果return的话,状态会转变成success
# return result
根据我的测试,celery默认只有两种最终状态,即success和failure,如果要增加最终状态,则必须更新状态之后抛出Ignore异常
update_state中的meta参数为记录到结果后端的元数据,记录的格式以配置result_serializer为准,4.0版本之后默认为json,之前为pickle
此处有一个坑:
flower不认为Ignore是一个结束状态,所以一直处于“处理中”的状态,故自定义状态不会被flower记录到,只能记录在result backend中
2. 使用MySQL作为结果后端
result_backend = 'db+mysql://{user}:{pass}@{host}:{port}/{database}
数据库需要事先建立,在第一次跑的时候,会默认生成两张表celery_taskmeta和celery_tasksetmeta,以上的案例只会涉及到第一张表
此处有两个坑:
之前提到配置result_serializer,发现即使修改为json(并且4.0默认就是),MySQL中仍然使用了pickle来序列化。配置timezone与enable_utc在MySQL的存储中也同样是失效的
正因上面的坑,有些客户端没有做MySQL结果的获取适配(如:celery-php),导致结果后端可以写入但不能读取
可能的解决方案:自定义数据表来存储相应的结果到MySQL中
3. 其他客户端
支持node, PHP等