目录
- 简介
- 架构
- 实践
- 腾讯云执行
- 参考文章
简介
- Celery是一个开源的分布式任务队列,主要用于处理大规模任务的分布式调度。它能够简化异步任务处理、定时任务调度和任务结果的处理,通常用于处理网络请求、数据处理、定时任务等等
- celery类似java的定时任务,elastic job等框架,当然有所不同,但是地位可以简单的做一个对比,不使用java的分布式定时任务框架时因为在机器学习,异常数据检测的场景下,pyhton提供的包更合适,所以用python的轻量级框架
- 如何需要更大量数据处理,要使用pyflink pyspark大数据框架
架构
- producer:生产者,专门用来生产任务(task)
- celery beat:任务调度器,调度器进程会读取配置文件的内容,周期性地将配置文件里面到期需要执行的任务发送给消息队列,说白了就是生产定时任务
- broker:任务队列,用于存放生产者和调度器生产的任务。一般使用消息队列或者 Redis 来存储,当然具有存储功能的数据库也是可以的。这一部分是 celery 所不提供的,需要依赖第三方。作用就是接收任务,存进队列
- worker:任务的执行单元,会将任务从队列中顺序取出并执行
- backend:用于在任务结束之后保存状态信息和结果,以便查询,一般是数据库,当然只要具备存储功能都可以作为 backend
实践
-
项目目录
- 下面代码只是一个demo项目
- 代码init.py 空
- 代码celery.py
from __future__ import absolute_import
from celery import Celery
#app是Celery类的实例,创建的时候添加了proj.tasks这个模块,也就是包含了proj/tasks.py这个文件
app = Celery('celery_proj', include=['celery_proj.tasks'])
#另一种配置方式,这里就大概写一下
#app = Celery('任务名',broker='xxx',backend='xxx',include=['xxx','xxx'])
#去找配置文件
app.config_from_object('celery_proj.config')
- 代码config.py
from __future__ import absolute_import
#使用redis作为消息代理
BROKER_URL = 'redis://127.0.0.1:6379/1'
#把任务结果存放在redis2号库
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
##以下内容可以不用配置,这些都是默认配置,除非自定义需要配置,我用 -- 框住
#任务序列化和反序列化使用msgpack方案
CELERY_TASK_SERIALIZER = 'json'
#读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_RESULT_SERIALIZER = 'json'
#任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERY_TASK_RESULT_EXPIRES = 60*60*24
#指定接受的内容类型
CELERY_ACCEPT_CONTENT = ['json']
#定时任务配置
from datetime import timedelta
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC=True
#ps:友情提示 CELERYBEAT_SCHEDULE 千万不要写错了,我之前写成 CELERY_BEAT_SCHEDULE导致定时任务执行不了,惨痛的教训!!!
CELERYBEAT_SCHEDULE = {
#这个名字(print_test)随便写,无所谓
'print_test': {
'task': 'celery_proj.tasks.print_test', #绑定的定时任务的函数
'schedule': timedelta(seconds=30), #设置定时时间,这里是2秒执行一次
'args':() #用来给函数传参
}
}
- 代码README.md
- 安装依赖
pip install -r requirements.txt
切换到celery_proj上级目录去执行,并且celery要pip --updare下不然会有兼容问题
celery -A celery_proj worker --loglevel=info --logfile="/usr/local/celery_proj/test.txt"
celery -A celery_proj beat --loglevel=info --logfile="/usr/local/celery_proj/test1.txt"
- requirements.txt
celery==5.1.0
redis==5.0.14
- tasks.py
from __future__ import absolute_import
from celery_proj.celery import app
from datetime import datetime
@app.task
def print_test():
now = datetime.now()
current_time_str = now.strftime("%Y-%m-%d %H:%M:%S")
print("nict try" + current_time_str)
data_to_process = prepare_data_for_processing()
# 使用apply_async调用另一个任务
process_data.apply_async(args=(data_to_process + current_time_str,))
return 'hello' + current_time_str
@app.task
def process_data(data):
print(f'Processing data: {data}')
# 这里添加处理数据的逻辑
def prepare_data_for_processing():
# 这里添加生成数据的逻辑
return "Some data to process "
腾讯云执行
- test.txt test1.txt是单纯的日志文件
- redis单台启动以及python环境安装自行百度
- 切换到root后 pip install redis和celery
- 上传到/usr/local下面,并且在/usr/local分别执行,都要用root用户
- 运行beat定时任务
celery -A celery_proj beat --loglevel=info --logfile="/usr/local/celery_proj/test1.txt"
- 运行work任务,因为是分布式的当然work启动多个
celery -A celery_proj worker --loglevel=info --logfile="/usr/local/celery_proj/test.txt"
-
work日志
-
beat日志