openstack组件oslo.message之RPCServer实现细节

在openstack 启动rpc server一般都是调用oslo.message库的get_rpc_server方法。

def get_server(target, endpoints, serializer=None):
    target = oslo_messaging.Target(server=cfg.CONF.host, topic=topic)
    serializer = RequestContextSerializer(serializer)
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer)

其中target是调用oslo_messaging.Target方法,里面定义了一些topic,server,exchange等信息,当创建RPC Server的target时候,topic 和 server是必选的,exchange是可选的。
Target类的文件路径oslo_messaging.target.Target

class Target(object):
"""
    初始化Target
"""
    def __init__(self, exchange=None, topic=None, namespace=None,
                 version=None, server=None, fanout=None,
                 legacy_namespaces=None):
        self.exchange = exchange
        self.topic = topic
        self.namespace = namespace
        self.version = version
        self.server = server
        self.fanout = fanout
        self.accepted_namespaces = [namespace] + (legacy_namespaces or [])

    def __call__(self, **kwargs):
        for a in ('exchange', 'topic', 'namespace',
                  'version', 'server', 'fanout'):
            kwargs.setdefault(a, getattr(self, a))
        return Target(**kwargs)

    def __eq__(self, other):
        return vars(self) == vars(other)

    def __ne__(self, other):
        return not self == other

    def __repr__(self):
        attrs = []
        for a in ['exchange', 'topic', 'namespace',
                  'version', 'server', 'fanout']:
            v = getattr(self, a)
            if v:
                attrs.append((a, v))
        values = ', '.join(['%s=%s' % i for i in attrs])
        return '<Target ' + values + '>'

    def __hash__(self):
        return id(self)

当rpc_server调用start()方法的时候,就rpc server就开始启动监听了。RPCServer类未实现start方法,到父类MessageHandlingServer看start()实现
文件路径oslo_messaging.rpc.server.RPCServer

class RPCServer(msg_server.MessageHandlingServer):
    def __init__(self, transport, target, dispatcher, executor='blocking'):
        super(RPCServer, self).__init__(transport, dispatcher, executor)
        if not isinstance(transport, msg_transport.RPCTransport):
            LOG.warning("Using notification transport for RPC. Please use "
                        "get_rpc_transport to obtain an RPC transport "
                        "instance.")
        self._target = target

    def _create_listener(self):
        return self.transport._listen(self._target, 1, None)

    def _process_incoming(self, incoming):
        message = incoming[0]

        # TODO(sileht): We should remove that at some point and do
        # this directly in the driver
        try:
            message.acknowledge()
        except Exception:
            LOG.exception("Can not acknowledge message. Skip processing")
            return

        failure = None
        try:
            res = self.dispatcher.dispatch(message)
        except rpc_dispatcher.ExpectedException as e:
            failure = e.exc_info
            LOG.debug(u'Expected exception during message handling (%s)', e)
        except Exception:
            # current sys.exc_info() content can be overridden
            # by another exception raised by a log handler during
            # LOG.exception(). So keep a copy and delete it later.
            failure = sys.exc_info()
            LOG.exception('Exception during message handling')

        try:
            if failure is None:
                message.reply(res)
            else:
                message.reply(failure=failure)
        except exceptions.MessageUndeliverable as e:
            LOG.exception(
                "MessageUndeliverable error, "
                "source exception: %s, routing_key: %s, exchange: %s: ",
                e.exception, e.routing_key, e.exchange
            )
        except Exception:
            LOG.exception("Can not send reply for message")
        finally:
            # NOTE(dhellmann): Remove circular object reference
            # between the current stack frame and the traceback in
            # exc_info.
            del failure

文件路径oslo_messaging.server.MessageHandlingServer.start。父类start实现,关键代码self.listener = self._create_listener()方法,创建listener

 @ordered(reset_after='stop')
    def start(self, override_pool_size=None):
        """
        MessageHandlingServer.start实现
        """
        if self._started:
            LOG.warning('The server has already been started. Ignoring '
                        'the redundant call to start().')
            return

        self._started = True

        executor_opts = {}

        if self.executor_type in ("threading", "eventlet"):
            executor_opts["max_workers"] = (
                override_pool_size or self.conf.executor_thread_pool_size
            )
        self._work_executor = self._executor_cls(**executor_opts)

        try:
            self.listener = self._create_listener()
        except driver_base.TransportDriverError as ex:
            raise ServerListenError(self.target, ex)

        self.listener.start(self._on_incoming)

_create_listener函数文件路径oslo_messaging.rpc.server.RPCServer._create_listener

    def _create_listener(self):
        return self.transport._listen(self._target, 1, None)

调用文件路径oslo_messaging.transport.Transport._listen的方法,这边如果rpc server未初始化target.topic和target.server,函数会抛异常。

    def _listen(self, target, batch_size, batch_timeout):
        if not (target.topic and target.server):
            raise exceptions.InvalidTarget('A server\'s target must have '
                                           'topic and server names specified',
                                           target)
        return self._driver.listen(target, batch_size,
                                   batch_timeout)

目前openstack MQ driver是amqp,调用文件位置oslo_messaging._drivers.amqpdriver.AMQPDriverBase.listen ,里面会创建3个消费者出来,两个topic类型的consumer,一个fanout的类型consumer

    def listen(self, target, batch_size, batch_timeout):
        conn = self._get_connection(rpc_common.PURPOSE_LISTEN)

        listener = RpcAMQPListener(self, conn)

        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic='%s.%s' % (target.topic,
                                                     target.server),
                                    callback=listener)
        conn.declare_fanout_consumer(target.topic, listener)

        return base.PollStyleListenerAdapter(listener, batch_size,
                                             batch_timeout)

文件路径:oslo_messaging._drivers.impl_rabbit.Connection.declare_topic_consumer
topic类型的queue_name为topic和topic.server组合

    def declare_topic_consumer(self, exchange_name, topic, callback=None,
                               queue_name=None):
        """Create a 'topic' consumer."""
        consumer = Consumer(exchange_name=exchange_name,
                            queue_name=queue_name or topic,
                            routing_key=topic,
                            type='topic',
                            durable=self.amqp_durable_queues,
                            exchange_auto_delete=self.amqp_auto_delete,
                            queue_auto_delete=self.amqp_auto_delete,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues)

        self.declare_consumer(consumer)

fanout类型的queue_name:'%s_fanout_%s' % (topic, unique)

    def declare_fanout_consumer(self, topic, callback):
        """Create a 'fanout' consumer."""

        unique = uuid.uuid4().hex
        exchange_name = '%s_fanout' % topic
        queue_name = '%s_fanout_%s' % (topic, unique)

        consumer = Consumer(exchange_name=exchange_name,
                            queue_name=queue_name,
                            routing_key=topic,
                            type='fanout',
                            durable=False,
                            exchange_auto_delete=True,
                            queue_auto_delete=False,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues,
                            rabbit_queue_ttl=self.rabbit_transient_queues_ttl)

        self.declare_consumer(consumer)

我们在rpc client 处理消息的时候,可以有三种queue_name可以让我们处理,我们选择合适的类型和queue_name即可

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容