python分布式任务框架celery

目录

  • 简介
  • 架构
  • 实践
  • 腾讯云执行
  • 参考文章

简介

  • Celery是一个开源的分布式任务队列,主要用于处理大规模任务的分布式调度。它能够简化异步任务处理、定时任务调度和任务结果的处理,通常用于处理网络请求、数据处理、定时任务等等
  • celery类似java的定时任务,elastic job等框架,当然有所不同,但是地位可以简单的做一个对比,不使用java的分布式定时任务框架时因为在机器学习,异常数据检测的场景下,pyhton提供的包更合适,所以用python的轻量级框架
  • 如何需要更大量数据处理,要使用pyflink pyspark大数据框架

架构

架构.png
  • producer:生产者,专门用来生产任务(task)
  • celery beat:任务调度器,调度器进程会读取配置文件的内容,周期性地将配置文件里面到期需要执行的任务发送给消息队列,说白了就是生产定时任务
  • broker:任务队列,用于存放生产者和调度器生产的任务。一般使用消息队列或者 Redis 来存储,当然具有存储功能的数据库也是可以的。这一部分是 celery 所不提供的,需要依赖第三方。作用就是接收任务,存进队列
  • worker:任务的执行单元,会将任务从队列中顺序取出并执行
  • backend:用于在任务结束之后保存状态信息和结果,以便查询,一般是数据库,当然只要具备存储功能都可以作为 backend

实践

  • 项目目录


    项目目录.png
  • 下面代码只是一个demo项目
  • 代码init.py 空
  • 代码celery.py
from __future__ import absolute_import

from celery import Celery
#app是Celery类的实例,创建的时候添加了proj.tasks这个模块,也就是包含了proj/tasks.py这个文件
app = Celery('celery_proj', include=['celery_proj.tasks'])

#另一种配置方式,这里就大概写一下
#app = Celery('任务名',broker='xxx',backend='xxx',include=['xxx','xxx'])

#去找配置文件
app.config_from_object('celery_proj.config')
  • 代码config.py
from __future__ import absolute_import
#使用redis作为消息代理
BROKER_URL = 'redis://127.0.0.1:6379/1'
#把任务结果存放在redis2号库
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

##以下内容可以不用配置,这些都是默认配置,除非自定义需要配置,我用 -- 框住

#任务序列化和反序列化使用msgpack方案
CELERY_TASK_SERIALIZER = 'json'

#读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_RESULT_SERIALIZER = 'json'

#任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERY_TASK_RESULT_EXPIRES = 60*60*24

#指定接受的内容类型
CELERY_ACCEPT_CONTENT = ['json']



#定时任务配置
from datetime import timedelta
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC=True

#ps:友情提示 CELERYBEAT_SCHEDULE 千万不要写错了,我之前写成 CELERY_BEAT_SCHEDULE导致定时任务执行不了,惨痛的教训!!!

CELERYBEAT_SCHEDULE = {
    #这个名字(print_test)随便写,无所谓
    'print_test': {
        'task': 'celery_proj.tasks.print_test',      #绑定的定时任务的函数
        'schedule': timedelta(seconds=30),      #设置定时时间,这里是2秒执行一次
         'args':()       #用来给函数传参

    }
}


  • 代码README.md
- 安装依赖 
pip install -r requirements.txt
切换到celery_proj上级目录去执行,并且celery要pip --updare下不然会有兼容问题


celery -A celery_proj worker  --loglevel=info --logfile="/usr/local/celery_proj/test.txt" 
celery -A celery_proj beat --loglevel=info --logfile="/usr/local/celery_proj/test1.txt"
  • requirements.txt
celery==5.1.0
redis==5.0.14
  • tasks.py
from __future__ import absolute_import

from celery_proj.celery import app
from datetime import datetime

@app.task
def print_test():
    now = datetime.now()
    current_time_str = now.strftime("%Y-%m-%d %H:%M:%S")
    print("nict try" + current_time_str)
    data_to_process = prepare_data_for_processing()
    # 使用apply_async调用另一个任务
    process_data.apply_async(args=(data_to_process + current_time_str,))
    return 'hello' + current_time_str

@app.task
def process_data(data):
    print(f'Processing data: {data}')
    # 这里添加处理数据的逻辑

def prepare_data_for_processing():
    # 这里添加生成数据的逻辑
    return "Some data to process "


腾讯云执行

  • test.txt test1.txt是单纯的日志文件
  • redis单台启动以及python环境安装自行百度
  • 切换到root后 pip install redis和celery
  • 上传到/usr/local下面,并且在/usr/local分别执行,都要用root用户
  • 运行beat定时任务
celery -A celery_proj beat --loglevel=info --logfile="/usr/local/celery_proj/test1.txt"
定时任务.png
  • 运行work任务,因为是分布式的当然work启动多个
celery -A celery_proj worker --loglevel=info --logfile="/usr/local/celery_proj/test.txt" 
work启动.png
  • work日志


    work日志.png
  • beat日志


    beat日志.png

参考文章

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容