Celery 使用 +

在项目中使用celery

项目结构:

proj/__init__.py  # 注意这个文件 django是不带的默认 对应django的项目
      /celery.py
      /tasks.py

proj/celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
         broker='amqp://',  # 使用的消息队列
         backend='amqp://',  # 使用的结果存储
         include=['proj.tasks'])  # 任务们

if __name__ == '__main__':
    app.start()

proj/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y): 
    return x + y

启动工作进程:

celery -A proj worker -l info

-------------- celery@halcyon.local v4.0 (latentcall)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: amqp://guest@localhost:5672//

  • ** ---------- . app: main:0x1012d8590
  • ** ---------- . concurrency: 8 (processes)
  • ** ---------- . events: OFF (enable -E to monitor this worker)
  • ** ----------
  • *** --- * --- [Queues]
    -- ******* ---- . celery: exchange:celery(direct) binding:celery
    --- ***** -----

[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.

broker就是配置中的消息队列,concurrency就是工作进程数量(默认是cpu数量)如果所有进程都被占用了那么新的任务需要等待,events是指定celery是否发送监控信息,queue就是队列。

终止工作进程

直接control -c(按照上述命令在前台启动的),当然了如果在后台启动和停止:

celery multi start w1 -A proj -l info
celery  multi restart w1 -A proj -l info

# 异步关闭 立即返回
celery multi stop w1 -A proj -l info
# 等待关闭操作完成
celery multi stopwait w1 -A proj -l info

默认会在当前目录下创建pid和log文件,指定路径和名称:

celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
                                    --logfile=/var/log/celery/%n%I.log

关于-A(--app)选项:
指定celery app,可以用 module.path:attribute的形式指定,也可直接指定package,如--app=proj,然后会搜索其中的proj.app,proj.celery等。

操作已被创建的worker时,不需要参数什么的完全一样,只要pidfile和logfile一样即可。

调用

add.delay(1, 1)
add.apply_async((1, 1))

add.apply_async((2, 2), queue='lopri', countdown=10)
# 指定要发送到哪个队列 运行时间延迟countdown

# 结果获得
res  = add.delay(1, 1)
res.get()

celery默认不产生结果的原因:因为各种应用的需求不同,而大多数任务保存返回值没有什么意义,而且产生结果并不是用来监控任务和工作进程,应该是使用event消息的专门的监控模块。

res.id  # 任务id uuid

# 任务出错会raise 错误 可指定propagate
res.get(propagate=False)
res.failed()
res.successful()
res.state

# PENDING -> STARTED -> SUCCESS(FAILURE)
# STARTED 需要设置 task_track_started 或在任务级别设置@task(track_started=True)
# 重试的情况也有
# PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

工作流

有时想把一个任务的签名(包含了实参和任务的执行选项)发送到另一个进程或把参数发送到另一个函数。

add.signature((2, 2), countdown=10)  # with args and kwargs
add.s(2, 2)  # with args only

然后获得的签名就可以调用:

s1 = add.s(2, 2)
res = s1.delay()
res.get()

# 也可以部分指定参数 然后在调用时补全参数    
s2 = add.s(2)
res = s2.delay(8)
res.get()

调用方式

之前一直使用的是delay(*args, **kwargs)的方式来调用,还有apply_async的调用方式,支持一些执行时的选项

Groups

group是同时调用的一系列任务,返回特殊的结果可以获得组内所有的执行结果,可以按照任务的顺序从该结果中获得对应任务的结果。

group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# 组也支持部分参数化
g = group(add.s(i) for i in xrange(10))
g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

把任务连接起来,执行完一个后把结果传入另一个再调用。

chain(add.s(4, 4) | mul.s(8))().get()
g = chain(add.s(4) | mul.s(8))
g(4).get()
# 也支持部分参数化

chords

chord是一个带callback的group

from celery import chord
from proj.tasks import add, xsum

chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
# 而group的结果发送到另一个任务则自动转换为chord
(group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()

# 可以把单任务的结果发送到组中
upload_doc.s(file) | group(apply_filter.s() for filter in filters)

Routing

在app层面配置task到某个队列:

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

在调用时指定使用某个队列:

add.apply_async((2, 2), queue='hipri')

启动工作进程时指定该进程处理的队列:

celery -A proj worker -Q hipri

# 指定多个处理的队列 默认队列名称:celery
celery -A proj worker -Q hipri,celery

远程控制

查看工作进程当前的工作:

celery -A proj inspect active

这个命令是一个广播,所有工作进程都会收到。(需要启用event)

celery -A proj inspect active --destination=celery@example.com

让工作进程启用event

celery -A proj control enable_events
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容