celery:一个分布式任务队列

Celery介绍

分布式任务队列是一种跨线程或机器调度work的机制。一个任务队列的输入是一个叫task的work单元。专用worker进程不断监控任务队列以执行新work。

Celery就是一个分布式任务队列(Distributed Task Queue)。Celery通过收和发消息来通信,在clients和workers间使用一个broker代理来调解。RabbitMQ和Redis作为代理broker支持celery的该需求特性,但是也有一些其他实用性解决方案,比如用于本地开发的SQLite。初始化一个task时,client向队列新增一条消息,broker将消息传递给worker。

一个Celery系统可以由多个workers和brokers组成,以提供高可用和水平扩展。

Celery可以运行在单个虚机或多个虚机上,甚至是跨数据中心上。

Celery是Python语言开发,但协议可以使用任何语言实现。除了Python外,还有Node.js的node-celery和node-celery-ts,以及一个 PHP 客户端。还可以通过暴露HTTP endpoint端点并具有请求的任务(webhooks)来实现语言互操作性。

Celery开源的,在BSD license下获得许可。

特点

1) simple

Celery易使用和易维护,不需要配置文件。 有友好和活跃的社区提供支持。

简单使用案例

2) Highly Available高可用性

在有连接丢失或失败场景下,Workers和clients之间有重试机制,一些broker支持高可用方式:主主和主备副本模式。

3) Fast快速(低时延)

一个Celery单进程每分钟可以处理百万个tasks,仅有亚毫秒的传输时延(使用RabbitMQ和librabbitmq并优化配置)。

4) Flexible弹性

Celery几乎每部分都可以扩展或使用,自定义池实现、序列化程序、压缩方案、日志记录、调度程序、消费者、生产者、代理传输等等。

Celery特性

1) Monitoring监控

监控事件流由worker发出,并由内置和外部工具实时告知集群正在做什么。

命令监控方式web实时监控Flower

在celery集群中如何监控broker(RabbitMQRedis)?

status: 查询该集群中活跃的node状态

$ celery -A proj status

result: 查询某个task的执行结果

$ celery -A proj result  -t  tasks.add  4e196aa4-0141-4601-8138-7aa33db0f577

注意:只有不使用自定义后端,就可以省略task的名称。

purge: 从配置的task队列中清除消息

该命令将移除CELERY_QUEUES配置中所有队列内部的消息。警告:此操作不可以反悔,所有消息将被删除。

$ celery -A proj purge

使用参数-Q option来指定队列:

$ celery -A proj purge -Q celery,foo,bar

使用-X option参数排除某个队列:

$ celery -A proj purge -X celery

inspect active: 查询活跃的tasks

$ celery -A proj inspect active

inspect scheduled: 查询被调度到worker上的ETA或倒计时参数的 tasks

$ celery -A proj inspect scheduled

inspect reserved: 查询已调度到worker,正要准备执行的tasks(不包括eta类型的tasks)

$ celery -A proj inspect reserved

inspect stats: 查询worker信息统计

$ celery -A proj inspect stats

migrate: 迁移替换broker

$ celery -A proj migrate redis://localhost  amqp://localhost

2) Scheduling调度

支持以秒或日期形式来指定一个task运行的时间,支持以简单的时间间隔或者以分,小时,每周某天,每月某天,每年某月来执行来周期性的task。

celery beat是一个调度器,定期启动任务,由集群中可用的worker来执行。

周期性task调度默认使用UTC时区,可以通过timezone来设置时区。

from celery import Celery

app = Celery()

app.conf.timezone = 'Europe/London'

启动一个celery beat服务:

$ celery -A proj beat

Beat需要在一个叫celerybeat-schedule的本地文件中存储task最后一次运行的时间

$ celery -A proj beat -s  /home/celery/var/run/celerybeat-schedule

默认情况下,entries从beat_schedule设置中获取,但也可以使用自定义存储比如在sql数据库中的entries。

必须保证一次只能运行一个调度器,否则会有重复的任务。使用集中式的方法意味着调度不需要同步,服务运行也不用使用锁。

3) Work-flows工作流

简单和复杂的work-flows由canvas一系列强大原语组成,包括分组,连接和分块。

通常会使用task延迟方法调用任务,但有时会将任务调用的签名signature传递给另一个进程或作为另一个函数的参数。signature()以某种方式包装单个任务调用的参数、关键字参数和执行选项,以便它可以传递给函数,甚至可以序列化并通过网络发送。

from celery import signature

>>> signature('tasks.add', args=(2, 2), countdown=10)

tasks.add(2, 2)

4) Resource Leak Protection资源泄露保护

--max-tasks-per-child选项用于可能泄漏资源(如内存或文件描述符)的用户任务,这些资源完全不受控制。

--max-tasks-per-child或worker_max_tasks_per_child:一个worker可以执行的task的最大个数,这对于无法控制内存泄露的闭源C扩展程序来说很有用。

--max-memory-per-child 或worker_max_memory_per_child:一个worker的最大常驻内存,这对于无法控制内存泄露的闭源C扩展程序来说很有用。

--autoscale:指定线程池的最大最小数,在该范围内根据负载来动态调整线程个数。

--autoscale=AUTOSCALE

always keep 3 processes, but grow to 10 if necessary:--autoscale=10,3 

5) Time & Rate Limits时间和速率限制

支持指定每时每分每秒可以运行tasks的数量,支持指定一个task可以运行多长时间,当然这些根据task类型来设定task的运行时间以及运行时长。

一个task有可能永久运行,另外的任务将无法执行,最好的方法是开启时间限制防止该场景发生。–time-limit指定一个task可能运行的最大时长(秒),在被杀掉或替换新进程之前。你可以开启–soft-time-limit,在–hard-time-limit强制杀掉task之前捕获异常并进行清理。

from myapp import app

from celery.exceptions import SoftTimeLimitExceeded

@app.task

def mytask():

    try:

        do_work()

    except SoftTimeLimitExceeded:

        clean_up_in_a_hurry()

注意:时间限制不适用于在不支持SIGUSR1信号的平台上工作。

>>> app.control.time_limit( 'tasks.crawl_the_web', soft=60, hard=120,  reply=True)

[{'worker1.example.com': {'ok': 'time limits set successfully'}}]

指定每分钟最多运行的task个数,也可以指定task的destination。

>>> app.control.rate_limit('myapp.mytask', '200/m')

>>> app.control.rate_limit('myapp.mytask', '200/m',destination=['celery@worker1.example.com'])

6) User Components用户自定义组件

每个worker组件都可以定制化,附加组件都可以用户自定义。worker以bootsteps构建,一个依赖图可以对worker内部进行细粒度控制的。

Celery版本历史

当前稳定版本Celery version 5.1 可以运行在Python ❨3.6, 3.7, 3.8❩和PyPy3.6 ❨7.3❩环境

Celery 4.x是支持Python 2.7的最后一个版本 Celery 5.x和Celery 5.1.x需要在Python 3.6或更新版本。

如果运行更老版本Python,需要celery相应d的版本。

Python 2.7或3.5: Celery 4.4系列或更早。

Python 2.6: Celery 3.1系列或更早。

Python 2.5: Celery 3.0系列或更早。

Python 2.4 : Celery 2.2系列或更早。

Celery是一个资金资助最少的项目,所以不支持Microsoft Windows平台,不要提此平台的问题。

celery broker and backends

celery支持和多个不同backends和brokers进行通信和存储,backends后端用于存储结果,brokers代理用于消息传输。

broker对比

实验性的brokers只是功能性的,并没有维护者。缺少监控意味着传输没有实现事件,就不支持Flower, celery events, celerymon 等其他基于事件监控的工具。

远程控制意味着支持celery在运行时能inspect和管理workers,使用celery inspect或celery控制命令或者其他远程控制API的工具。

Redis既可以做backend也可以做broker。作为broker,对于小的信息量传输很快,但大的信息量将阻塞系统。作为Backend,redis是一个超级快速K/V存储,使得在获取一个task调用结果时很高效。redis的设计需要考虑,存储数据的最小内存和持久化数据的方式。假如持久化的结果很重要,考虑使用另外的DB作为后端。详情点击

RabbitMQ是一个Broker,作为Broker,RabbitMQ相比redis能更好的处理大量消息,无论很多消息非常快的传入,scaling弹性扩写可能会是个问题,除非RabbitMQ大规模运行外,尽量使用Redis或SQS。作为Backend,RabbitMQ可以通过rpc:// backend来存储,会为每个客户端创建临时的队列。详情点击

注意:RabbitMQ 作为 broker和Redis 作为 backend经常组合使用。假如需要对结果存储长期持久化,要考虑使用PostgreSQL或MySQL(通过SQLAlchemy),Cassandra或自定义存储后端。

Amazon SQS 是一个broker,如果已经和aws紧密集成,又对sqs较熟悉,sqs是一个不错选择。有很强的扩展性和完全托管性,类似于RabbitMQ的管理任务委派。但缺少RabbitMQ broker的某些特性比如worker的远程控制命令。详情点击

SQLAlchemy是一个backend,允许celery和MySQL, PostgreSQL, SQlite等数据库集成进来,SQLAlchemy是一个ORM,是一种celery使用sql db作为存储后端的方式。从经验上看,SQLAlchemy并不是最稳定的后端,请谨慎使用。

Celery 安装升级,hello world以及最佳实践稍后奉上。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容