Celery介绍
分布式任务队列是一种跨线程或机器调度work的机制。一个任务队列的输入是一个叫task的work单元。专用worker进程不断监控任务队列以执行新work。
Celery就是一个分布式任务队列(Distributed Task Queue)。Celery通过收和发消息来通信,在clients和workers间使用一个broker代理来调解。RabbitMQ和Redis作为代理broker支持celery的该需求特性,但是也有一些其他实用性解决方案,比如用于本地开发的SQLite。初始化一个task时,client向队列新增一条消息,broker将消息传递给worker。
一个Celery系统可以由多个workers和brokers组成,以提供高可用和水平扩展。
Celery可以运行在单个虚机或多个虚机上,甚至是跨数据中心上。
Celery是Python语言开发,但协议可以使用任何语言实现。除了Python外,还有Node.js的node-celery和node-celery-ts,以及一个 PHP 客户端。还可以通过暴露HTTP endpoint端点并具有请求的任务(webhooks)来实现语言互操作性。
Celery开源的,在BSD license下获得许可。
特点
1) simple
Celery易使用和易维护,不需要配置文件。 有友好和活跃的社区提供支持。
2) Highly Available高可用性
在有连接丢失或失败场景下,Workers和clients之间有重试机制,一些broker支持高可用方式:主主和主备副本模式。
3) Fast快速(低时延)
一个Celery单进程每分钟可以处理百万个tasks,仅有亚毫秒的传输时延(使用RabbitMQ和librabbitmq并优化配置)。
4) Flexible弹性
Celery几乎每部分都可以扩展或使用,自定义池实现、序列化程序、压缩方案、日志记录、调度程序、消费者、生产者、代理传输等等。
Celery特性
1) Monitoring监控
监控事件流由worker发出,并由内置和外部工具实时告知集群正在做什么。
在celery集群中如何监控broker(RabbitMQ和Redis)?
status: 查询该集群中活跃的node状态
$ celery -A proj status
result: 查询某个task的执行结果
$ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
注意:只有不使用自定义后端,就可以省略task的名称。
purge: 从配置的task队列中清除消息
该命令将移除CELERY_QUEUES配置中所有队列内部的消息。警告:此操作不可以反悔,所有消息将被删除。
$ celery -A proj purge
使用参数-Q option来指定队列:
$ celery -A proj purge -Q celery,foo,bar
使用-X option参数排除某个队列:
$ celery -A proj purge -X celery
inspect active: 查询活跃的tasks
$ celery -A proj inspect active
inspect scheduled: 查询被调度到worker上的ETA或倒计时参数的 tasks
$ celery -A proj inspect scheduled
inspect reserved: 查询已调度到worker,正要准备执行的tasks(不包括eta类型的tasks)
$ celery -A proj inspect reserved
inspect stats: 查询worker信息统计
$ celery -A proj inspect stats
migrate: 迁移替换broker
$ celery -A proj migrate redis://localhost amqp://localhost
2) Scheduling调度
支持以秒或日期形式来指定一个task运行的时间,支持以简单的时间间隔或者以分,小时,每周某天,每月某天,每年某月来执行来周期性的task。
celery beat是一个调度器,定期启动任务,由集群中可用的worker来执行。
周期性task调度默认使用UTC时区,可以通过timezone来设置时区。
from celery import Celery
app = Celery()
app.conf.timezone = 'Europe/London'
启动一个celery beat服务:
$ celery -A proj beat
Beat需要在一个叫celerybeat-schedule的本地文件中存储task最后一次运行的时间
$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
默认情况下,entries从beat_schedule设置中获取,但也可以使用自定义存储比如在sql数据库中的entries。
必须保证一次只能运行一个调度器,否则会有重复的任务。使用集中式的方法意味着调度不需要同步,服务运行也不用使用锁。
3) Work-flows工作流
简单和复杂的work-flows由canvas一系列强大原语组成,包括分组,连接和分块。
通常会使用task延迟方法调用任务,但有时会将任务调用的签名signature传递给另一个进程或作为另一个函数的参数。signature()以某种方式包装单个任务调用的参数、关键字参数和执行选项,以便它可以传递给函数,甚至可以序列化并通过网络发送。
from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)
tasks.add(2, 2)
4) Resource Leak Protection资源泄露保护
--max-tasks-per-child选项用于可能泄漏资源(如内存或文件描述符)的用户任务,这些资源完全不受控制。
--max-tasks-per-child或worker_max_tasks_per_child:一个worker可以执行的task的最大个数,这对于无法控制内存泄露的闭源C扩展程序来说很有用。
--max-memory-per-child 或worker_max_memory_per_child:一个worker的最大常驻内存,这对于无法控制内存泄露的闭源C扩展程序来说很有用。
--autoscale:指定线程池的最大最小数,在该范围内根据负载来动态调整线程个数。
--autoscale=AUTOSCALE
always keep 3 processes, but grow to 10 if necessary:--autoscale=10,3
5) Time & Rate Limits时间和速率限制
支持指定每时每分每秒可以运行tasks的数量,支持指定一个task可以运行多长时间,当然这些根据task类型来设定task的运行时间以及运行时长。
一个task有可能永久运行,另外的任务将无法执行,最好的方法是开启时间限制防止该场景发生。–time-limit指定一个task可能运行的最大时长(秒),在被杀掉或替换新进程之前。你可以开启–soft-time-limit,在–hard-time-limit强制杀掉task之前捕获异常并进行清理。
from myapp import app
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def mytask():
try:
do_work()
except SoftTimeLimitExceeded:
clean_up_in_a_hurry()
注意:时间限制不适用于在不支持SIGUSR1信号的平台上工作。
>>> app.control.time_limit( 'tasks.crawl_the_web', soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]
指定每分钟最多运行的task个数,也可以指定task的destination。
>>> app.control.rate_limit('myapp.mytask', '200/m')
>>> app.control.rate_limit('myapp.mytask', '200/m',destination=['celery@worker1.example.com'])
6) User Components用户自定义组件
每个worker组件都可以定制化,附加组件都可以用户自定义。worker以bootsteps构建,一个依赖图可以对worker内部进行细粒度控制的。
Celery版本历史
当前稳定版本Celery version 5.1 可以运行在Python ❨3.6, 3.7, 3.8❩和PyPy3.6 ❨7.3❩环境
Celery 4.x是支持Python 2.7的最后一个版本 Celery 5.x和Celery 5.1.x需要在Python 3.6或更新版本。
如果运行更老版本Python,需要celery相应d的版本。
Python 2.7或3.5: Celery 4.4系列或更早。
Python 2.6: Celery 3.1系列或更早。
Python 2.5: Celery 3.0系列或更早。
Python 2.4 : Celery 2.2系列或更早。
Celery是一个资金资助最少的项目,所以不支持Microsoft Windows平台,不要提此平台的问题。
celery broker and backends
celery支持和多个不同backends和brokers进行通信和存储,backends后端用于存储结果,brokers代理用于消息传输。
实验性的brokers只是功能性的,并没有维护者。缺少监控意味着传输没有实现事件,就不支持Flower, celery events, celerymon 等其他基于事件监控的工具。
远程控制意味着支持celery在运行时能inspect和管理workers,使用celery inspect或celery控制命令或者其他远程控制API的工具。
Redis既可以做backend也可以做broker。作为broker,对于小的信息量传输很快,但大的信息量将阻塞系统。作为Backend,redis是一个超级快速K/V存储,使得在获取一个task调用结果时很高效。redis的设计需要考虑,存储数据的最小内存和持久化数据的方式。假如持久化的结果很重要,考虑使用另外的DB作为后端。详情点击。
RabbitMQ是一个Broker,作为Broker,RabbitMQ相比redis能更好的处理大量消息,无论很多消息非常快的传入,scaling弹性扩写可能会是个问题,除非RabbitMQ大规模运行外,尽量使用Redis或SQS。作为Backend,RabbitMQ可以通过rpc:// backend来存储,会为每个客户端创建临时的队列。详情点击。
注意:RabbitMQ 作为 broker和Redis 作为 backend经常组合使用。假如需要对结果存储长期持久化,要考虑使用PostgreSQL或MySQL(通过SQLAlchemy),Cassandra或自定义存储后端。
Amazon SQS 是一个broker,如果已经和aws紧密集成,又对sqs较熟悉,sqs是一个不错选择。有很强的扩展性和完全托管性,类似于RabbitMQ的管理任务委派。但缺少RabbitMQ broker的某些特性比如worker的远程控制命令。详情点击。
SQLAlchemy是一个backend,允许celery和MySQL, PostgreSQL, SQlite等数据库集成进来,SQLAlchemy是一个ORM,是一种celery使用sql db作为存储后端的方式。从经验上看,SQLAlchemy并不是最稳定的后端,请谨慎使用。
Celery 安装升级,hello world以及最佳实践稍后奉上。