Consumer创建
Consumer与Worker类似,都需要使用Blueprint进行创建,我们来简单浏览一下Consumer的init函数
class Consumer(object):
"""Consumer blueprint."""
def __init__(self, on_task_request,
init_callback=noop, hostname=None,
pool=None, app=None,
timer=None, controller=None, hub=None, amqheartbeat=None,
worker_options=None, disable_rate_limits=False,
initial_prefetch_count=2, prefetch_multiplier=1, **kwargs):
self.app = app
self.controller = controller
self.init_callback = init_callback
self.hostname = hostname or gethostname()
self.pid = os.getpid()
self.pool = pool
self.timer = timer
self.strategies = self.Strategies()
self.conninfo = self.app.connection_for_read()
self.connection_errors = self.conninfo.connection_errors
self.channel_errors = self.conninfo.channel_errors
self._restart_state = restart_state(maxR=5, maxT=1)
self._does_info = logger.isEnabledFor(logging.INFO)
self._limit_order = 0
self.on_task_request = on_task_request
self.on_task_message = set()
self.amqheartbeat_rate = self.app.conf.broker_heartbeat_checkrate
self.disable_rate_limits = disable_rate_limits
self.initial_prefetch_count = initial_prefetch_count
self.prefetch_multiplier = prefetch_multiplier
# this contains a tokenbucket for each task type by name, used for
# rate limits, or None if rate limits are disabled for that task.
self.task_buckets = defaultdict(lambda: None)
self.reset_rate_limits()
self.hub = hub
if self.hub or getattr(self.pool, 'is_green', False):
self.amqheartbeat = amqheartbeat
if self.amqheartbeat is None:
self.amqheartbeat = self.app.conf.broker_heartbeat
else:
self.amqheartbeat = 0
if not hasattr(self, 'loop'):
self.loop = loops.asynloop if hub else loops.synloop
if _detect_environment() == 'gevent':
# there's a gevent bug that causes timeouts to not be reset,
# so if the connection timeout is exceeded once, it can NEVER
# connect again.
self.app.conf.broker_connection_timeout = None
self._pending_operations = []
self.steps = [] # 进行一些初始化工作
self.blueprint = self.Blueprint(
steps=self.app.steps['consumer'],
on_close=self.on_close,
) # 创建blueprint
self.blueprint.apply(self, **dict(worker_options or {}, **kwargs)) # 使用blueprint进行初始化
这个函数主要进行了三项工作:
- 初始化属性
- 创建blueprint
- 使用blueprint进行初始化
接下来我们把目光转向Consumer类创建时Blueprint需要执行的步骤
class Blueprint(bootsteps.Blueprint):
"""Consumer blueprint."""
name = 'Consumer'
default_steps = [
'celery.worker.consumer.connection:Connection',
'celery.worker.consumer.mingle:Mingle',
'celery.worker.consumer.events:Events',
'celery.worker.consumer.gossip:Gossip',
'celery.worker.consumer.heart:Heart',
'celery.worker.consumer.control:Control',
'celery.worker.consumer.tasks:Tasks',
'celery.worker.consumer.consumer:Evloop',
'celery.worker.consumer.agent:Agent',
]
可以看到,形式上与Worker启动时的Blueprint类似。我们首先梳理一下各个步骤的依赖关系
与Worker一样,Consumer也会执行Blueprint实例的apply函数,初始化各个Step,创建顺序为
Connection
Events
Heart
Mingle
Tasks
Control
Gossip
Agent
event loop
通过Blueprint创建完各个组件后,Consumer的创建流程完成,开始下一步的启动操作。
组件创建流程
与之前Worker类一样,我们来分析Consumer类创建的过程中,各个组件的创建流程