Celery文档总结(一)

目前我接触的distributed task queue比较有名的是python的celery和go的nsq, 本文是我在学习celery的一些总结

Celery 是什么?

Celery是一个简单, 灵活, 可靠的分布式系统, 用于处理大量消息, 同时为操作提供维护此类系统所需的工具. 它是一个任务队列, 专注于实时处理, 同时还支持任务调度

Celery 优点

  • 简单: celery使用很简单, 你可以不用配置就可以启动一个任务
  • 高度可用: worker和clients会自动处理失败或丢失的消息
  • 快: 一个celery每分钟可以处理数百万的任务(使用RabbitMQ并做好优化)
  • 灵活: 几乎Celery的每个部分都可以自行扩展或使用, 自定义池实现, 序列化器, 压缩方案, 日志记录, 调度程序, 消费者, 生产者, 代理传输等等

Celery 架构

celery架构

Broker: RabbitMQ, Redis
Backend: Redis, Memcached, SQLAlchemy, Cassandra, Elasticsearch
Concurrency: Prefork, Eventlet, Gevent
Serialization: Pickle, Json, Yaml, Zlib, Bzip2

Celery 特性

Monitoring 自检

  • Celery命令行(可以通过celery <command> --help了解celery的各种命令)
  • Flower是一个Django搭建的celery实时监测拓展
  • RabbitMQ(rabbitmqctl list_queue)和Redis, 查看各种broker和backend数据
  • 通过代码使用celery events跟踪任务
from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,  # 任务状态:处理函数
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

Scheduling 调度

celery beat是一个调度器, 它定期启动任务, 然后由群集中的可用工作节点执行

  1. 将任务添加到节拍调度器 beat shedule
from celery import Celery
from celery.schedules import crontab

app = Celery()
app.conf.timezone =  'Asian/Shanghai'  # 需要设置时区, 默认是UTC,
app.conf.beat_schedule = {  # 配置中设置定时任务
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,  
        'schedule': crontab(hour=7, minute=30, day_of_week=1), 
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16)
    },
}

@app.task
def add(a, b):
    return a + b

# 使用定时函数设置定时任务
@app.on_after_configure.connect  # 这个装饰器确保了配置完成后才调用函数
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)
    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)
  1. 启动定时任务celery -A proj beat

Work-Flows 任务流

Signatures : 包装单个任务调用的参数, 关键字参数和执行选项, 以便可以将其传递给函数, 甚至可以通过线路进行序列化和发送

s = add.s(2, 2, {'debug': True}).set(countdown=1)  # 星形参数的快捷方式, 通过set()函数定义options
s.args  # (2, 2)
s.kwargs  # {'debug': True}
s.options  # {'countdown': 10}

# Partial可以实现部分用于回调
s()  # 相当在当前进程执行add(2, 2)
s.delay()  # 相当在当前进程执行add.delay(2, 2)
s.apply_async()  # 相当在当前进程执行add.apply_asybc(2, 2)
p = add.s(2)  # 所有参数都可以在后续流程中传入
p.apply_async(args=(4,),kwargs={'debug': True}, countdown=1)

# Immutable不希望带上上一个任务的结果
add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))
add.apply_async((2, 2), link=reset_buffers.si())
add.si(2, 3)  # 最好使用这个方式, 简单

# Callbacks可以用于任务回调
add.apply_async((2, 2), link=other_task.s())

# Chain 任务链式完成, 上一个结果可以用于后续任务
from celery import chain
res = chain(add.s(2, 2), add.s(4), add.s(8))()
res.get()  # 16
res1 = (add.s(2, 2) | add.s(4) | add.s(8))().get()  # 链式传递
res1.get()  # 16
res1.parent.get() # 8
res1.parent.parent.get()  # 4
res2 = (add.si(2, 2) | add.si(4, 8) | add.si(10, 10))()  # 结果不传递
res2.get()  # 20
res2.parent.get() # 12
res2.parent.parent.get()  # 4

# Group 创建一组要并行执行的任务
from celery import group
res = group(add.s(i, i) for i in xrange(10))()
res.get(timeout=1)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# Chord 所有任务完成执行时添加要调用的回调
from celery import chord
res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
res.get()  # 90

# Map he Chunk
xsum.map([range(10), range(100)])
res = add.chunks(zip(range(100), range(100)), 10)()

Time & Rate Limits 时间评率限制

可以控制单个任务的请求评率和执行时间限制

# 时间限制有soft和hard之分, 运行超过soft会抛出SoftTimeLimitExceeded异常, 超过hard会直接终止任务, 可以在configuration中设置
task_soft_time_limit = 60
task_time_limit = 120
task_default_rate_limit = '200/m'

# 可以在任务中设置
@app.task(time_soft_limit=60 ,time_limit=120, rate_limit='200/m')
def test():
    pass

# 远程调用下面可以在运行中修改任务时间和评率限制
app.control.time_limit('tasks.crawl_the_web', soft=60, hard=120, reply=True)  
app.control.rate_limit('myapp.mytask', '200/m', destination=['celery@worker1.example.com'])  # destination可以指定相应节点限制评率

Resource Leak Protection 资源泄露保护

可以通过设置来保护工作节点的资源不被泄露, 使用此选项, 您可以配置工作程序在被新进程替换之前可以执行的最大资源配置, 如果您无法控制资源使用, 例如来自闭源C扩展, 则此功能非常有用

  • Max tasks per child setting 最大任务数, 可以使用workers [--max-tasks-per-child]
  • Max memory per child 最大内存, 可以使用workers [--max-memory-per-child]
# 池工作进程在用新工作进程替换之前可以执行的最大任务数, 默认是没有限制的
worker_max_tasks_per_child = 10
# 在新worker替换之前,worker可能消耗的最大驻留内存量, 如果单个任务导致worker超过此限制, 则任务将完成,worker将被替换
worker_max_memory_per_child = 12000  # 12MB

User Components 用户组件

可以定制每个工作组件, 并且可以由用户定义其他组件. 工作人员使用“bootsteps”构建 - 一个依赖关系图, 可以对工人的内部进行细粒度控制

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi阅读 7,289评论 0 10
  • 1.定义: Celery是一个异步的任务队列(也叫做分布式任务队列) 2.工作结构 Celery分为3个部...
    四号公园_2016阅读 28,701评论 5 60
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • =========================================================...
    lavor阅读 3,484评论 0 5
  • 早在三月底,我就认识到了我自己的写文问题,我就知道是因为我读书少,思想认识不到位,写出的文章没有深度和广度。 将近...
    静心专注阅读 566评论 2 1