openstack组件oslo.message之RPCServer实现细节

在openstack 启动rpc server一般都是调用oslo.message库的get_rpc_server方法。

def get_server(target, endpoints, serializer=None):
    target = oslo_messaging.Target(server=cfg.CONF.host, topic=topic)
    serializer = RequestContextSerializer(serializer)
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer)

其中target是调用oslo_messaging.Target方法,里面定义了一些topic,server,exchange等信息,当创建RPC Server的target时候,topic 和 server是必选的,exchange是可选的。
Target类的文件路径oslo_messaging.target.Target

class Target(object):
"""
    初始化Target
"""
    def __init__(self, exchange=None, topic=None, namespace=None,
                 version=None, server=None, fanout=None,
                 legacy_namespaces=None):
        self.exchange = exchange
        self.topic = topic
        self.namespace = namespace
        self.version = version
        self.server = server
        self.fanout = fanout
        self.accepted_namespaces = [namespace] + (legacy_namespaces or [])

    def __call__(self, **kwargs):
        for a in ('exchange', 'topic', 'namespace',
                  'version', 'server', 'fanout'):
            kwargs.setdefault(a, getattr(self, a))
        return Target(**kwargs)

    def __eq__(self, other):
        return vars(self) == vars(other)

    def __ne__(self, other):
        return not self == other

    def __repr__(self):
        attrs = []
        for a in ['exchange', 'topic', 'namespace',
                  'version', 'server', 'fanout']:
            v = getattr(self, a)
            if v:
                attrs.append((a, v))
        values = ', '.join(['%s=%s' % i for i in attrs])
        return '<Target ' + values + '>'

    def __hash__(self):
        return id(self)

当rpc_server调用start()方法的时候,就rpc server就开始启动监听了。RPCServer类未实现start方法,到父类MessageHandlingServer看start()实现
文件路径oslo_messaging.rpc.server.RPCServer

class RPCServer(msg_server.MessageHandlingServer):
    def __init__(self, transport, target, dispatcher, executor='blocking'):
        super(RPCServer, self).__init__(transport, dispatcher, executor)
        if not isinstance(transport, msg_transport.RPCTransport):
            LOG.warning("Using notification transport for RPC. Please use "
                        "get_rpc_transport to obtain an RPC transport "
                        "instance.")
        self._target = target

    def _create_listener(self):
        return self.transport._listen(self._target, 1, None)

    def _process_incoming(self, incoming):
        message = incoming[0]

        # TODO(sileht): We should remove that at some point and do
        # this directly in the driver
        try:
            message.acknowledge()
        except Exception:
            LOG.exception("Can not acknowledge message. Skip processing")
            return

        failure = None
        try:
            res = self.dispatcher.dispatch(message)
        except rpc_dispatcher.ExpectedException as e:
            failure = e.exc_info
            LOG.debug(u'Expected exception during message handling (%s)', e)
        except Exception:
            # current sys.exc_info() content can be overridden
            # by another exception raised by a log handler during
            # LOG.exception(). So keep a copy and delete it later.
            failure = sys.exc_info()
            LOG.exception('Exception during message handling')

        try:
            if failure is None:
                message.reply(res)
            else:
                message.reply(failure=failure)
        except exceptions.MessageUndeliverable as e:
            LOG.exception(
                "MessageUndeliverable error, "
                "source exception: %s, routing_key: %s, exchange: %s: ",
                e.exception, e.routing_key, e.exchange
            )
        except Exception:
            LOG.exception("Can not send reply for message")
        finally:
            # NOTE(dhellmann): Remove circular object reference
            # between the current stack frame and the traceback in
            # exc_info.
            del failure

文件路径oslo_messaging.server.MessageHandlingServer.start。父类start实现,关键代码self.listener = self._create_listener()方法,创建listener

 @ordered(reset_after='stop')
    def start(self, override_pool_size=None):
        """
        MessageHandlingServer.start实现
        """
        if self._started:
            LOG.warning('The server has already been started. Ignoring '
                        'the redundant call to start().')
            return

        self._started = True

        executor_opts = {}

        if self.executor_type in ("threading", "eventlet"):
            executor_opts["max_workers"] = (
                override_pool_size or self.conf.executor_thread_pool_size
            )
        self._work_executor = self._executor_cls(**executor_opts)

        try:
            self.listener = self._create_listener()
        except driver_base.TransportDriverError as ex:
            raise ServerListenError(self.target, ex)

        self.listener.start(self._on_incoming)

_create_listener函数文件路径oslo_messaging.rpc.server.RPCServer._create_listener

    def _create_listener(self):
        return self.transport._listen(self._target, 1, None)

调用文件路径oslo_messaging.transport.Transport._listen的方法,这边如果rpc server未初始化target.topic和target.server,函数会抛异常。

    def _listen(self, target, batch_size, batch_timeout):
        if not (target.topic and target.server):
            raise exceptions.InvalidTarget('A server\'s target must have '
                                           'topic and server names specified',
                                           target)
        return self._driver.listen(target, batch_size,
                                   batch_timeout)

目前openstack MQ driver是amqp,调用文件位置oslo_messaging._drivers.amqpdriver.AMQPDriverBase.listen ,里面会创建3个消费者出来,两个topic类型的consumer,一个fanout的类型consumer

    def listen(self, target, batch_size, batch_timeout):
        conn = self._get_connection(rpc_common.PURPOSE_LISTEN)

        listener = RpcAMQPListener(self, conn)

        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic='%s.%s' % (target.topic,
                                                     target.server),
                                    callback=listener)
        conn.declare_fanout_consumer(target.topic, listener)

        return base.PollStyleListenerAdapter(listener, batch_size,
                                             batch_timeout)

文件路径:oslo_messaging._drivers.impl_rabbit.Connection.declare_topic_consumer
topic类型的queue_name为topic和topic.server组合

    def declare_topic_consumer(self, exchange_name, topic, callback=None,
                               queue_name=None):
        """Create a 'topic' consumer."""
        consumer = Consumer(exchange_name=exchange_name,
                            queue_name=queue_name or topic,
                            routing_key=topic,
                            type='topic',
                            durable=self.amqp_durable_queues,
                            exchange_auto_delete=self.amqp_auto_delete,
                            queue_auto_delete=self.amqp_auto_delete,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues)

        self.declare_consumer(consumer)

fanout类型的queue_name:'%s_fanout_%s' % (topic, unique)

    def declare_fanout_consumer(self, topic, callback):
        """Create a 'fanout' consumer."""

        unique = uuid.uuid4().hex
        exchange_name = '%s_fanout' % topic
        queue_name = '%s_fanout_%s' % (topic, unique)

        consumer = Consumer(exchange_name=exchange_name,
                            queue_name=queue_name,
                            routing_key=topic,
                            type='fanout',
                            durable=False,
                            exchange_auto_delete=True,
                            queue_auto_delete=False,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues,
                            rabbit_queue_ttl=self.rabbit_transient_queues_ttl)

        self.declare_consumer(consumer)

我们在rpc client 处理消息的时候,可以有三种queue_name可以让我们处理,我们选择合适的类型和queue_name即可

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352

推荐阅读更多精彩内容