1. 介绍
Celery是一个强大的分布式任务队列,他可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。通常用来实现异步任务和定时任务。异步任务比如发送邮件,文件上传图像处理等;定时任务就是需要在特定时间执行的任务。
- 任务队列
任务队列是一种跨线程,跨机器工作的一种机制,任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获取新的任务并处理。 - 任务模块
包含异步任务和定时任务。异步任务通常在业务逻辑中被触发并发往任务队列;定时任务由Celery Beat进程周期性地将任务发往任务队列。 - 消息中间件Broker
Broker,就是任务调度队列,接收任务生产者发来的消息(任务),将任务存入到队列。Celery本身不提供队列服务,官方推荐使用RabbitMQ和Redis等。 - 任务执行单元Worker
Worker是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。 - 任务结果存储Backend
Backend用于存储任务的执行结果,以供查询。同消息中间件一样,也可使用RabbitMQ和Redis,MySql等。
2. 使用步骤
2.1 使用Celery实现异步任务
a. 创建Celery实例
b. 启动Celery Worker,通过delay()或者apply_async()将任务发布到broker
c. 应用程序调用异步任务
d. 存储结果
Celery Beat: 任务调度器,Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
2.2 使用Celery定时任务
a. 创建Celery实例
b. 配置文件中配置任务,发送任务celery -A xxx beat
c. 启动Celery Worker celery -A xxx worker -l info -P eventlet
d. 存储结果
3. 代码实现
3.1 test1.py
from .. import app
import time
def test11():
time.sleep(1)
print('test11')
def test22():
time.sleep(2)
print('test22')
test11()
@app.task
def test1_run():
test11()
test22()
3.2 test2.py
from .. import app
import time
def test33():
time.sleep(3)
print('test33')
def test44():
time.sleep(4)
print('test44')
test33()
@app.task
def test2_run():
test33()
test44()
3.3 celery_task.__init__.py
# 拒绝隐式引入,如果celery.py和celery模块名字一样,避免冲突,需要加上这条语句
# 该代码中,名字是不一样的,最好也要不一样
from __future__ import absolute_import
from celery import Celery
app = Celery('tasks')
app.config_from_object('celery_task.celeryconfig')
3.4 celeryconfig.py
from __future__ import absolute_import
from celery.schedules import crontab
from datetime import timedelta
# 使用redis存储任务队列
broker_url = 'redis://127.0.0.1:6379/7'
# 使用redis存储结果
result_backend = 'redis://127.0.0.1:6379/8'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
# 时区设置
timezone = 'Asia/Shanghai'
# celery默认开启自己的日志
# False表示不关闭
worker_hijack_root_logger = False
# 存储结果过期时间,过期后自动删除
# 单位为秒
result_expires = 60 * 60 * 24
# 导入任务所在文件
imports = [
'celery_task.app_scripts.test1',
'celery_task.app_scripts.test2'
]
# 需要执行任务的配置
beat_schedule = {
'test1': {
# 具体需要执行的函数
# 该函数必须要使用@app.task装饰
'task': 'celery_task.app_scripts.test1.test1_run',
# 定时时间
# 每分钟执行一次,不能为小数
'schedule': crontab(minute='*/1'),
# 或者这么写,每小时执行一次
# "schedule": crontab(minute=0, hour="*/1")
# 执行的函数需要的参数
'args': ()
},
'test2': {
'task': 'celery_task.app_scripts.test2.test2_run',
# 设置定时的时间,10秒一次
'schedule': timedelta(seconds=10),
'args': ()
}
}
如果大写的话,需要写成:
CELERYBEAT_SCHEDULE = {
'celery_app.task.task1': {
'task': 'celery_app.task.task1',
'schedule': timedelta(seconds=20),
'args': (1, 10)
},
'celery_app.task.task2': {
'task': 'celery_app.task.task2',
'schedule': crontab(minute='*/2'),
'args': ()
}
}
4. 执行定时任务
4.1 发布任务
在celery_task同级目录下,执行命令:
celery -A celery_task beat
4.2 执行任务
在celery_task同级目录下,执行命令:
celery -A celery_task worker -l info -P eventlet
可以看到输出:
[2018-09-07 16:54:57,809: WARNING/MainProcess] test33
[2018-09-07 16:55:00,002: INFO/MainProcess] Received task: celery_task.app_scrip
ts.test1.test1_run[0134cb52-29a3-4f57-890e-9730feac19e7]
[2018-09-07 16:55:01,069: WARNING/MainProcess] test11
[2018-09-07 16:55:01,821: WARNING/MainProcess] test44
[2018-09-07 16:55:03,083: WARNING/MainProcess] test22
[2018-09-07 16:55:04,234: WARNING/MainProcess] test11
如果同时在<b>两个虚拟环境(服务器)</b>中都执行定时任务,都可以看到有以上LOG打印。
4.3 celery相关命令
发布任务
celery -A celery_task beat
执行任务
celery -A celery_task worker -l info -P eventlet
将以上两条合并
celery -B -A celery_task worker
后台启动celery worker进程
celery multi start work_1 -A appcelery
停止worker进程,如果无法停止,加上-A
celery multi stop WORKNAME
重启worker进程
celery multi restart WORKNAME
查看进程数
celery status -A celery_task
4.4 定时方式
from celery.schedules import crontab
from datetime import timedelta
# 1 每10秒钟执行一次
'schedule':timedelta(seconds=30)
# 2 每分钟执行一次
'schedule':crontab(minute='*/1')
5.参考
https://blog.csdn.net/Shyllin/article/details/80940643
https://blog.csdn.net/zhangfh1990/article/details/77164499