0.安装CELERY
pip install celery
1.文件结构
--- demo文件夹
--- current_app
--- __init__.py
--- task1.py
--- task2.py
--- celeryconfig.py
--- client.py
- Borker
消息代理,作为临时存储任务的中间媒介,为Celery提供队列服务。生产者将任务发送到Broker,消息者再从Broker获取任务。 - Beat
任务调度器,负责调度并触发Celery定时周期任务。Beat进程读取celeryconfig.py中定义的定时周期任务列表,将到期需要执行的定时任务发送到任务队列中。 - Worker
任务执行单元,实际负责执行任务的服务进程,每一个Worker都有一个并发池(Prefork、Eventlet、Gevent、Thread)来支持多并发。Worker会监听订阅的任务队列,当队列中有任务时,就会获取任务并执行。 - Result Backend/Store
任务执行状态和结果存储,Celery支持任务实时处理,也就是说Celery可以把任务执行的实时状态和最终结果回传生产者。这种回传也需要通过中间存储媒介。
2.创建Celery对象
broker = 'redis://127.0.0.1:6379' # 用redis做broker,中间件
backend = 'redis://127.0.0.1:6379/0' # 用redis做broken,用来保存结果
app = Celery('tasks',broker=broker,backend=backend)
但一般不会这么写,会把相关配置写到一个配置文件中,然后加载出来。
celeryconfig.py
BROKER_URL = 'redis://127.0.0.1:6379' # 指定broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定backend
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_IMPORTS = ( # 指定需要导入的任务模块
'celery_app.task1', # task1和task2是模块名称
'celery_app.task2',
)
__init__.py
from celery import Celery
app = Celery('celery_app')
app.config_from_object('celery_app.celeryconfig') # 使用config_from_object函数加载一个配置文件
3.具体任务
app是Celery对象,app.task的作用是创建一个任务
task1.py
import time
from celery_app import app
@(CELERY)app.task
def add(x,y):
time.sleep(2)
return x + y
task2.py
import time
from celery_app import app
@app.task
def multiply(x,y):
time.sleep(2)
return x * y
4.运行Celery职程服务器
切换到程序的主目录下,即当前程序的demo目录下,然后执行以下命令:
celery -A celery_app worker --loglevel=info -P eventlet
在win10 + celery4.x 上,如果不加上 eventlet 会报错,
ERROR/MainProcess] Task handl or(‘not enough values to unpack (expected 3, got 0)’,)
所以需要先安装
pip install eventlet
<b>纠正</b>: 结合第一部分看,该处是使用了并发池,也可以换成另外三个,例如
celery -A celery_app worker -l info -P gevent
选项
-A/--app
,指定Celery项目,如果参数是一个Package,那么celery cmd会自动搜索celery.py模块,也可以使用app所在的module_name作为参数。celery cmd最终要加载的实际是app对象。
5.启动程序
client.py
from celery_app import task1
from celery_app import task2
print('start')
res1 = task1.add.apply_async(args=[2,8])
res2 = task2.multiply.apply_async(args=[3,7])
print(res1.get()) # get函数可以获取返回值
print(res2.get())
print('End')
输出结果:
start
10
21
End
6.参考
http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#celerytut-troubleshooting
https://blog.csdn.net/chenqiuge1984/article/details/80127446
https://blog.csdn.net/Jmilk/article/details/79052669