Celery(https://www.celerycn.io/v/4.4.0/ru-men/celery-jin-jie-shi-yong):
一:特点:
高可用:如果出现丢失连接或连接失败,职程(Worker)和客户端会自动重试,并且中间人通过 主/主 主/从 的方式来进行提高可用性
快速: 单个 Celery 进行每分钟可以处理数以百万的任务,而且延迟仅为亚毫秒(使用 RabbitMQ、 librabbitmq 在优化过后)
灵活:Celery 的每个部分几乎都可以自定义扩展和单独使用,例如自定义连接池、序列化方式、压缩方式、日志记录方式、任务调度、生产者、消费者、中间人(Broker)等
功能:
监控:可以针对整个流程进行监控,内置的工具或可以实时说明当前集群的概况
调度:可以通过调度功能在一段时间内指定任务的执行时间 datetime,也可以根据简单每隔一段时间进行执行重复的任务,支持分钟、小时、星期几,也支持某一天或某一年的Crontab表达式
工作流:可以通过“canvas“进行组成工作流,其中包含分组、链接、分块等等。简单和复杂的工作流程可以使用一组“canvas“组成,其中包含分组、链接、分块等
资源(内存)泄漏保护: --max-tasks-per-child 参数适用于可能会出现资源泄漏(例如:内存泄漏)的任务
时间和速率的限制:您可以控制每秒/分钟/小时执行任务的次数,或者任务执行的最长时间,也将这些设置为默认值,针对特定的任务或程序进行定制化配置
自定义组件:开发者可以定制化每一个职程(Worker)以及额外的组件。职程(Worker)是用 “bootsteps” 构建的-一个依赖关系图,可以对职程(Worker)的内部进行细粒度控制
二: 用法
2.1创建一个文件夹proj, 在proj目录下创建3个py文件__init__.py, celery.py, tasks.py
在celery.py代码:
from __future__ import absolute_import, unicode_literals
from celery import Celery
# app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks'])
app = Celery('proj', backend='redis://localhost', broker='redis://localhost:6379', include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
采用的是redis作为中间件(broker)sudo apt-get install redis-server sudo service redis-server restart
task.py:
from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
运行:celery -A tasks proj --loglevel=info
在proj目录外面新建一个脚本proj_test.py:
from proj.tasks import *
from celery import group
from celery import chain, chord
# delay() 实际上为 apply_async() 的快捷使用, apply_async() 可以指定调用时执行的参数,例如运行的时间,使用的任务队列等
# res 参数:res.state, res.successful(), res.failed()
res = add.delay(2, 2) # add.apply_async((2, 2), queue='lopri', countdown=10)
# 任务执行引发异常,可以进行检查异常以及溯源,默认情况下 result.get() 会抛出异常,
# 如果不希望 Celery 抛出异常,可以通过设置 propagate 来进行禁用
result_add = res.get(timeout=1) # 或者res.get(propagate=False)
# r_add2 = res.get(propagate=False)
print(res.failed())
print(result_add)
# print(add(2, 2))
print(res.id) # 每一个任务都有一个id, 获取任务的ID
# 一个任务只能有当前只能有一个状态,但他的执行过程可以为多个状态,一个典型的阶段是:
# PENDING -> STARTED -> SUCCESS
# 重试任务比较复杂,为了证明,一个任务会重试两次,任务的阶段为:
# PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
from proj.celery import app
res2 = app.AsyncResult('this-id-does-not-exist')
print(res2.state)
2.2:# Canvas:设计工作流程
s1 = add.signature((2, 2), countdown=1) # add.s(2, 2)
res3 = s1.delay()
print(res3.get())
s2 = add.s(2)
res4 = s2.delay(8)
print(res4.get())
# 组:Groups
# 一个 group 并行调用任务列表,返回一个特殊的结果实例,可以将结果作为一个列表进行查看,并且通过索引进去获取返回值。
g1 = group(add.s(i, i) for i in range(10))().get()
g2 = group(add.s(i) for i in range(10))
res5 = g2(10).get()
print(res5)
2.3# 链:Chains可以将任务链接在一起,在一个人返回后进行调用另外一个任务
c1 = chain(add.s(4, 4) | mul.s(8))().get()
print(c1)
2.4:# 和弦:Chords 和弦是一个带有回调的组:
c2 = chord((add.s(i, i) for i in range(10)), xsum.s())().get()
print(c2)
c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
# c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
print(c3)
三:在django中使用celery
环境: python3.7 ,django3.0.6
安装: pip install django-celery
在settings.py中配置:
import djcelery
djcelery.setup_loader()
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_IMPORTS = ('app.tasks', )
BROKER_URL = 'redis://127.0.0.1:6379/8'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
INSTALLED_APPS = [
...
'djcelery',
]
当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task。
BROKER_URL ='redis://127.0.0.1:6379/6'
broker是代理人,它负责分发任务给worker去执行。我使用的是Redis作为broker,当然你也可以用其它的broker,比如官方就比较推荐使用RabbitMQ.
有的博客中提到要配置关键字:CELERY_RESULT_BACKEND,例如:
CELERY_RESULT_BACKEND='amqp://guest@localhost//'#可以不用写
我没有配置这个关键字。因为如果没有配置,此时Django会使用默认的数据库(也是你指定的orm数据库),作为它的结果作为它的backend。因此你也可以不用写,使用Django默认设置的数据库就很好。
CELERY_IMPORTS = ('app.tasks', )
CELERY_TIMEZONE = TIME_ZONE
CELERYBEAT_SCHEDULER ='djcelery.schedulers.DatabaseScheduler'
上面第一句是导入目标任务文件,第二句是设置时区,第三句表示使用了django-celery默认的数据库调度模型,任务执行周期都被存在默认指定的orm数据库中.
更深入的Celery配置:(http://www.cnblogs.com/ajianbeyourself/p/4950758.html)
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
#定时任务一: 每24小时周期执行任务(del_redis_data)
u'删除过期的redis数据': {
"task":"app.tasks.del_redis_data","schedule": crontab(hour='*/24'),"args": (), },
上面是设置定时的时间配置,关于crontab的具体用法,celery的官方文档讲解的十分详尽(表格):
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
我选的三个任务,是我特意挑选的,非常有代表性。第一个是周期任务,它会每隔一个固定时间周期去执行一次相应的task,比如隔1分钟,隔1小时等; 第二个和第三个都是定时任务,定时在每个时间点,比如每天的6点,或者定时在每个月的1号。
周期任务和定时任务有小小的差别,这也是crontab的强大之处,它同时支持这两种。
同步数据库:
python manage.py makemigrations
python manage.py migrate
在app路径下创建task.py脚本, 内容如下:
from __future__ import absolute_import
from celery import task
from celery import shared_task
from data_draw.script.database_operation import *
from django.conf import settings
import os
@task()
def select_evaluation(data_name_dict):
oss_data_name_list = select_dir(settings.EVALUATION_DIR, 0)
# print(f"oss_data_name_list:{oss_data_name_list}")
print("*"*200)
print("异步执行 select_evaluation 方法")
for data_name in oss_data_name_list:
oss_data_path = os.path.join(settings.EVALUATION_DIR, data_name)
eval_oss_data_path = os.path.join(settings.EVALUATION_DIR, data_name, 'evaluation/')
item = {}
mp4_name = select_video(oss_data_path)
if mp4_name:
mp4 = True
video_name = mp4_name[0]
else:
mp4 = False
video_name = ''
file_name_list = select_dir(eval_oss_data_path, 1)
for key, value in settings.MEED_EVALUATION_FILE.items():
if value in file_name_list:
item[key] = True
else:
item[key] = False
try:
value = data_name_dict[data_name]
update_data_result(data_name, mp4, video_name, item)
except:
add_data_result(data_name, mp4, video_name, item)
在view.py脚本引用:
from data_draw.task import select_evaluation
select_evaluation.delay(data_name_dict)
运行:
python manage.py runserver 0.0.0.0:8001#启动django的应用,可以动态的使用django-admin来管理任务
python manage.py celery beat #应该是用来监控任务变化的
python manage.py celery worker -c 6 -l debug #任务执行进程,worker进程
运行时报的第一个错误, async导入错误:
from . import async, base
^
SyntaxError: invalid syntax
这是因为在 python 3.7 中将 async 作为了关键字,所以当 py 文件中出现类似 from . import async, base 这类不符合python语法的语句时,Python会报错
解决:
在 celery 官方的提议下,建议将 async.py 文件的文件名改成 asynchronous。
所以我们只需要将 celery\backends\async.py 改成 celery\backends\asynchronous.py,并且把 celery下代码中的所有 async 改成 asynchronous 就可以了
运行时报的第二个错误:
File "/home/python/.virtualenvs/django_class/lib/python3.5/site-packages/redis/_compat.py", line 123, in iteritems
return iter(x.items())
AttributeError: 'str' object has no attribute 'items'
这是因为以前版本的redis太高(3.0.1),所以重新加载了redis
解决:
pip install redis==2.10.6
参考文档:
https://www.celerycn.io/v/4.4.0/ru-men/celery-chu-ci-shi-yong
https://www.jianshu.com/p/e97ca5315c90