Celery 进阶学习
参考链接: Celery 4.1.0 documentation
初始文件
安装部署celery
相关的pip
包,参考文档或Celery 部署小记
另外,本文使用ipython
作为控制台的交互式解释器,pip install ipython
tasks.py
例1
from celery import Celery
class CeleryConfig():
broker_url = 'redis://localhost'
result_backend = 'redis://localhost'
timezone = 'Asia/Shanghai'
app = Celery()
app.config_from_object(CeleryConfig)
@app.task
def add(a, b):
return a + b
以上文件可以正常通过以下命令启动
celery -A tasks worker --loglevel=info
例1中使用类的方式来加载配置,其他方式有:
-
app.conf.timezone = 'Asia/Shanghai'
或app.conf.update(option1=True, option2='xxx', ...)
-
app.config_from_object(param)
方法,参数可以是模块名的字符串形式、模块对象实体、配置的类或对象等 -
app.config_from_envvar(param)
方法,参数是系统的环境变量名,而这个变量对应的值是模块的字符串。如:os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
抽象任务(类)
所有task
都必须使用@app.task
装饰器来装饰,经过装饰器之后,这些任务会继承Task
类。可以通过继承Task
类,来创建一个抽象类,供task
装饰
tasks.py
例2
from celery import Celery
# 抽象tasks
from celery import Task
class DebugTask(Task):
# 在调用之前打印一行字
def __call__(self, *args, **kwargs):
print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
return super(DebugTask, self).__call__(*args, **kwargs)
class CeleryConfig():
broker_url = 'redis://localhost'
result_backend = 'redis://localhost'
timezone = 'Asia/Shanghai'
app = Celery()
app.config_from_object(CeleryConfig)
@app.task(base=DebugTask)
def add(a, b):
return a + b
ipython调试
In [1]: from tasks import add
In [2]: add.delay(2, 3)
Out[2]: <AsyncResult: d9e63190-0591-403d-a5be-8b59893fcb2d>
celery输出
[2017-11-20 15:52:05,660: INFO/MainProcess] Received task: tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d]
[2017-11-20 15:52:05,662: WARNING/ForkPoolWorker-4] TASK STARTING: tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d]
[2017-11-20 15:52:05,666: INFO/ForkPoolWorker-4] Task tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d] succeeded in 0.00480927500029793s: 5
acks_late选项
task
在经过worker
确认(acknowledge)之后,才会从worker
的任务队列中移除。并且worker
维护的任务队列可以保留相当大量的队列信息,即使这个worker
被杀掉,任务信息仍然可以转移到其他的worker
中
broker
在收到来自worker
的确认之后,便不会发送任务信息给其他的worker
worker
默认的确认时机:任务信息被worker
接收设置了
acks_late
选项之后,worker
的确认时机变为任务被实际执行在保证
task
是idempotent
(幂等的),即任意次执行代码所产生的影响和一次执行的影响相同,可以使用acks_late
选项http://docs.celeryproject.org/en/master/userguide/tasks.html#Task.acks_late
http://docs.celeryproject.org/en/master/faq.html#faq-acks-late-vs-retry
无限期阻塞的任务
由于网络传输等问题,导致任务无限期阻塞,会阻止此worker
实例执行其他工作,解决方案是:
- I/O任务:确保添加超时(可配合
retry
)。例如:使用requests
库
connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))
-
time limits
选项也可以很方便地规定任务遵循一个时间限制,但是这个时间过后,这个worker
会被直接杀掉,所以该选项仅用于没有使用其他超时方案的情况 (http://docs.celeryproject.org/en/master/userguide/workers.html#worker-time-limits)
Prefork池预取设置
prefork池默认将异步发送尽可能多的任务到进程中(进程预取任务)。对于延时短的任务,这样会加快速度,但是如果是高延时的任务,该进程后面的任务会长期处于等待。
默认设置: worker
会发送任务给缓冲区可写的进程,例子如下
-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B
-> send task T3 to process A
# A still executing T1, T3 stuck in local buffer and won't start until
# T1 returns, and other queued tasks won't be sent to idle processes
<- T1 complete sent by process A
# A executes T3
使用-Ofair
选项可以关闭预取设置,此时,worker
会发送任务给真正可用于工作的进程,例子如下
-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B
-> send T3 to process B
# B executes T3
<- T3 complete sent by process B
<- T1 complete sent by process A
task options
@app.task(options...)
logging
worker
会自动建立log,当然你也可以自定义log,例子如下
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task
def add(x, y):
logger.info('Adding {0} + {1}'.format(x, y))
return x + y
参数检测(typing)
>>> @app.task
... def add(x, y):
... return x + y
# Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
# Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 376, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 485, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)
# typing 属性
>>> @app.task(typing=False)
... def add(x, y):
... return x + y
# Works locally, but the worker reciving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
隐藏敏感信息(避免进入log)
v4.0之后,且
task_protocol
为2或以上才有效(该值在4.0之后默认为2)
可以使用argsrepr
和kwargsrepr
调用参数来覆盖敏感信息,例子如下
>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')
>>> charge.s(account, card='1234 5678 1234 5678').set(
... kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()
但实际上,只要可以从
broker
中读取数据,仍然可以获得这些“敏感信息”,所以如果需要高度保密的数据,要使用其他方法存储(加密等)
重试(Retrying)
当任务执行出现错误情况,可以通过设置retry
来解决可恢复的错误。celery
的retry
机制会确保由相同的队列去执行此task-id
的原始任务。简单的例子如下
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
特别注意,retry
是raise
出来的,所以,即使后面有代码,也不会执行。另外,这个异常会被worker
视为需要重试,以便在启用result backend
时,可以存储正确的状态(RETRY)
重试的各种选项和应用:
以下是@app.task
的参数
- max_retries: 当超过该参数设置的重试次数,会终止并报错。默认是3次,设置为None表示不重试
- default_retry_delay: 默认重试间隔时间。默认是180s,可以在
retry
调用中使用countdown
参数覆盖 - autoretry_for: 对特定的异常自动重试,如
autoretry_for=(Exception,)
,仅适用于v4.0 - retry_kwargs: 后面接字典类型,如
retry_kwargs={'max_retries': 5}
以下三个选项是v4.1版本支持的
- retry_backoff: 可以是布尔型或者数字,如果设置为
True
,则遵循“指数退避”,即第一次重试在1s后,第二次2s,第三次4s,第n次(2^(n-1))s。 如果设置为数字m,则表示基数为m,即第一次3s,第二次6s,第n次(3×2^(n-1))s。 如果设置为False
,则表示不延迟 - retry_backoff_max: 如果
retry_backoff
打开,这个选项决定了两次自动重试之间最大的延时,超过了这个延时,则不再重试。默认600s (这里理解可能有问题?需要具体测试: If retry_backoff is enabled, this option will set a maximum delay in seconds between task autoretries. By default, this option is set to 600, which is 10 minutes.) - retry_jitter: 布尔型,jitter(抖动)。抖动用于将随机性引入指数退避延迟,以防队列中过多的任务同时被执行。如果设置为
True
,则retry_backoff
计算的延迟值将被视为最大值,而实际延迟值将是介于0和最大值之间的随机数。该设置默认为True
task方法选项列表
类似@app.task(option1=xx, option2=yy),括号内的参数即选项
http://docs.celeryproject.org/en/master/userguide/tasks.html#general
一些选项(部分参考文档即可):
- max_retries: 只有在任务调用self.retry或者任务使用autoretry_for参数进行装饰时才适用
- throws: 值是tuple类型,里面的“异常”不会被视为错误而导致失败或重发,即使发生,也是成功的
- default_retry_delay: 重试间隔时间
- rate_limit: 关联
task_default_rate_limit
这个设置,表示某个时间段执行的任务数目,如:"100/m"
表示一分钟最多100条。默认无限制 - time_limit / soft_time_limit: 任务完成的时间限制。前者是经过时限之后
worker
被杀死,然后用新worker
代替,后者是经过时限之后抛出SoftTimeLimitExceeded
异常,供开发者处理。默认没有时限。 - ignore_result / store_errors_even_if_ignored
- name
- request
- serializer
- compression
- backend
- ack_late
- track_started
状态
Handlers
在任务返回、失败、重试、成功、超时等事件发生的时候,触发特定的方法:after_return
, on_failure
, on_retry
, on_success
, on_timeout
...
可用于状态转移的监控,如发邮件提醒等
一个自定义请求的例子如下
import logging
from celery.worker.request import Request
logger = logging.getLogger('my.package')
class MyRequest(Request):
'A minimal custom request to log failures and hard time limits.'
def on_timeout(self, soft, timeout):
super(MyRequest, self).on_timeout(soft, timeout)
if not soft:
logger.warning(
'A hard timeout was enforced for task %s',
self.task.name
)
def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
super(Request, self).on_failure(
exc_info,
send_failed_event=send_failed_event,
return_ok=return_ok
)
logger.warning(
'Failure detected for task %s',
self.task.name
)
class MyTask(Task):
Request = MyRequest # you can use a FQN 'my.package:MyRequest'
@app.task(base=MyTask)
def some_longrunning_task():
# use your imagination
最佳实践
- 忽略不需要的结果,
ignore_result=True
- 尽量避免使用同步子任务(task调用需要依赖其他task执行的结果,这样会造成互相等待,陷入死锁) Avoid launching synchronous subtasks
- 设置
broker_pool_limit
,默认为10,可以根据使用连接的线程的数目调整 link -
worker_prefetch_multiplier
表示一次prefetch
多少条消息乘以并发进程数,默认值为4(每个进程4个消息)。对于长时间的任务,可以把这个值设置为1,其实就相当于关闭预取;对于短时任务,可以设置大一些,比如64,128等;对于长短不一的任务,可以通过Routing Tasks,即分队列的方式执行 - 针对长任务,许多人希望的是让当前执行的任务数与保留待确认的任务数目相同,且都等于当前并发数(如
-c 10
,此时在执行的任务是10个,等待的任务数也是10个)。满足这样的要求选项:task_acks_late = True
和worker_prefetch_multiplier = 1
- 在默认的
prefork
模式下,进程池中的进程可能处于空闲或忙碌的状态。-O
是优化选项,如果是default
,进程是预取来自worker
中的任务的,可能造成长时间的等待;如果是fair
,进程只在有空闲的时候,才会去取任务执行。设置fair
对于耗时长的任务来说比较有利