GateApp启动流程---cdf::CMessageQueueBase

cde::CBigMessageQueue介绍

1679280193927.png

看继承关系:
继承于cdf::CMessageQueueBase和cdf::IWakeMessageLoopHandler两个类

cdf::CMessageQueueBase

先看cdf::CMessageQueueBase是啥:


image.png
        typedef cde_hash_map<SEntityId, IMessageHandlerPtr, entity_id_hash> MessageHandlerHandlerIdMap;
        //typedef std::map<SEntityId,IMessageHandlerPtr> MessageHandlerHandlerIdMap;
        typedef cde_hash_map<int,IMessageHandlerPtr> MessageHandlerCommandMap;
        typedef std::list<CMessageBlockPtr,CCdfAllocator<CMessageBlockPtr> > MessageBlockList;

MessageHandlerHandlerIdMap 是一个map 其中Key是SEntityId,value是 IMessageHandlerPtr
MessageHandlerCommandMap 是一个map 其中key是int value是IMessageHandlerPtr
MessageBlockList 是一个list 保存了CMessageBlockPtr

分别介绍这几个类:
SEntityId

image.png

继承于IMessageBase
image.png

可以看到 只是规定了每个消息需要实现的操作:”读写拷贝以及重载=符号
这个类有个友元类:CMessageBlock
image.png

这段代码定义了一个名为CMessageBlock的类,它继承了CRefShared类,因此CMessageBlock对象可以使用引用计数来管理内存,从而避免内存泄漏问题。

CMessageBlock类包含以下成员变量:
_messageHead:用于存储消息头信息的结构体。
_messageBase:指向实现了IMessageBase接口的对象的智能指针。
_os:指向CSerializeStream类对象的指针,用于序列化消息数据。
CMessageBlock类还包含了以下成员函数:
CMessageBlock():默认构造函数。
~CMessageBlock():虚析构函数。
initOS():初始化_os成员变量。
__write():将消息数据写入指定的序列化流。
__writeHead():将消息头信息写入指定的序列化流。
__writeBody():将消息体信息写入指定的序列化流。
__read():从指定的序列化流中读取消息数据。
__readHead():从指定的序列化流中读取消息头信息。
__readBody():从指定的序列化流中读取消息体信息。
operator=():赋值运算符重载函数,被声明为private,禁止对CMessageBlock对象进行赋值操作。
综上所述,CMessageBlock类是一个用于表示消息数据的类,它将消息头和消息体封装成一个对象,并且提供了序列化和反序列化消息数据的方法。

IMessageHandlerPtr

image.png

这段代码定义了一个名为IMessageHandler的抽象基类,它继承了CRefShared类,因此IMessageHandler对象可以使用引用计数来管理内存,从而避免内存泄漏问题。

IMessageHandler类包含以下成员变量:

_objectId:表示对象的唯一标识符。
IMessageHandler类还包含了以下成员函数:

IMessageHandler():默认构造函数。
~IMessageHandler():虚析构函数。
getId():获取对象的唯一标识符。
onMessage():处理收到的消息数据。
operator<():小于运算符重载函数,用于在容器中对IMessageHandler对象进行排序。
setId():设置对象的唯一标识符。
IMessageHandlerPtr是一个智能指针类型,它使用CHandle模板类来管理IMessageHandler对象的生命周期。可以使用IMessageHandlerPtr指针来操作IMessageHandler对象,而不必手动管理对象的内存。

综上所述,IMessageHandler类是一个用于处理消息的抽象基类,它定义了一个处理收到的消息数据的方法,并提供了对象唯一标识符的管理。IMessageHandlerPtr是一个智能指针类型,用于方便地管理IMessageHandler对象的生命周期。

CMessageBlockPtr:CMessageBlock的指针

class CMessageQueueBase
    {
    public:

        typedef cde_hash_map<SEntityId, IMessageHandlerPtr, entity_id_hash> MessageHandlerHandlerIdMap;
        //typedef std::map<SEntityId,IMessageHandlerPtr> MessageHandlerHandlerIdMap;
        typedef cde_hash_map<int,IMessageHandlerPtr> MessageHandlerCommandMap;
        typedef std::list<CMessageBlockPtr,CCdfAllocator<CMessageBlockPtr> > MessageBlockList;
        //typedef std::list<CMessageBlockPtr> MessageBlockList;

        CMessageQueueBase( IWakeMessageLoopHandler* messageLoopPtr );
        virtual ~CMessageQueueBase();

        /**
        * to regist handle to proccess message
        * if the MESSAGE_ID and common regist to throw command
        */
        int registeHandler(const SEntityId& id, const IMessageHandlerPtr& handler);

        /**
        * if the command not execute by id
        * the command to excecute to by command
        */
        int registeHandler( const int command , const IMessageHandlerPtr& handler );

        /**
        * remove handles
        */
        int removeHandler(const SEntityId& id);

        /**
        * remove handles
        */
        int removeHandler( const int command );

        /**
        * push message to the end
        */
        void pushMessage( const CMessageBlockPtr& messageBlock );

        /**
        * push message to the end
        */
        void pushMessage( int cmd , const IMessageBasePtr& messageBase );

        /**
        * push message to the end
        */
        void pushMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase);

        /**
        * push message to the end
        */
        void sendMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase);

        //to send message 
        void sendMessage( const CMessageBlockPtr& messageBlock );

        /**
        * push message to the front
        */
        void pushMessageFront( CMessageBlockPtr& messageBlock );

        /***
        * distribute message
        */
        void distributeMessage();

        /**
        * break on error
        */
        void setBreakOnError( bool breakOnError ){ _breakOnError = breakOnError; }
        bool isBreakOnError() const { return _breakOnError; }

    protected:
        CMessageQueueBase();

        CLightLock _idMessageHandlersLock;
        MessageHandlerHandlerIdMap _idMessageHandlers;

        CLightLock _commandMessageHandlersLock;
        MessageHandlerCommandMap _commandMessageHandlers;

        CLightLock _messageReadListLock;
        MessageBlockList* _messageReadList;
        
        CLightLock _messageWriteListLock;
        MessageBlockList* _messageWriteList;

        IWakeMessageLoopHandler* _messageLoopPtr;


        bool _breakOnError;

    };

这段代码定义了一个消息队列基类 CMessageQueueBase,用于存储和分发消息。以下是该类的主要成员函数和数据成员的解释:

成员函数:

registeHandler(const SEntityId& id, const IMessageHandlerPtr& handler):注册消息处理器,将给定的 IMessageHandlerPtr 实例与给定的 SEntityId 实例相关联。
registeHandler(const int command, const IMessageHandlerPtr& handler):注册消息处理器,将给定的 IMessageHandlerPtr 实例与给定的 command 相关联。
removeHandler(const SEntityId& id):从队列中移除与给定 SEntityId 相关联的消息处理器。
removeHandler(const int command):从队列中移除与给定 command 相关联的消息处理器。
pushMessage(const CMessageBlockPtr& messageBlock):将给定的 CMessageBlockPtr 实例添加到消息队列的末尾。
pushMessage(int cmd, const IMessageBasePtr& messageBase):构造一个消息块 CMessageBlock 实例,并将其添加到消息队列的末尾。
pushMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase):构造一个消息块 CMessageBlock 实例,并将其添加到消息队列的末尾。该消息将被分配给给定的 SEntityId。
sendMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase):将给定的消息块直接发送给给定的 SEntityId。
sendMessage(const CMessageBlockPtr& messageBlock):将给定的消息块直接发送给其 SEntityId。
pushMessageFront(CMessageBlockPtr& messageBlock):将给定的 CMessageBlockPtr 实例添加到消息队列的开头。
distributeMessage():分发当前在队列中的所有消息。
setBreakOnError(bool breakOnError):设置队列是否在出现错误时中断。
isBreakOnError() const:获取队列当前是否在出现错误时中断。
数据成员:

MessageHandlerHandlerIdMap _idMessageHandlers:用于存储与 SEntityId 相关联的 IMessageHandlerPtr 实例的映射。
MessageHandlerCommandMap _commandMessageHandlers:用于存储与 command 相关联的 IMessageHandlerPtr 实例的映射。
MessageBlockList* _messageReadList:指向消息队列中要读取的消息列表的指针。
MessageBlockList* _messageWriteList:指向消息队列中要写入的消息列表的指针。
IWakeMessageLoopHandler* _messageLoopPtr:指向与此消息队列相关联的消息循环的指针。
bool _breakOnError:表示队列是否在出现错误时中断。

成员方法
构造函数:

image.png

生成两个list _messageReadList和_messageWriteList

注册处理消息:


image.png

移除注册的消息:


image.png

发送消息:


image.png

cmd是消息id,toId是需要发送的玩家,messageBase是消息内容

void 
CMessageQueueBase::sendMessage( const CMessageBlockPtr& messageBlock )
{
    //LOG_TRACE( "CMessageQueue::pushMessage" , messageBlock._ptr );
    if( !messageBlock )
    {
        assert( false );
        return;
    }

    ///////// to player /////////
    SeqEntityId toIds( messageBlock->_messageHead.toIds );
    for( size_t i = 0 ; i < toIds.size() ; i++ )
    {
        SEntityId& id = toIds[i];
        IMessageHandlerPtr handler;
        {
            CAutoLightLock autoLock(_idMessageHandlersLock);
            //MessageHandlerHandlerIdMap::const_iterator iter;
            MessageHandlerHandlerIdMap::iterator iter;
            iter = _idMessageHandlers.find( id );
            if( _idMessageHandlers.end() != iter )
            {
                handler = iter->second;
            }
        }
        if( handler )
        {
            try
            {
                handler->onMessage( messageBlock );
            }
            catch( const CException& ex )
            {
                CDF_LOG_INFO( CWarning , 
                    __FILE__ << ":" << __LINE__ 
                    << "Exception " << ex.what() << 
                    " Cmd:" << messageBlock->_messageHead.command );
                if( _breakOnError ) throw;
            }
            catch( const std::exception& ex )
            {
                CDF_LOG_INFO( CWarning ,
                    __FILE__ << ":" << __LINE__ 
                    << "exception " << ex.what() << 
                    " Cmd:" << messageBlock->_messageHead.command );
                if( _breakOnError ) throw;
            }
            catch( ... )
            {//if has unkown exception

                CDF_LOG_INFO( CError ,
                    __FILE__ << ":" << __LINE__ 
                    << "unkown exception" <<
                    " Cmd:" << messageBlock->_messageHead.command );
                if( _breakOnError ) throw;
            }
        }
    }

    ///////// to system /////////
    if (toIds.empty())
    {
        IMessageHandlerPtr handler = NULL;
        {
            CAutoLightLock autoLock(_commandMessageHandlersLock);
            //MessageHandlerCommandMap::const_iterator iter;
            MessageHandlerCommandMap::iterator iter;
            iter = _commandMessageHandlers.find(messageBlock->_messageHead.command);
            if (_commandMessageHandlers.end() != iter)
            {
                handler = iter->second;
            }
        }
        //if is not process by any handler
        if (handler)
        {
            try
            {
                handler->onMessage(messageBlock);
            }
            catch (const CException& ex)
            {
                CDF_LOG_INFO(CWarning,
                    __FILE__ << ":" << __LINE__
                    << "cdf::CException " << ex.what() <<
                    " Cmd:" << messageBlock->_messageHead.command);
                if (_breakOnError) throw;

            }
            catch (const std::exception& ex)
            {
                CDF_LOG_INFO(CWarning,
                    __FILE__ << ":" << __LINE__
                    << "std::exception " << ex.what() <<
                    " Cmd:" << messageBlock->_messageHead.command);
                if (_breakOnError) throw;
            }
            catch (...)
            {//if has unkown exception
                CDF_LOG_INFO(CError,
                    __FILE__ << ":" << __LINE__
                    << "unkown exception" <<
                    " Cmd:" << messageBlock->_messageHead.command);
                if (_breakOnError) throw;
            }
        }
    }
}

这段代码实现了一个消息队列(Message Queue),用于处理消息的发送和处理。

函数 sendMessage 接收一个 CMessageBlockPtr 类型的消息块,并将其发送给一个或多个消息处理器(Message Handler),或者发送给系统处理器(System Handler)。

首先,代码会遍历消息块中的接收者列表,然后在 _idMessageHandlers 中查找与接收者 ID 匹配的消息处理器。如果找到匹配的消息处理器,则调用其 onMessage 函数,将消息块传递给它进行处理。

如果找不到匹配的消息处理器,则将消息块发送给系统处理器。这时,代码会在 _commandMessageHandlers 中查找与消息块命令匹配的系统处理器。如果找到匹配的系统处理器,则同样调用其 onMessage 函数,将消息块传递给它进行处理。

代码中还包括异常处理机制。如果在消息处理器的 onMessage 函数中发生了异常,代码会将异常信息写入日志文件,并根据 _breakOnError 标志决定是否抛出异常。

推送消息至队列:


image.png

推送消息至队列头:


image.png

注意上面是往_messageWriteList的尾部插入消息
下面的是往 __messageReadList的头部插入消息

分发消息

image.png

这段代码是一个无限循环,其目的是将消息块从一个队列中取出并进行处理。

首先,代码会从一个叫做 _messageReadList 的队列中读取消息块,如果该队列不为空,则将队首的消息块取出并从队列中删除。

然后,代码会调用 sendMessage 方法来处理该消息块,即根据消息头中的目标 ID(toIds)将消息发送到对应的消息处理器中。

如果读取的消息块为空,则继续循环,直到队列中有新的消息块可以处理。

如果读取的消息块不为空,但是在处理消息块期间抛出异常,则根据 _breakOnError 的值来决定是否将异常继续向上抛出。

最后,当 _messageReadList 队列为空时,代码会将 _messageReadList 和 _messageWriteList 进行交换,以便下一轮循环可以从新的 _messageReadList 队列中读取消息块。如果交换成功,则跳出循环。

看完cdf::CMessageQueueBase之后 可以开始看bigMessageQueue是何方神圣了~

bigMessageQueue

    class CBigMessageQueue
        :public cdf::CMessageQueueBase,
        public cdf::IWakeMessageLoopHandler
    {
    public:
        CBigMessageQueue();
        /**
        * to push message to chanel
        */
        bool pushRemoteMessage( 
            int channelId ,
            const cdf::CMessageBlockPtr& mb
            );

        /**
        * to push message to chanel
        */
        bool pushRemoteMessage( 
            int channelId ,
            int command ,
            const cdf::IMessageBasePtr& messageBase
            );

        bool pushRemoteMessage( 
            std::vector<int>& channelIds,
            const cdf::CMessageBlockPtr& mb
            );

        bool onMessage(
            cdf::CSerializeStream& is
            );

        /**
        * to wake message loop
        */
        virtual void wakeMessageLoop();

        virtual void waitMessageLoop( int mill );

    public:
        static CBigMessageQueue* instance();
    };

CBigMessageQueue::CBigMessageQueue() 构造函数用于初始化类对象,该函数将当前对象的指针作为参数传递给基类 CMessageQueueBase 的构造函数。
CBigMessageQueue::pushRemoteMessage() 方法用于将消息块推送到指定的通道中,方法的参数包括通道 ID 和消息块的指针。
CBigMessageQueue::pushRemoteMessage() 方法的另一个重载用于将消息推送到指定通道中,其参数包括通道 ID、命令号和消息的基类指针。
CBigMessageQueue::pushRemoteMessage() 方法的另一个重载用于将消息块推送到多个通道中。
CBigMessageQueue::onMessage() 方法用于接收从网络中接收到的消息,并将消息块推送到消息队列中等待处理。
CBigMessageQueue::wakeMessageLoop() 方法用于唤醒事件循环处理器。
CBigMessageQueue::waitMessageLoop() 方法不实现任何操作。
CBigMessageQueue::instance() 方法用于获取 CBigMessageQueue 类的静态实例。

构造函数

image.png

上面介绍到,本质就是初始化CMessageQueueBase的两个list

推送远程消息

bool CBigMessageQueue::pushRemoteMessage( 
                 int channelId ,
                 const cdf::CMessageBlockPtr& mb
                 )
{
    if( channelId == 0 )
    {
        return false;
    }
    cde::CSessionPtr session = 
        cde::CChannelManager::instance()->get( channelId );
    if( !session )
    {
        CDF_LOG_TRACE( "CBigMessageQueue::pushRemoteMessage", " session is NULL, channel id: " << channelId );
        return false;
    }
    static Engine::RMI::SRMICall call;
    
    cdf::CAutoSerializeStream stream( cdf::CSerializeStreamPool::instance()->newObject() );
    static CContext contextNotUsed;
    static CRMIObjectBindPtr objectBindNotUsed = new CRMIObjectBind;
    cde::COutgoing::perpareInvoke(
        contextNotUsed,
        *stream,
        call,
        NULL,
        objectBindNotUsed,
        ::Engine::RMI::MessageTypeMQ);
    mb->__write( *stream );
    cde::COutgoing::invokeAsyncNoBack( 
        session ,
        call ,
        *stream , 
        ::Engine::RMI::MessageTypeMQ 
        );
    return true;
}

这个方法是将消息推送到远程的消息队列中。它首先获取指定通道的会话对象,然后创建一个序列化流并使用给定的消息块填充它。接下来,它使用COutgoing::invokeAsyncNoBack函数将消息通过远程调用异步地发送到会话的远程端点。在此过程中,它使用了SRMICall、CContext、CRMIObjectBind等对象来实现远程过程调用。最后,该方法返回一个布尔值表示是否成功将消息推送到远程队列中。

channelManager

这里出现了几个没出现过的对象
先介绍一下channelManager


image.png

这是一个单例模式的类 CChannelManager,它用于管理一组 CSession 对象。
在这个类中,_sessionMap 是一个映射表,它将一个整数的 id 映射到一个 CSessionPtr 智能指针。
通过 add() 方法,可以将一个 CSessionPtr 添加到 _sessionMap 中;通过 get() 方法,可以根据给定的 id 查找相应的 CSession 对象;
通过 remove() 方法,可以删除一个 id 对应的 CSession 对象;通过 clear() 方法,可以清空 _sessionMap 中的所有对象;
通过 flush() 方法,可以遍历 _sessionMap 中的所有 CSession 对象,然后调用每个 CSession 对象中的 CRMIConnection 对象的 flush() 方法来刷新数据。
类中还包括了一个 instance() 静态方法,它返回了这个类的单例对象。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容