cinder-volume是如何实现rpc消息到对应方法的

rpc消息到manage method的过程是oslo.messaging实现的
OpenStack的oslo.messaging wiki十分详细的解释了这个流程,也没有比这个来得更加正式和官方
https://wiki.openstack.org/wiki/Oslo/Messaging#Server_Side_API
Openstack中其他组件的rpc server实现方式基本相同

cinder-volume的初始化代码在cinder/service.py Service.start

    def start(self):
...
        target = messaging.Target(topic=self.topic, server=self.host)
        endpoints = [self.manager]
        self.rpcserver = rpc.get_server(target, endpoints)
        self.rpcserver.start()
...

get_server调用到oslo_messaging/rpc/server.py get_rpc_server

def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None):
    """Construct an RPC server.

    The executor parameter controls how incoming messages will be received and
    dispatched. By default, the most simple executor is used - the blocking
    executor.

    If the eventlet executor is used, the threading and time library need to be
    monkeypatched.

    :param transport: the messaging transport
    :type transport: Transport
    :param target: the exchange, topic and server to listen on
    :type target: Target
    :param endpoints: a list of endpoint objects
    :type endpoints: list
    :param executor: name of a message executor - for example
                     'eventlet', 'blocking'
    :type executor: str
    :param serializer: an optional entity serializer
    :type serializer: Serializer
    """
    dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
    return msg_server.MessageHandlingServer(transport, dispatcher, executor)

MessageHandlingServe类如下:

class MessageHandlingServer(object):
    """Server for handling messages.

    Connect a transport to a dispatcher that knows how to process the
    message using an executor that knows how the app wants to create
    new tasks.
    """

    def __init__(self, transport, dispatcher, executor='blocking'):
        """Construct a message handling server.

        The dispatcher parameter is a callable which is invoked with context
        and message dictionaries each time a message is received.

        The executor parameter controls how incoming messages will be received
        and dispatched. By default, the most simple executor is used - the
        blocking executor.

        :param transport: the messaging transport
        :type transport: Transport
        :param dispatcher: a callable which is invoked for each method
        :type dispatcher: callable
        :param executor: name of message executor - for example
                         'eventlet', 'blocking'
        :type executor: str
        """
        self.conf = transport.conf

        self.transport = transport
        self.dispatcher = dispatcher
        self.executor = executor

        try:
            mgr = driver.DriverManager('oslo.messaging.executors',
                                       self.executor)
        except RuntimeError as ex:
            raise ExecutorLoadFailure(self.executor, ex)
        else:
            self._executor_cls = mgr.driver
            self._executor = None

        super(MessageHandlingServer, self).__init__()

    def start(self):
        """Start handling incoming messages.

        This method causes the server to begin polling the transport for
        incoming messages and passing them to the dispatcher. Message
        processing will continue until the stop() method is called.

        The executor controls how the server integrates with the applications
        I/O handling strategy - it may choose to poll for messages in a new
        process, thread or co-operatively scheduled coroutine or simply by
        registering a callback with an event loop. Similarly, the executor may
        choose to dispatch messages in a new thread, coroutine or simply the
        current thread.
        """
        if self._executor is not None:
            return
        try:
            listener = self.dispatcher._listen(self.transport)
        except driver_base.TransportDriverError as ex:
            raise ServerListenError(self.target, ex)

        self._executor = self._executor_cls(self.conf, listener,
                                            self.dispatcher)
        self._executor.start()

cinder-volume使用eventlet类型的executor,所以消息分发在oslo.messaging/oslo_messaging/_executors/impl_eventlet.py中实现,入口函数为start()
每个消息,cinder-volume就会孵化一个协程去执行dispatcher方法,如下代码

    def _dispatch(self, incoming):
        spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool)

    def start(self):
        if self._thread is not None:
            return

        @excutils.forever_retry_uncaught_exceptions
        def _executor_thread():
            try:
                while self._running:
                    incoming = self.listener.poll()
                    if incoming is not None:
                        self._dispatch(incoming)
            except greenlet.GreenletExit:
                return

        self._running = True
        self._thread = eventlet.spawn(_executor_thread)

dispatcher对应oslo.messaging.rpc.RPCDispatcher,其中的\_\_call\_\_方法先回复rabbitmq的ack消息,然后再解析消息参数,获得指定的方法名,最后调用方法

@contextlib.contextmanager
    def __call__(self, incoming, executor_callback=None):
        incoming.acknowledge()
        yield lambda: self._dispatch_and_reply(incoming, executor_callback)

    def _dispatch_and_reply(self, incoming, executor_callback):
        try:
            incoming.reply(self._dispatch(incoming.ctxt,
                                          incoming.message,
                                          executor_callback))
        except ExpectedException as e:
            LOG.debug(u'Expected exception during message handling (%s)',
                      e.exc_info[1])
            incoming.reply(failure=e.exc_info, log_failure=False)
        except Exception as e:
            # sys.exc_info() is deleted by LOG.exception().
            exc_info = sys.exc_info()
            LOG.error(_('Exception during message handling: %s'), e,
                      exc_info=exc_info)
            incoming.reply(failure=exc_info)
            # NOTE(dhellmann): Remove circular object reference
            # between the current stack frame and the traceback in
            # exc_info.
            del exc_info

    def _dispatch(self, ctxt, message, executor_callback=None):
        """Dispatch an RPC message to the appropriate endpoint method.

        :param ctxt: the request context
        :type ctxt: dict
        :param message: the message payload
        :type message: dict
        :raises: NoSuchMethod, UnsupportedVersion
        """
        method = message.get('method')
        args = message.get('args', {})
        namespace = message.get('namespace')
        version = message.get('version', '1.0')

        found_compatible = False
        for endpoint in self.endpoints:
            target = getattr(endpoint, 'target', None)
            if not target:
                target = self._default_target

            if not (self._is_namespace(target, namespace) and
                    self._is_compatible(target, version)):
                continue

            if hasattr(endpoint, method):
                localcontext.set_local_context(ctxt)
                try:
                    return self._do_dispatch(endpoint, method, ctxt, args,
                                             executor_callback)
                finally:
                    localcontext.clear_local_context()

            found_compatible = True

        if found_compatible:
            raise NoSuchMethod(method)
        else:
            raise UnsupportedVersion(version, method=method)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容