1.Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
简单说就是分布式的任务队列
2.消息队列与任务队列区别
可以看我另一篇文章
https://www.jianshu.com/p/cde93d4d00c8
3.在我看来,消息队列和任务队列 主要能解决以下场景的问题:
非实时数据离线计算业务逻辑拆分,降低耦合异步响应数据,提高用户体验其实在知乎,很多行为,比如你点一个赞,写一篇回答,发表一篇文章,启用匿名等等,背后都会有很多消息发送出去。不同的业务方收到后做不同的处理,比如算法会计算回答、文章的分值,做一些标签的判定,后台系统可能会记录回答,文章的 meta 信息等等。
4. 什么是 celery?
接触 python 的同学肯定不会陌生,即便没有用过,但应该也会听说过。celery 是 python 世界中最有名的开源消息队列框架。他之所以特别火爆,主要在于它实现了以下几点:性能高,吞吐量大配置灵活,简单易用文档齐全,配套完善其实除了以上的优点外,celery 的源码也有很大的价值,可以说是软件工程设计模式的典范。不过在这篇文章中我不会涉及源码的解读,更多的会介绍下 celery 架构设计中的一些哲学。
5. 高性能:多进程事件驱动的异步模型说到高性能
尤其是对于消息队列来说,很多人不以为然,认为高性能是没有意义的。其实这也不无道理。主要表现为以下几点:离线任务本身不要求实时性,一秒处理 100 和一秒处理 1000 个任务没有本质的区别。如果非要提高吞吐量,可以通过扩容等更加灵活的手段。而且资源的开销并不是很大。吞吐量过高,反而有可能是坏事。比如把上游业务打崩,把 mysql 打崩,把系统连接打满等等。虽然对于离线队列来说,性能不重要。
但是,这并不妨碍我们从学习的角度看待 celery 的架构原则。
celery 的高性能主要靠两个方面来保证,一个是多进程,一个是事件驱动。
下面我分别来讲一下他们的设计思想。珠玉在前很多人应该对 nginx 不陌生,提到 nginx,我们首先想到的,就是 nginx 是一个高性能的反向代理服务器。而 celery,可以说相当程度上借鉴了 nginx。
6.消费模型celery 的核心架构,
分成了调度器(master/main process) 和 工作进程(slaves/worker processes),也就是我们常说的主从。
celery 的消费模型很简单,调度器负责任务的获取,分发,工作进程(slaves/worker processes)的管理(创建,增加,关闭,重启,丢弃等等),其他辅助模块的维护等等。
工作进程负责消费从调度器传递过来的任务。具体流程:调度器首先预生成(prefork)工作进程,做为一个进程池(mutiprocessing-pool),之后通过事件驱动(select/poll/epoll)的方式,监听内核的事件(读、写、异常等等),如果监听到就执行对应的回调,源源不断的从 中间人(broker)那里提取任务,并通过 管道(pipe)作为进程间通讯的方式,运用一系列的路由策略(round-robin、weight 等等)交给工作进程。
工作进程消费(ack)任务,再通过管道向调度器进行状态同步(sync),进程间通讯等等行为。
当然,这只是一个很粗粒度的描述,其实 celery 内部还实现了很多有趣的功能,比如 prefetch,集群监控与管理,auto-scaler,容灾恢复等等,这些非核心功能的模块暂时还不会涉及,以后可以单独拆出来看他是怎么实现的。
7.高效的理由可以思考一下,为什么这种架构方式性能非常高。
首先,我们分析下调度器。调度器是一个事件驱动模型,什么事事件驱动,其实就是它消灭了阻塞。正常的单线程模型,一次只能拿一条消息,每一次都要走一条来和回的链路,并且需要一个 while True 的循环不断的去检测,这样无疑是非常低效且开销大的。而事件驱动则不这样,他可以同时发送多个检测的信号,然后就直接挂起,等待内核进行提示,有提示再去执行对应的回调。这样既优雅的化解了单线程每次都要检测的 while True,又通过多次请求并发降低了重复链路。
然后,我们看一下工作进程用多进程的优势。业内有经验的工程师,在配置容器的时候,经常会使用 n 核,n*m worker 数的配置。这是因为,多进程可以良好的发挥每个核的计算能力。
而且多进程良好的分摊了并发请求的处理压力,同时,多进程内部,还可以使用多线程、异步等方式
这样,可以在充分利用多核计算优势的基础上,再充分利用单个线程非阻塞模型的优势。好,关于 celery 的设计架构大概就讲到这里,之后会从源码的角度分析下上面的那一系列流程是怎么实现的
[celery执行器--CeleryExecutor]
由于celery任务执行器不能并行执行,因此开启多进程进行执行
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Celery executor."""
import math
import os
import subprocess
import time
import traceback
from multiprocessing import Pool, cpu_count
from typing import Any, List, Optional, Tuple, Union
from celery import Celery, Task, states as celery_states
from celery.result import AsyncResult
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor, CommandType
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKeyType, TaskInstanceStateType
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
from airflow.utils.timeout import timeout
# Make it constant for unit test.
CELERY_FETCH_ERR_MSG_HEADER = 'Error fetching Celery task state'
CELERY_SEND_ERR_MSG_HEADER = 'Error sending Celery task'
'''
To start the celery worker, run the command:
airflow celery worker
'''
if conf.has_option('celery', 'celery_config_options'):
celery_configuration = import_string(
conf.get('celery', 'celery_config_options')
)
else:
celery_configuration = DEFAULT_CELERY_CONFIG
#app实例
app = Celery(
conf.get('celery', 'CELERY_APP_NAME'),
config_source=celery_configuration)
@app.task
def execute_command(command_to_exec: CommandType) -> None:
"""
需要执行的命令
Executes command.
subprocess.check_call(args, *, stdin = None, stdout = None, stderr = None, shell = False)
与call方法类似,不同在于如果命令行执行成功,check_call返回返回码0,否则抛出subprocess.CalledProcessError异常。
subprocess.CalledProcessError异常包括returncode、cmd、output等属性,其中returncode是子进程的退出码,cmd是子进程的执行命令,output为None。
当子进程退出异常时,则报错
"""
log = LoggingMixin().log
log.info("Executing command in Celery: %s", command_to_exec)
env = os.environ.copy()
try:
subprocess.check_call(command_to_exec, stderr=subprocess.STDOUT,
close_fds=True, env=env)
except subprocess.CalledProcessError as e:
log.exception('execute_command encountered a CalledProcessError')
log.error(e.output)
raise AirflowException('Celery command failed')
class ExceptionWithTraceback:
"""
包装器
Wrapper class used to propagate exceptions to parent processes from subprocesses.
:param exception: The exception to wrap
:type exception: Exception
:param exception_traceback: The stacktrace to wrap
:type exception_traceback: str
"""
def __init__(self, exception: Exception, exception_traceback: str):
self.exception = exception
self.traceback = exception_traceback
def fetch_celery_task_state(celery_task: Tuple[TaskInstanceKeyType, AsyncResult]) \
-> Union[TaskInstanceStateType, ExceptionWithTraceback]:
"""
返回任务状态
Fetch and return the state of the given celery task. The scope of this function is
global so that it can be called by subprocesses in the pool.
:param celery_task: a tuple of the Celery task key and the async Celery object used
to fetch the task's state
:type celery_task: tuple(str, celery.result.AsyncResult)
:return: a tuple of the Celery task key and the Celery state of the task
:rtype: tuple[str, str]
"""
try:
with timeout(seconds=2):
# Accessing state property of celery task will make actual network request
# to get the current state of the task.
return celery_task[0], celery_task[1].state
except Exception as e: # pylint: disable=broad-except
exception_traceback = "Celery Task ID: {}\n{}".format(celery_task[0],
traceback.format_exc())
return ExceptionWithTraceback(e, exception_traceback)
# Task instance that is sent over Celery queues
# TaskInstanceKeyType, SimpleTaskInstance, Command, queue_name, CallableTask
TaskInstanceInCelery = Tuple[TaskInstanceKeyType, SimpleTaskInstance, CommandType, Optional[str], Task]
def send_task_to_executor(task_tuple: TaskInstanceInCelery) \
-> Tuple[TaskInstanceKeyType, CommandType, Union[AsyncResult, ExceptionWithTraceback]]:
"""
发送任务到celery执行器
Sends task to executor."""
key, _, command, queue, task_to_run = task_tuple
try:
with timeout(seconds=2):
#异步执行
result = task_to_run.apply_async(args=[command], queue=queue)
except Exception as e: # pylint: disable=broad-except
exception_traceback = "Celery Task ID: {}\n{}".format(key, traceback.format_exc())
result = ExceptionWithTraceback(e, exception_traceback)
return key, command, result
class CeleryExecutor(BaseExecutor):
"""
CeleryExecutor is recommended for production use of Airflow. It allows
distributing the execution of task instances to multiple worker nodes.
Celery is a simple, flexible and reliable distributed system to process
vast amounts of messages, while providing operations with the tools
required to maintain such a system.
celery执行器只能同步执行(不能调用execute_async),不能异步执行;由于开启了多进程, 因此加速了执行。
由于celery本身是异步的,本质上来说,还是异步执行
"""
def __init__(self):
super().__init__()
# Celery doesn't support querying the state of multiple tasks in parallel
# (which can become a bottleneck on bigger clusters) so we use
# a multiprocessing pool to speed this up.
# How many worker processes are created for checking celery task state.
self._sync_parallelism = conf.getint('celery', 'SYNC_PARALLELISM')
if self._sync_parallelism == 0:
self._sync_parallelism = max(1, cpu_count() - 1)
self._sync_pool = None
#正在运行的任务
self.tasks = {}
#最近的状态
self.last_state = {}
def start(self) -> None:
self.log.debug(
'Starting Celery Executor using %s processes for syncing',
self._sync_parallelism
)
def _num_tasks_per_send_process(self, to_send_count: int) -> int:
"""
每个进程多少个任务
任务数量 / 并行度
How many Celery tasks should each worker process send.
:return: Number of tasks that should be sent per process
:rtype: int
"""
return max(1,
int(math.ceil(1.0 * to_send_count / self._sync_parallelism)))
def _num_tasks_per_fetch_process(self) -> int:
"""
How many Celery tasks should be sent to each worker process.
一次发送多少任务
:return: Number of tasks that should be used per process
:rtype: int
"""
return max(1, int(math.ceil(1.0 * len(self.tasks) / self._sync_parallelism)))
def trigger_tasks(self, open_slots: int) -> None:
"""
触发任务
Overwrite trigger_tasks function from BaseExecutor
:param open_slots: Number of open slots
:return:
"""
sorted_queue = self.order_queued_tasks_by_priority()
task_tuples_to_send: List[TaskInstanceInCelery] = []
for _ in range(min((open_slots, len(self.queued_tasks)))):
key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
task_tuples_to_send.append((key, simple_ti, command, queue, execute_command))
cached_celery_backend = None
if task_tuples_to_send:
tasks = [t[4] for t in task_tuples_to_send]
# Celery state queries will stuck if we do not use one same backend for all tasks.
cached_celery_backend = tasks[0].backend
if task_tuples_to_send:
# Use chunks instead of a work queue to reduce context switching
# since tasks are roughly uniform in size
chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))
#进程数量
num_processes = min(len(task_tuples_to_send), self._sync_parallelism)
#进程池
send_pool = Pool(processes=num_processes)
key_and_async_results = send_pool.map(
send_task_to_executor,
task_tuples_to_send,
chunksize=chunksize)
send_pool.close()
send_pool.join()
self.log.debug('Sent all tasks.')
for key, command, result in key_and_async_results:
if isinstance(result, ExceptionWithTraceback):
self.log.error(
CELERY_SEND_ERR_MSG_HEADER + ":%s\n%s\n", result.exception, result.traceback
)
elif result is not None:
# Only pops when enqueued successfully, otherwise keep it
# and expect scheduler loop to deal with it.
self.queued_tasks.pop(key)
result.backend = cached_celery_backend
self.running.add(key)
self.tasks[key] = result
self.last_state[key] = celery_states.PENDING
def sync(self) -> None:
"""同步结果"""
num_processes = min(len(self.tasks), self._sync_parallelism)
if num_processes == 0:
self.log.debug("No task to query celery, skipping sync")
return
self.log.debug("Inquiring about %s celery task(s) using %s processes",
len(self.tasks), num_processes)
# Recreate the process pool each sync in case processes in the pool die
self._sync_pool = Pool(processes=num_processes)
# Use chunks instead of a work queue to reduce context switching since tasks are
# roughly uniform in size
chunksize = self._num_tasks_per_fetch_process()
self.log.debug("Waiting for inquiries to complete...")
#获取任务的结果
task_keys_to_states = self._sync_pool.map(
fetch_celery_task_state,
self.tasks.items(),
chunksize=chunksize)
self._sync_pool.close()
self._sync_pool.join()
self.log.debug("Inquiries completed.")
self.update_task_states(task_keys_to_states)
def update_task_states(self,
task_keys_to_states: List[Union[TaskInstanceStateType,
ExceptionWithTraceback]]) -> None:
"""
更新所有任务状态
Updates states of the tasks."""
for key_and_state in task_keys_to_states:
if isinstance(key_and_state, ExceptionWithTraceback):
self.log.error(
CELERY_FETCH_ERR_MSG_HEADER + ", ignoring it:%s\n%s\n",
repr(key_and_state.exception), key_and_state.traceback
)
continue
key, state = key_and_state
self.update_task_state(key, state)
def update_task_state(self, key: TaskInstanceKeyType, state: str) -> None:
"""
更新任务状态
Updates state of a single task."""
# noinspection PyBroadException
try:
#只有状态发生变化才处理
if self.last_state[key] != state:
if state == celery_states.SUCCESS:
self.success(key)
del self.tasks[key]
del self.last_state[key]
elif state == celery_states.FAILURE:
self.fail(key)
del self.tasks[key]
del self.last_state[key]
elif state == celery_states.REVOKED:
self.fail(key)
del self.tasks[key]
del self.last_state[key]
else:
self.log.info("Unexpected state: %s", state)
self.last_state[key] = state
except Exception: # pylint: disable=broad-except
self.log.exception("Error syncing the Celery executor, ignoring it.")
def end(self, synchronous: bool = False) -> None:
if synchronous:
while any([task.state not in celery_states.READY_STATES for task in self.tasks.values()]):
time.sleep(5)
self.sync()
def execute_async(self,
key: TaskInstanceKeyType,
command: CommandType,
queue: Optional[str] = None,
executor_config: Optional[Any] = None):
"""
不允许异步运行
Do not allow async execution for Celery executor."""
raise AirflowException("No Async execution for Celery executor.")
def terminate(self):
pass