celery-beat分发定时任务
beat 概念
celery beat 是一种调度者;
celery beat 定期将任务及参数发送给集群中的可用worker节点,然后任务由可用worker节点执行。
beat配置
在celery的配置文件中
beat_schedule = {
'tt': { # 任务名称
'task': 'tasks.task_beat.tt', # 真实执行的task路径
'schedule': crontab(minute= '*/1'), # crontab timer
'options': {
'queue': 'monitor:taskq' # 是否用单独的队列,如用单独队列,需要指定
}
},
}
beat/worker启动
编写好task
使用 celery beat -A tasks -l debug
启动beat
使用 celery -A tasks worker -Q 队列名称 -l info -E
启动worker
任务完成情况追踪
使用celery event机制,监听task执行成功时会发出的 task-succeeded 事件。
开启event 需要在配置文件中 配置 task_send_sent_event = True
event 介绍
def monitor_events():
def on_event(event):
"""
:param event:
:type event dict
:return:
"""
task_result = literal_eval(event['result'])
deal_with_task_result....
debug_logger = auto_log('beat recv')
logger = influx_logger('influx_monitor_events')
while 1:
try:
debug_logger.info('start...')
with app.connection() as conn:
recv = app.events.Receiver(conn, handlers={'task-succeeded': on_event})
recv.capture(limit=None, timeout=10, wakeup=True)
except (InterruptedError, KeyboardInterrupt):
debug_logger.info(' recv end ...')
break
except Exception as e:
debug_logger.error(e)
任务异常处理
有两种方式
1.重载Task.on_success、Task.on_failure
2.监听FAILURE事件
重载Task.on_success、Task.on_failure
这里采用自定义的Task类(Task是celery的内置组件。Task是可以从任何可调用对象创建的类。Task用于封装我们自定义的任务,在Task中,celery处理任务的分发,结果/异常的处理,任务的执行),来增加任务异常的处理。
class CallbackTask(Task):
def on_success(self, retval, task_id, args, kwargs):
"""Success handler.
Run by the worker if the task executes successfully.
Arguments:
retval (Any): The return value of the task.
task_id (str): Unique id of the executed task.
args (Tuple): Original arguments for the executed task.
kwargs (Dict): Original keyword arguments for the executed task.
Returns:
None: The return value of this handler is ignored.
"""
with open('test.txt', 'a+') as f:
f.write(self.name+str(retval)+'\n')
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Error handler.
This is run by the worker when the task fails.
Arguments:
exc (Exception): The exception raised by the task.
task_id (str): Unique id of the failed task.
args (Tuple): Original arguments for the task that failed.
kwargs (Dict): Original keyword arguments for the task that failed.
einfo (~billiard.einfo.ExceptionInfo): Exception information.
Returns:
None: The return value of this handler is ignored.
"""
dd_notice(
{'alert_reason': 'celery beat exec failure', 'Exception': exc, 'task_id': task_id, 'task_name': self.name,
'args': args, 'kwargs': kwargs, 'einfo': einfo, 'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M%S')}, task_failure_dd_uri)
然后将任务代码改为
@app.task(base=CallbackTask)
def xx():
"""
:return:
"""
from tasks.assignments.xx import XX
return XX().run()
监听FAILURE事件
把上面监听task-succeeded的地方改为task-failure
总结
至此我们可以用celery执行定时任务,并且用event进行任务执行成功/失败进行相应的处理,也可以直接使用自定义的Task类,直接处理。