celery的使用
项目地址
https://github.com/ShiSiLang/celery_task.git
测试环境
- 环境:python3.6.8+django2.2.1+redis3.2.1+celery4.3.0+flower0.9.3
- 系统:MacOS10.14
- 依赖文件:requirements.txt
使用方式
- settings.py同级目录下建立
celery.py
,代码如下:
from __future__ import absolute_import, unicode_literals
import os
from django.conf import settings
from celery import Celery
# 设置 Django 的配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')
# 创建 celery 实例
app = Celery('app')
# 引用配置文件 settings 中的配置
app.config_from_object('django.conf:settings')
# 搜索所有 app 中的异步任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
- settings.py中celery配置选项如下:
# celery配置项
# 任务队列
BROKER_URL = 'redis://127.0.0.1:6379/7'
# 任务结果队列
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/8'
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
# 启动时区配置
CELERY_ENABLE_UTC = True
# 并发的worker数量
CELERYD_CONCURRENCY = 5
# worker每次去redis取任务的数量,默认值是4
CELERYD_PREFETCH_MULTIPLIER = 5
# 每个worker执行了多少次任务后就会死掉,建议数量大一些
CELERYD_MAX_TASKS_PER_CHILD = 200
# celery任务执行结果的超时时间
# `CELERY_TASK_RESULT_EXPIRES = 1200`
# 单个任务的运行时间限制,否则会被杀死
# `CELERYD_TASK_TIME_LIMIT = 60`
#
# 将任务结果使用'pickle'序列化成'json'格式
# 任务序列化方式
# `CELERY_TASK_SERIALIZER = 'pickle'`
# 任务执行结果序列化方式
# `CELERY_RESULT_SERIALIZER = 'json'`
# 也可以直接在Celery对象中设置序列化方式
# `app = Celery('tasks', broker='...', task_serializer='yaml')`
#
# 关闭限速
# `CELERY_DISABLE_RATE_LIMITS = True`
-
tasks.py
中书写普通的异步方式(模拟不可控任务)以及延时异步方式(为了实现任务的可控制性)
import time
from celery_test.celery import app
@app.task
def celery_add(num1, num2):
result = int(num1) + int(num2)
print('异步加法的结果是', str(result))
@app.task
def celery_mul(num1, num2):
result = int(num1) * int(num2)
print('result is:', str(result))
print('stop 30s...')
time.sleep(30)
print('异步加法的结果是', str(result))
-
views.py
文件中书写任务调用的视图函数
def c_add(request):
# 异步加法
num1 = request.GET.get('value1')
num2 = request.GET.get('value2')
print('num1 is {}, num2 is {}'.format(num1, num2))
task = celery_add.delay(num1=num1, num2=num2)
print('dir(task):', dir(task))
print('task id is:', task.task_id)
return JsonResponse({'status': 200, 'msg': '这是异步加法...', '任务id': task.task_id})
def c_mul(request):
# 异步乘法
num1 = request.GET.get('value1')
num2 = request.GET.get('value2')
print('num1 is {}, num2 is {}'.format(num1, num2))
task = celery_mul.delay(num1=num1, num2=num2)
print('task id is:', task.task_id)
# return JsonResponse({'status': 200, 'msg': '这是异步乘法...', '任务id': task.task_id})
return render(request, 'start.html', {'id': task.task_id})
HTML页面代码以及其他不太重要的代码在此处省略
开启异步任务
celery -A celery_test.celery:app worker -l info
开启flower,打开浏览器输入
127.0.0.1:5555
即可看到任务的执行情况
celery -A celery_test.celery:app flower -l info
开启服务器
python manage.py runserver
执行低耗时操作(异步加法)得到以下打印结果
# django端
num1 is 1, num2 is 2
dir(task): ['TimeoutError', '__class__', '__copy__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_args__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_cache', '_get_task_meta', '_ignored', '_iter_meta', '_maybe_reraise_parent_error', '_maybe_set_cache', '_on_fulfilled', '_parents', '_set_cache', '_to_remote_traceback', 'app', 'args', 'as_tuple', 'backend', 'build_graph', 'children', 'collect', 'date_done', 'failed', 'forget', 'get', 'get_leaf', 'graph', 'id', 'ignored', 'info', 'iterdeps', 'kwargs', 'maybe_reraise', 'maybe_throw', 'name', 'on_ready', 'parent', 'queue', 'ready', 'result', 'retries', 'revoke', 'state', 'status', 'successful', 'supports_native_join', 'task_id', 'then', 'throw', 'traceback', 'wait', 'worker']
task id is: c2b7d145-7180-47cc-9dfb-23ccf7d6fed0
[10/May/2019 14:01:51] "GET /ce_add/?value1=1&value2=2 HTTP/1.1" 200 123
# celery端
[2019-05-10 14:01:51,556: INFO/MainProcess] Received task: app01.tasks.celery_add[c2b7d145-7180-47cc-9dfb-23ccf7d6fed0]
[2019-05-10 14:01:51,560: WARNING/ForkPoolWorker-4] 异步加法的结果是
[2019-05-10 14:01:51,561: WARNING/ForkPoolWorker-4] 3
[2019-05-10 14:01:51,564: INFO/ForkPoolWorker-4] Task app01.tasks.celery_add[c2b7d145-7180-47cc-9dfb-23ccf7d6fed0] succeeded in 0.005102032999275252s: None
可以看到任务在极少时间内便完成了,且无返回值,显示None
- 执行耗时操作(异步乘法中存在time.sleep(30))得到以下结果
# django端
num1 is 23, num2 is 32
task id is: 988ca357-2093-4a52-b435-9d708ca317f1
[10/May/2019 14:04:59] "GET /ce_mul/?value1=23&value2=32 HTTP/1.1" 200 553
# [2019-05-10 14:04:59,320: INFO/MainProcess] Received task: app01.tasks.celery_mul[988ca357-2093-4a52-b435-9d708ca317f1]
[2019-05-10 14:04:59,323: WARNING/ForkPoolWorker-5] result is:
[2019-05-10 14:04:59,324: WARNING/ForkPoolWorker-5] 736
[2019-05-10 14:04:59,325: WARNING/ForkPoolWorker-5] stop 30s...
[2019-05-10 14:05:29,326: WARNING/ForkPoolWorker-5] 异步加法的结果是
[2019-05-10 14:05:29,327: WARNING/ForkPoolWorker-5] 736
[2019-05-10 14:05:29,332: INFO/ForkPoolWorker-5] Task app01.tasks.celery_mul[988ca357-2093-4a52-b435-9d708ca317f1] succeeded in 30.00901433097897s: None
这个任务在执行耗时比较多,有时可能会出现无法预料的错误导致卡死,有可能我们需要对正在执行的任务进行终止操作,此时我们就如下获取任务执行的唯一标识,通过该唯一标识杀死该任务
- 终止任务代码如下
def s_task(request):
# 终止正在执行的任务
task_id = request.POST.get('task_id')
print('接收到的task_id:', task_id)
status = app.AsyncResult(task_id).status
result = app.AsyncResult(task_id).result
print('该异步任务的状态是:{}, 结果是{}'.format(status, result))
# 终止之后不会再次执行
result = app.control.revoke(task_id, terminate=True)
# 终止之后还会继续执行
# result = app.control.revoke(task_id, terminate=False)
print('停止任务的结果是:', result)
return JsonResponse({'status': 200, 'msg': '任务已经停止...'})
继续执行上面的异步乘法操作,在等待过程中点击终止任务的按钮,调用实现终止的视图函数,得到如下打印输出:
# django端
接收到的task_id: 674f42e8-fdab-43d9-b5df-ea84b7f8015e
该异步任务的状态是:PENDING, 结果是None
停止任务的结果是: None
[10/May/2019 14:10:58] "POST /stop_task/ HTTP/1.1" 200 65
# celery端
[2019-05-10 14:10:52,188: INFO/MainProcess] Received task: app01.tasks.celery_mul[674f42e8-fdab-43d9-b5df-ea84b7f8015e]
[2019-05-10 14:10:52,193: WARNING/ForkPoolWorker-3] result is:
[2019-05-10 14:10:52,194: WARNING/ForkPoolWorker-3] 7544328889
[2019-05-10 14:10:52,194: WARNING/ForkPoolWorker-3] stop 30s...
[2019-05-10 14:10:58,705: INFO/MainProcess] Terminating 674f42e8-fdab-43d9-b5df-ea84b7f8015e (Signals.SIGTERM)
[2019-05-10 14:10:58,731: ERROR/MainProcess] Task handler raised error: Terminated(15,)
Traceback (most recent call last):
File "/xxx/workspace/for_celery/lib/python3.6/site-packages/billiard/pool.py", line 1728, in _set_terminated
raise Terminated(-(signum or 0))
billiard.exceptions.Terminated: 15
以上灵感来源于大佬:https://www.cnblogs.com/huxianglin/p/10517665.html
以下内容为拷贝内容:
celery 管理工具flower里面好像有停止celery task的功能,于是去找flower的源码,找到接口的源码如下:
logger.info("Revoking task '%s'", taskid)
terminate = self.get_argument('terminate', default=False, type=bool)
self.capp.control.revoke(taskid, terminate=terminate)
self.write(dict(message="Revoked '%s'" % taskid))
核心代码是self.capp.control.revoke
想到去celery里面找寻revoke
函数,发现有两处比较可疑,第一个是celery.worker.control.revoke
,第二个是celery.app.control.Control.revoke
,直觉来看,应该是第二个方法,但是第二个方法是在一个类里面的,要调用这个方法首先需要获取到celery app的实例,后来去celery 配置里面找,发现在init.py文件里面有__all__ = ['celery_app']
这么一句,于是找到突破点了,引用这个包就能获取到celery_app了。
from test.ceyery_proj import celery_app
celery_app.control.revoke(task_id, terminate=True)
通过这个方法就能终止正在执行的task,至于task_id在执行任务的时候返回了,我将这个id存储在数据库中,这样就可以被拿来控制task的执行了。
-
flower看到的截图如下所示
- 解决Celery进程重启后,正在进行中的任务丢失或者标记为失败(暂时未测试,结果未知)
来源:https://www.waitig.com/%E8%A7%A3%E5%86%B3celery%E8%BF%9B%E7%A8%8B%E9%87%8D%E5%90%AF%E5%90%8E%EF%BC%8C%E6%AD%A3%E5%9C%A8%E8%BF%9B%E8%A1%8C%E4%B8%AD%E7%9A%84%E4%BB%BB%E5%8A%A1%E4%B8%A2%E5%A4%B1%E6%88%96%E8%80%85%E6%A0%87.html
以下内容为拷贝内容
修改配置如下:
# 当worker进程意外退出时,task会被放回到队列中
task_reject_on_worker_lost = True
# 当worker完成了这个task时,任务才被标记为ack状态
task_acks_late = True
该配置可以保证task不丢失,中断的task在下次启动时将会重新执行。
需要说明的是,backend最好使用rabbitmq等支持ACK状态的消息中间件。