python分布式任务框架celery

目录

  • 简介
  • 架构
  • 实践
  • 腾讯云执行
  • 参考文章

简介

  • Celery是一个开源的分布式任务队列,主要用于处理大规模任务的分布式调度。它能够简化异步任务处理、定时任务调度和任务结果的处理,通常用于处理网络请求、数据处理、定时任务等等
  • celery类似java的定时任务,elastic job等框架,当然有所不同,但是地位可以简单的做一个对比,不使用java的分布式定时任务框架时因为在机器学习,异常数据检测的场景下,pyhton提供的包更合适,所以用python的轻量级框架
  • 如何需要更大量数据处理,要使用pyflink pyspark大数据框架

架构

架构.png
  • producer:生产者,专门用来生产任务(task)
  • celery beat:任务调度器,调度器进程会读取配置文件的内容,周期性地将配置文件里面到期需要执行的任务发送给消息队列,说白了就是生产定时任务
  • broker:任务队列,用于存放生产者和调度器生产的任务。一般使用消息队列或者 Redis 来存储,当然具有存储功能的数据库也是可以的。这一部分是 celery 所不提供的,需要依赖第三方。作用就是接收任务,存进队列
  • worker:任务的执行单元,会将任务从队列中顺序取出并执行
  • backend:用于在任务结束之后保存状态信息和结果,以便查询,一般是数据库,当然只要具备存储功能都可以作为 backend

实践

  • 项目目录


    项目目录.png
  • 下面代码只是一个demo项目
  • 代码init.py 空
  • 代码celery.py
from __future__ import absolute_import

from celery import Celery
#app是Celery类的实例,创建的时候添加了proj.tasks这个模块,也就是包含了proj/tasks.py这个文件
app = Celery('celery_proj', include=['celery_proj.tasks'])

#另一种配置方式,这里就大概写一下
#app = Celery('任务名',broker='xxx',backend='xxx',include=['xxx','xxx'])

#去找配置文件
app.config_from_object('celery_proj.config')
  • 代码config.py
from __future__ import absolute_import
#使用redis作为消息代理
BROKER_URL = 'redis://127.0.0.1:6379/1'
#把任务结果存放在redis2号库
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

##以下内容可以不用配置,这些都是默认配置,除非自定义需要配置,我用 -- 框住

#任务序列化和反序列化使用msgpack方案
CELERY_TASK_SERIALIZER = 'json'

#读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_RESULT_SERIALIZER = 'json'

#任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERY_TASK_RESULT_EXPIRES = 60*60*24

#指定接受的内容类型
CELERY_ACCEPT_CONTENT = ['json']



#定时任务配置
from datetime import timedelta
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC=True

#ps:友情提示 CELERYBEAT_SCHEDULE 千万不要写错了,我之前写成 CELERY_BEAT_SCHEDULE导致定时任务执行不了,惨痛的教训!!!

CELERYBEAT_SCHEDULE = {
    #这个名字(print_test)随便写,无所谓
    'print_test': {
        'task': 'celery_proj.tasks.print_test',      #绑定的定时任务的函数
        'schedule': timedelta(seconds=30),      #设置定时时间,这里是2秒执行一次
         'args':()       #用来给函数传参

    }
}


  • 代码README.md
- 安装依赖 
pip install -r requirements.txt
切换到celery_proj上级目录去执行,并且celery要pip --updare下不然会有兼容问题


celery -A celery_proj worker  --loglevel=info --logfile="/usr/local/celery_proj/test.txt" 
celery -A celery_proj beat --loglevel=info --logfile="/usr/local/celery_proj/test1.txt"
  • requirements.txt
celery==5.1.0
redis==5.0.14
  • tasks.py
from __future__ import absolute_import

from celery_proj.celery import app
from datetime import datetime

@app.task
def print_test():
    now = datetime.now()
    current_time_str = now.strftime("%Y-%m-%d %H:%M:%S")
    print("nict try" + current_time_str)
    data_to_process = prepare_data_for_processing()
    # 使用apply_async调用另一个任务
    process_data.apply_async(args=(data_to_process + current_time_str,))
    return 'hello' + current_time_str

@app.task
def process_data(data):
    print(f'Processing data: {data}')
    # 这里添加处理数据的逻辑

def prepare_data_for_processing():
    # 这里添加生成数据的逻辑
    return "Some data to process "


腾讯云执行

  • test.txt test1.txt是单纯的日志文件
  • redis单台启动以及python环境安装自行百度
  • 切换到root后 pip install redis和celery
  • 上传到/usr/local下面,并且在/usr/local分别执行,都要用root用户
  • 运行beat定时任务
celery -A celery_proj beat --loglevel=info --logfile="/usr/local/celery_proj/test1.txt"
定时任务.png
  • 运行work任务,因为是分布式的当然work启动多个
celery -A celery_proj worker --loglevel=info --logfile="/usr/local/celery_proj/test.txt" 
work启动.png
  • work日志


    work日志.png
  • beat日志


    beat日志.png

参考文章

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容