Tips:在学习Celery过程中,使用的系统为Windows 10、Celery版本为3.1.18①、中间人使用RabbitMQ。
什么是任务队列
任务队列是一种在线程或者机器之间分发任务的机制。
消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。
Celery使用消息通信,通信一般使用中间人(Broker)在客户端和职程之间斡旋。这个过程从客户端想队列中添加消息开始,之后中间人将消息派送给职程。
Celery是Python编写的,但协议可以使用任何语言实现。
需要什么
Celery需要一个发送和接受消息的传述者。RabbbitMQ和Redis中间人的消息支持所有的特性,我们主要是使用RabbitMQ作中间人(关于中间人RabbitMQ的安装可以网上搜索,有很多详细的教程)。
Celery优势
在程序运行过程中,我们经常会遇到一些耗时耗资源的操作,为了避免阻塞主程序,我们会采用异步或者多线程来处理任务。比如在主程序中调用一个函数,并从该函数中获取函数返回值。如果这个函数不能很快执行完成并返回,那么主程序就会阻塞,知直到函数返回。
Celery是一个强大的分布式任务队列,它可以让人物的执行完全脱离主程序,甚至可以被分配到其他的主机上运行。
Celery架构:
从图上可以看出Celery包含几个模块:
- 任务模块
主要包异步任务和定时任务,异步任务通常在业务逻辑中被触发并发送到任务队列中,而定时任务是由Celery Beat进程周期性的将任务发往任务队列。 - 消息中间件Broker
Broker就是任务调度队列,接受任务生产者发送来的消息,将任务存入队列,之所以需要中间人的原因是Celrey本身是不提供消息队列的服务,所以需要第三方组件实现。 - 任务执行单元Worker
Worker是执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。 - 任务存储Backend
Backend用于存储任务只想的结果,存储可以使用RabbitMQ或者Redis或者数据库等。
安装RabbitMQ
参考这篇文章
https://www.jianshu.com/p/a2c9a4242508
安装Celery
1、因为celery4.X以上版本不支持window,容易出错,所以这里直接安装celery3.1.18
pip install celery==3.1.18
安装成功后可以查看celery版本
celery --version
安装eventlet,后续报错问题解决可能需要安装eventlet
pip3 install eventlet
新建一个tasks.py示例
# coding:utf-8
from celery import Celery
app = Celery(
'tasks',# 是当前模块的名称,这个参数是必须的,这样的话名称可以自动生成
broker='amqp://guest@localhost//'#中间人的地址
)
@app.task ## 使用celery标识一个任务,多个任务都需要使用该装饰器
def add(x, y):
return x + y
启动Celery职程服务器(Worker)
#没有加上eventlet参数,有可能会在后续运行报错
celery -A tasks worker --loglevel=info
#如果报错,添加上eventlet参数重试
celery -A tasks worker -l info -P eventlet
参数-A指定了Celery实例的位置,这个实例是在tasks.py文件中,Celery会自动在该文件中查找Celery对象实例。
--loglevel指定日志的级别,默认是warning。
如果启动正常,就会看到下面的输出。
我们先从tasks.py文件中导入add任务对象,然后使用delay()方法将任务发送到消息中间件,我们之前开启的那个Worker会一直监控任务队列,知道有任务到来,就会执行。
新建一个manage.py运行python manage.py
from tasks import add
add.delay(2, 8)
或者
我们到Worker中可以看到多了几条日志信息:
说明我们的任务被调度并执行成功了。
获得结果
必须在这里加上backend参数方可运行成功
app = Celery('tasks',
broker='amqp://guest@localhost//',
backend="redis://10.0.0.3:6379/1" # 结果数据存放地址
)
刚我们在命令行中调用任务,很明显任务执行完成,但是我们并不知道任务执行后得到的结果是什么。如果我们想获得执行后的结果可以这样:
>>> result = add.delay(2, 8)
>>> result.ready() # 查看任务执行的状态,此刻任务没有执行完成,显示False
False
>>> result.ready()
True # 表示任务已经执行完成
>>> result.get() # 获取任务的执行结果
10
>>>