在 OpenStack 中,启动 RPC server 一般都是调用 oslo.messaging 库的 get_rpc_server 方法。
def get_server(target, endpoints, serializer=None):
    """Build an RPC server for ``target`` that uses the eventlet executor.

    :param target: the target the server listens on. It is used exactly as
        passed in; a caller would typically construct it beforehand, e.g.
        ``oslo_messaging.Target(topic=<topic>, server=cfg.CONF.host)``.
    :param endpoints: endpoint objects handed to ``get_rpc_server``.
    :param serializer: optional base serializer; it is always wrapped in a
        ``RequestContextSerializer`` before being passed on.
    :returns: the object returned by ``messaging.get_rpc_server``.
    """
    # Use the caller's target as-is. Rebuilding it here would discard the
    # argument and would need a ``topic`` value that is not among this
    # function's parameters (referencing it raised NameError).
    serializer = RequestContextSerializer(serializer)
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer)
其中 target 是通过 oslo_messaging.Target 类构造的对象,里面定义了 topic、server、exchange 等信息。创建 RPC Server 的 target 时,topic 和 server 是必选的,exchange 是可选的。
Target类的文件路径oslo_messaging.target.Target
class Target(object):
    """Container for the attributes that identify a messaging target.

    :param exchange: exchange name, or None.
    :param topic: topic name, or None.
    :param namespace: namespace, or None.
    :param version: version, or None.
    :param server: server name, or None.
    :param fanout: fanout flag, or None.
    :param legacy_namespaces: optional list of extra namespaces appended to
        ``accepted_namespaces`` after ``namespace``.
    """

    # Attributes copied by __call__ and shown by __repr__, in display order.
    _FIELDS = ('exchange', 'topic', 'namespace',
               'version', 'server', 'fanout')

    def __init__(self, exchange=None, topic=None, namespace=None,
                 version=None, server=None, fanout=None,
                 legacy_namespaces=None):
        self.exchange = exchange
        self.topic = topic
        self.namespace = namespace
        self.version = version
        self.server = server
        self.fanout = fanout
        self.accepted_namespaces = [namespace] + (legacy_namespaces or [])

    def __call__(self, **kwargs):
        """Return a new Target with ``kwargs`` overriding this one's fields.

        Fields not named in ``kwargs`` are copied from this instance.
        ``legacy_namespaces`` is not carried over unless passed explicitly.
        """
        for name in self._FIELDS:
            kwargs.setdefault(name, getattr(self, name))
        return Target(**kwargs)

    def __eq__(self, other):
        """Compare by instance attributes.

        Objects without an instance ``__dict__`` (``None``, numbers, ...)
        cannot be inspected with ``vars()``; return NotImplemented for them
        so the comparison evaluates to unequal instead of raising TypeError.
        """
        try:
            other_attrs = vars(other)
        except TypeError:
            return NotImplemented
        return vars(self) == other_attrs

    def __ne__(self, other):
        return not self == other

    def __repr__(self):
        """Return ``<Target k=v, ...>`` listing only the truthy fields."""
        pairs = [(name, getattr(self, name)) for name in self._FIELDS]
        values = ', '.join('%s=%s' % pair for pair in pairs if pair[1])
        return '<Target ' + values + '>'

    def __hash__(self):
        # Identity-based hash: it does not depend on the (assignable)
        # attributes, so two targets that compare equal may hash differently.
        return id(self)
当 rpc_server 调用 start() 方法时,rpc server 就开始启动监听了。RPCServer 类没有实现 start 方法,需要到父类 MessageHandlingServer 中查看 start() 的实现。
文件路径oslo_messaging.rpc.server.RPCServer
class RPCServer(msg_server.MessageHandlingServer):
    """Message handling server bound to a single target.

    Each incoming message is acknowledged, passed to the dispatcher and then
    answered with either the dispatch result or the captured failure.
    """

    def __init__(self, transport, target, dispatcher, executor='blocking'):
        """Initialise the base server and remember ``target``.

        A warning is logged when ``transport`` is not an instance of
        ``msg_transport.RPCTransport``.
        """
        super(RPCServer, self).__init__(transport, dispatcher, executor)
        is_rpc_transport = isinstance(transport, msg_transport.RPCTransport)
        if not is_rpc_transport:
            LOG.warning("Using notification transport for RPC. Please use "
                        "get_rpc_transport to obtain an RPC transport "
                        "instance.")
        self._target = target

    def _create_listener(self):
        """Ask the transport for a listener on the stored target."""
        batch_size = 1
        batch_timeout = None
        return self.transport._listen(self._target, batch_size, batch_timeout)

    def _process_incoming(self, incoming):
        """Acknowledge, dispatch and reply to the first incoming message.

        :param incoming: sequence of incoming messages; only
            ``incoming[0]`` is handled here.
        """
        request = incoming[0]

        # TODO(sileht): We should remove that at some point and do
        # this directly in the driver
        try:
            request.acknowledge()
        except Exception:
            LOG.exception("Can not acknowledge message. Skip processing")
            return

        exc_info = None
        try:
            result = self.dispatcher.dispatch(request)
        except rpc_dispatcher.ExpectedException as expected:
            exc_info = expected.exc_info
            LOG.debug(u'Expected exception during message handling (%s)',
                      expected)
        except Exception:
            # Capture sys.exc_info() before logging: a log handler raising
            # inside LOG.exception() could otherwise overwrite it.
            exc_info = sys.exc_info()
            LOG.exception('Exception during message handling')

        try:
            if exc_info is not None:
                request.reply(failure=exc_info)
            else:
                request.reply(result)
        except exceptions.MessageUndeliverable as undeliverable:
            LOG.exception(
                "MessageUndeliverable error, "
                "source exception: %s, routing_key: %s, exchange: %s: ",
                undeliverable.exception,
                undeliverable.routing_key,
                undeliverable.exchange
            )
        except Exception:
            LOG.exception("Can not send reply for message")
        finally:
            # NOTE(dhellmann): Drop the reference so the current stack frame
            # and the traceback held in exc_info do not form a cycle.
            del exc_info
文件路径:oslo_messaging.server.MessageHandlingServer.start。这是父类的 start 实现,其中的关键代码是 self.listener = self._create_listener(),用于创建 listener。
@ordered(reset_after='stop')
def start(self, override_pool_size=None):
    """Create the work executor and listener, then start listening.

    Calling this on an already started server only logs a warning.

    :param override_pool_size: when truthy, used as ``max_workers`` in place
        of ``self.conf.executor_thread_pool_size``; only consulted for the
        "threading" and "eventlet" executor types.
    :raises ServerListenError: when creating the listener raises
        ``driver_base.TransportDriverError``.
    """
    if self._started:
        LOG.warning('The server has already been started. Ignoring '
                    'the redundant call to start().')
        return
    self._started = True

    # Only the pooled executor types take a worker count.
    if self.executor_type in ("threading", "eventlet"):
        pool_size = override_pool_size or self.conf.executor_thread_pool_size
        executor_opts = {"max_workers": pool_size}
    else:
        executor_opts = {}
    self._work_executor = self._executor_cls(**executor_opts)

    try:
        self.listener = self._create_listener()
    except driver_base.TransportDriverError as ex:
        raise ServerListenError(self.target, ex)

    # Incoming messages are handed to self._on_incoming.
    self.listener.start(self._on_incoming)
_create_listener 函数的文件路径:oslo_messaging.rpc.server.RPCServer._create_listener(即上文 RPCServer 类中已出现的方法)
def _create_listener(self):
return self.transport._listen(self._target, 1, None)
调用文件路径oslo_messaging.transport.Transport._listen的方法,这边如果rpc server未初始化target.topic和target.server,函数会抛异常。
def _listen(self, target, batch_size, batch_timeout):
if not (target.topic and target.server):
raise exceptions.InvalidTarget('A server\'s target must have '
'topic and server names specified',
target)
return self._driver.listen(target, batch_size,
batch_timeout)
目前openstack MQ driver是amqp,调用文件位置oslo_messaging._drivers.amqpdriver.AMQPDriverBase.listen ,里面会创建3个消费者出来,两个topic类型的consumer,一个fanout的类型consumer
def listen(self, target, batch_size, batch_timeout):
    """Declare the consumers for ``target`` and return a listener adapter.

    Three consumers sharing one ``RpcAMQPListener`` are declared on a
    listen-purpose connection: a topic consumer for ``target.topic``, a
    topic consumer for ``'<topic>.<server>'`` and a fanout consumer for
    ``target.topic``.

    :returns: ``base.PollStyleListenerAdapter`` wrapping the listener with
        the given ``batch_size`` and ``batch_timeout``.
    """
    connection = self._get_connection(rpc_common.PURPOSE_LISTEN)
    rpc_listener = RpcAMQPListener(self, connection)

    server_topic = '%s.%s' % (target.topic, target.server)
    for routing_topic in (target.topic, server_topic):
        connection.declare_topic_consumer(
            exchange_name=self._get_exchange(target),
            topic=routing_topic,
            callback=rpc_listener)

    connection.declare_fanout_consumer(target.topic, rpc_listener)

    return base.PollStyleListenerAdapter(rpc_listener, batch_size,
                                         batch_timeout)
文件路径:oslo_messaging._drivers.impl_rabbit.Connection.declare_topic_consumer
topic 类型的 consumer 未显式传入 queue_name 时,queue_name 默认等于 topic;因此上面两个 topic consumer 的队列名分别是 topic 和 topic.server
def declare_topic_consumer(self, exchange_name, topic, callback=None,
                           queue_name=None):
    """Build a 'topic' type Consumer and declare it on this connection.

    :param exchange_name: exchange the consumer is created with.
    :param topic: used as the routing key, and as the queue name when
        ``queue_name`` is falsy.
    :param callback: callback passed to the Consumer.
    :param queue_name: optional explicit queue name.
    """
    consumer_options = {
        'exchange_name': exchange_name,
        'queue_name': queue_name or topic,
        'routing_key': topic,
        'type': 'topic',
        'durable': self.amqp_durable_queues,
        'exchange_auto_delete': self.amqp_auto_delete,
        'queue_auto_delete': self.amqp_auto_delete,
        'callback': callback,
        'rabbit_ha_queues': self.rabbit_ha_queues,
    }
    self.declare_consumer(Consumer(**consumer_options))
fanout类型的queue_name:'%s_fanout_%s' % (topic, unique)
def declare_fanout_consumer(self, topic, callback):
    """Build a 'fanout' type Consumer and declare it on this connection.

    The exchange is named ``'<topic>_fanout'`` and the queue gets a fresh
    random suffix on every call: ``'<topic>_fanout_<uuid4 hex>'``.

    :param topic: base name for the exchange and queue; also the routing key.
    :param callback: callback passed to the Consumer.
    """
    suffix = uuid.uuid4().hex
    consumer_options = {
        'exchange_name': '%s_fanout' % topic,
        'queue_name': '%s_fanout_%s' % (topic, suffix),
        'routing_key': topic,
        'type': 'fanout',
        'durable': False,
        'exchange_auto_delete': True,
        'queue_auto_delete': False,
        'callback': callback,
        'rabbit_ha_queues': self.rabbit_ha_queues,
        'rabbit_queue_ttl': self.rabbit_transient_queues_ttl,
    }
    self.declare_consumer(Consumer(**consumer_options))
因此,我们在 rpc client 端处理消息的时候,有三种 queue_name 可供使用(topic、topic.server 以及 topic_fanout_<uuid>),根据需要选择合适的类型和 queue_name 即可。