5 工作进程

启动

# 在前台启动celery
celery -A proj worker -l info

# 可在一台机器上启动多个工作进程
# 注意:设置不同的 --hostname(-n)
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h

--hostname (-n) 可用的参数变量
# %h: Hostname, including domain name.
# %n: Hostname only.
# %d: Domain name only.
注意如果是supervisor则需要转义%

停止

可以使用TERM信号来停止,工作进程会完成它当前执行的程序,你需要等待它完成。如果想立即停止则可发送KILL信号,但是会造成任务丢失,除非该任务启用了acks_late选项。此外不能正常终止它的子进程,所以需要使用:

pkill -9 -f 'celery worker'
或
ps auxww | grep 'celery worker' | a wk '{print $2}' | xargs kill -9

重启

celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
celery multi restart 1 --pidfile=/var/run/celery/%n.pid

也可以发送HUP信号来重启:

kill -HUP $pid
# 在mac下不可用

信号

TERM Warm shutdown, wait for tasks to complete.

QUIT Cold shutdown, terminate ASAP

USR1 Dump traceback for all active threads.

USR2 Remote debug, see

文件类参数 选项

[--logfile] [--pidfile] [--statedb]

%p: Full node name.
%h: Hostname, including domain name.
%n: Hostname only.
%d: Domain name only.
%i: Prefork pool process index or 0 if MainProcess.
%I: Prefork pool process index with separator.

多进程 选项

%i - Pool process index or 0 if MainProcess.
%I - Pool process index with separator.

并发

默认是使用多进程方式来处理,可用Eventlet。

远程控制

远程控制是通过使用高优先级别的广播队列,可发送给所有也可指定工作进程。命令可以有响应,调用者可以等待并收集回复。因为无法知道集群中有多少个工作进程,所以也没办法知道有多少个工作进程会发送回复,所有有一个等待回复的超时时间,默认是1秒钟,可以根据需求更改这个值。另可指定收到回复的最大数量,可指定接收者。

broadcast

用于把命令发送到工作进程,其他的一些命令可能也是在后台使用broadcast。

app.control.broadcast('rate_limit', 
    arguments={'task_name': 'myapp.mytask',  
    'rate_limit': '200/m'})

# 这样默认是异步的 不等待回执,可指定reply=True

app.control.broadcast('rate_limit', {
    'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True)

# 可指定接收的工作进程 destination

app.control.broadcast('rate_limit', {
    'task_name': 'myapp.mytask',
    'rate_limit': '200/m'}, reply=True,
                           destination=['worker1@example.com'])

命令

revoke 取消

当一个工作进程收到了取消命令,会跳过执行该任务,但是如果已经开始执行了则不会终止,除非开启了terminate选项(terminate关闭的是进程,而该进程可能已经开始处理其他任务了,所以应该在任务卡住的情况下使用该选项)。

例子:

result.revoke()

AsyncResult(id).revoke()

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', terminate=True)

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', terminate=True, signal='SIGKILL')

一次取消多个任务:
app.control.revoke([
    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
    'f565793e-b041-4b2b-9ca4-dca22762a55d',
    'd9d35e03-2997-42d0-a13e-64a66b88a618',
])

持久化取消请求

取消任务的方式是广播消息给所有工作进程,工作进程会在内存中维护一个取消状态的任务列表,当工作进程启动时会和集群中的其他工作进程同步。由于是在内存中保存的,所以如果所有的工作进程都重启了那么这个列表就消失了。可以使用-statedb选项来指定一个文件来存储。

celery -A proj worker -l info --statedb=/var/run/celery/worker.state

如果是多进程的话需得标示节点

celery multi start 2 -l info --statedb=/var/run/celery/%n.state

Time Limits

一个任务默认是可以一直运行的,如果有很多任务在等待,则会阻塞工作进程,无法处理新的任务。最好的方式是指定时间限制。

时间限制分为两种soft hard,soft会抛出异常,任务中可以捕获它并进行处理,hard则是不能捕获的,强制结束该任务。
-time-limit -soft-time-limit

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

app.control.time_limit('tasks.crawl_the_web',
                       soft=60, hard=120, reply=True)

Rate Limits

# 每分钟最多执行200个任务
app.control.rate_limit('myapp.mytask', '200/m')

app.control.rate_limit('myapp.mytask', '200/m',
          destination=['celery@worker1.example.com'])

如果设置了worker_disable_rate_limits则不会受影响

Max tasks per child setting

设置一个工作进程可执行的最大任务数量,可以处理有资源泄露的情况

Max memory per child setting

设置一个工作进程可使用的最大内存大小,可以处理有资源泄露的情况

Autoscaling

自动伸缩,可设置worker的最大数量和最小数量
--autoscale=10,3

Queues

指定工作进程只在某个队列中获取任务:

celery -A proj worker -l info -Q foo,bar,baz

如果队列之前没有创建过则可以使用task_create_missing_queues选项指定生成新队列。也可以在运行时通过add_consumer、cancel_consumer来控制。

celery -A proj control add_consumer foo
celery -A proj control add_consumer foo -d celery@worker1.local
app.control.add_consumer('foo', reply=True)
app.control.add_consumer('foo', reply=True,
                        destination=['worker1@example.com'])
>>> app.control.add_consumer(
...     queue='baz',
...     exchange='ex',
...     exchange_type='topic',
...     routing_key='media.*',
...     options={
...         'queue_durable': False,
...         'exchange_durable': False,
...     },
...     reply=True,
...     destination=['w1@example.com', 'w2@example.com'])

告知worker处理foo队列

celery -A proj control cancel_consumer foo
celery -A proj control cancel_consumer foo -d celery@worker1.local
app.control.cancel_consumer('foo', reply=True)

告知worker取消处理foo队列

celery -A proj inspect active_queues
celery -A proj inspect active_queues -d celery@worker1.local
app.control.inspect().active_queues()
app.control.inspect(['worker1.local']).active_queues()

查看

Inspecting workers

i = app.control.inspect()
i = app.control.inspect(['worker1.example.com', 'worker2.example.com'])
i = app.control.inspect('worker1.example.com')

i.registered()
i.active()
i.scheduled()
i.reserved()
celery -A proj inspect stats

Additional Commands

关闭

app.control.broadcast('shutdown')
app.control.broadcast('shutdown', destination='worker1@example.com')

ping

app.control.ping(timeout=0.5)

监控管理

app.control.enable_events()
app.control.disable_events()

Writing your own remote control commands

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 1. Nginx的模块与工作原理 Nginx由内核和模块组成,其中,内核的设计非常微小和简洁,完成的工作也非常简单...
    rosekissyou阅读 10,208评论 5 124
  • @(python)[笔记] 目录 一、什么是进程 1.1 进程的概念 进程的概念起源于操作系统,是操作系统最核心的...
    CaiGuangyin阅读 1,256评论 0 9
  • 第一章 Nginx简介 Nginx是什么 没有听过Nginx?那么一定听过它的“同行”Apache吧!Ngi...
    JokerW阅读 32,664评论 24 1,002
  • linux资料总章2.1 1.0写的不好抱歉 但是2.0已经改了很多 但是错误还是无法避免 以后资料会慢慢更新 大...
    数据革命阅读 12,157评论 2 33