cde::CBigMessageQueue介绍
看继承关系:
继承于cdf::CMessageQueueBase和cdf::IWakeMessageLoopHandler两个类
cdf::CMessageQueueBase
先看cdf::CMessageQueueBase是啥:
typedef cde_hash_map<SEntityId, IMessageHandlerPtr, entity_id_hash> MessageHandlerHandlerIdMap;
//typedef std::map<SEntityId,IMessageHandlerPtr> MessageHandlerHandlerIdMap;
typedef cde_hash_map<int,IMessageHandlerPtr> MessageHandlerCommandMap;
typedef std::list<CMessageBlockPtr,CCdfAllocator<CMessageBlockPtr> > MessageBlockList;
MessageHandlerHandlerIdMap 是一个map 其中Key是SEntityId,value是 IMessageHandlerPtr
MessageHandlerCommandMap 是一个map 其中key是int value是IMessageHandlerPtr
MessageBlockList 是一个list 保存了CMessageBlockPtr
分别介绍这几个类:
SEntityId:
继承于IMessageBase
可以看到 只是规定了每个消息需要实现的操作:”读写拷贝以及重载=符号
这个类有个友元类:CMessageBlock
这段代码定义了一个名为CMessageBlock的类,它继承了CRefShared类,因此CMessageBlock对象可以使用引用计数来管理内存,从而避免内存泄漏问题。
CMessageBlock类包含以下成员变量:
_messageHead:用于存储消息头信息的结构体。
_messageBase:指向实现了IMessageBase接口的对象的智能指针。
_os:指向CSerializeStream类对象的指针,用于序列化消息数据。
CMessageBlock类还包含了以下成员函数:
CMessageBlock():默认构造函数。
~CMessageBlock():虚析构函数。
initOS():初始化_os成员变量。
__write():将消息数据写入指定的序列化流。
__writeHead():将消息头信息写入指定的序列化流。
__writeBody():将消息体信息写入指定的序列化流。
__read():从指定的序列化流中读取消息数据。
__readHead():从指定的序列化流中读取消息头信息。
__readBody():从指定的序列化流中读取消息体信息。
operator=():赋值运算符重载函数,被声明为private,禁止对CMessageBlock对象进行赋值操作。
综上所述,CMessageBlock类是一个用于表示消息数据的类,它将消息头和消息体封装成一个对象,并且提供了序列化和反序列化消息数据的方法。
IMessageHandlerPtr:
这段代码定义了一个名为IMessageHandler的抽象基类,它继承了CRefShared类,因此IMessageHandler对象可以使用引用计数来管理内存,从而避免内存泄漏问题。
IMessageHandler类包含以下成员变量:
_objectId:表示对象的唯一标识符。
IMessageHandler类还包含了以下成员函数:
IMessageHandler():默认构造函数。
~IMessageHandler():虚析构函数。
getId():获取对象的唯一标识符。
onMessage():处理收到的消息数据。
operator<():小于运算符重载函数,用于在容器中对IMessageHandler对象进行排序。
setId():设置对象的唯一标识符。
IMessageHandlerPtr是一个智能指针类型,它使用CHandle模板类来管理IMessageHandler对象的生命周期。可以使用IMessageHandlerPtr指针来操作IMessageHandler对象,而不必手动管理对象的内存。
综上所述,IMessageHandler类是一个用于处理消息的抽象基类,它定义了一个处理收到的消息数据的方法,并提供了对象唯一标识符的管理。IMessageHandlerPtr是一个智能指针类型,用于方便地管理IMessageHandler对象的生命周期。
CMessageBlockPtr:CMessageBlock的指针
class CMessageQueueBase
{
public:
typedef cde_hash_map<SEntityId, IMessageHandlerPtr, entity_id_hash> MessageHandlerHandlerIdMap;
//typedef std::map<SEntityId,IMessageHandlerPtr> MessageHandlerHandlerIdMap;
typedef cde_hash_map<int,IMessageHandlerPtr> MessageHandlerCommandMap;
typedef std::list<CMessageBlockPtr,CCdfAllocator<CMessageBlockPtr> > MessageBlockList;
//typedef std::list<CMessageBlockPtr> MessageBlockList;
CMessageQueueBase( IWakeMessageLoopHandler* messageLoopPtr );
virtual ~CMessageQueueBase();
/**
* to regist handle to proccess message
* if the MESSAGE_ID and common regist to throw command
*/
int registeHandler(const SEntityId& id, const IMessageHandlerPtr& handler);
/**
* if the command not execute by id
* the command to excecute to by command
*/
int registeHandler( const int command , const IMessageHandlerPtr& handler );
/**
* remove handles
*/
int removeHandler(const SEntityId& id);
/**
* remove handles
*/
int removeHandler( const int command );
/**
* push message to the end
*/
void pushMessage( const CMessageBlockPtr& messageBlock );
/**
* push message to the end
*/
void pushMessage( int cmd , const IMessageBasePtr& messageBase );
/**
* push message to the end
*/
void pushMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase);
/**
* push message to the end
*/
void sendMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase);
//to send message
void sendMessage( const CMessageBlockPtr& messageBlock );
/**
* push message to the front
*/
void pushMessageFront( CMessageBlockPtr& messageBlock );
/***
* distribute message
*/
void distributeMessage();
/**
* break on error
*/
void setBreakOnError( bool breakOnError ){ _breakOnError = breakOnError; }
bool isBreakOnError() const { return _breakOnError; }
protected:
CMessageQueueBase();
CLightLock _idMessageHandlersLock;
MessageHandlerHandlerIdMap _idMessageHandlers;
CLightLock _commandMessageHandlersLock;
MessageHandlerCommandMap _commandMessageHandlers;
CLightLock _messageReadListLock;
MessageBlockList* _messageReadList;
CLightLock _messageWriteListLock;
MessageBlockList* _messageWriteList;
IWakeMessageLoopHandler* _messageLoopPtr;
bool _breakOnError;
};
这段代码定义了一个消息队列基类 CMessageQueueBase,用于存储和分发消息。以下是该类的主要成员函数和数据成员的解释:
成员函数:
registeHandler(const SEntityId& id, const IMessageHandlerPtr& handler):注册消息处理器,将给定的 IMessageHandlerPtr 实例与给定的 SEntityId 实例相关联。
registeHandler(const int command, const IMessageHandlerPtr& handler):注册消息处理器,将给定的 IMessageHandlerPtr 实例与给定的 command 相关联。
removeHandler(const SEntityId& id):从队列中移除与给定 SEntityId 相关联的消息处理器。
removeHandler(const int command):从队列中移除与给定 command 相关联的消息处理器。
pushMessage(const CMessageBlockPtr& messageBlock):将给定的 CMessageBlockPtr 实例添加到消息队列的末尾。
pushMessage(int cmd, const IMessageBasePtr& messageBase):构造一个消息块 CMessageBlock 实例,并将其添加到消息队列的末尾。
pushMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase):构造一个消息块 CMessageBlock 实例,并将其添加到消息队列的末尾。该消息将被分配给给定的 SEntityId。
sendMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase):将给定的消息块直接发送给给定的 SEntityId。
sendMessage(const CMessageBlockPtr& messageBlock):将给定的消息块直接发送给其 SEntityId。
pushMessageFront(CMessageBlockPtr& messageBlock):将给定的 CMessageBlockPtr 实例添加到消息队列的开头。
distributeMessage():分发当前在队列中的所有消息。
setBreakOnError(bool breakOnError):设置队列是否在出现错误时中断。
isBreakOnError() const:获取队列当前是否在出现错误时中断。
数据成员:
MessageHandlerHandlerIdMap _idMessageHandlers:用于存储与 SEntityId 相关联的 IMessageHandlerPtr 实例的映射。
MessageHandlerCommandMap _commandMessageHandlers:用于存储与 command 相关联的 IMessageHandlerPtr 实例的映射。
MessageBlockList* _messageReadList:指向消息队列中要读取的消息列表的指针。
MessageBlockList* _messageWriteList:指向消息队列中要写入的消息列表的指针。
IWakeMessageLoopHandler* _messageLoopPtr:指向与此消息队列相关联的消息循环的指针。
bool _breakOnError:表示队列是否在出现错误时中断。
成员方法:
构造函数:
生成两个list _messageReadList和_messageWriteList
注册处理消息:
移除注册的消息:
发送消息:
cmd是消息id,toId是需要发送的玩家,messageBase是消息内容
void
CMessageQueueBase::sendMessage( const CMessageBlockPtr& messageBlock )
{
//LOG_TRACE( "CMessageQueue::pushMessage" , messageBlock._ptr );
if( !messageBlock )
{
assert( false );
return;
}
///////// to player /////////
SeqEntityId toIds( messageBlock->_messageHead.toIds );
for( size_t i = 0 ; i < toIds.size() ; i++ )
{
SEntityId& id = toIds[i];
IMessageHandlerPtr handler;
{
CAutoLightLock autoLock(_idMessageHandlersLock);
//MessageHandlerHandlerIdMap::const_iterator iter;
MessageHandlerHandlerIdMap::iterator iter;
iter = _idMessageHandlers.find( id );
if( _idMessageHandlers.end() != iter )
{
handler = iter->second;
}
}
if( handler )
{
try
{
handler->onMessage( messageBlock );
}
catch( const CException& ex )
{
CDF_LOG_INFO( CWarning ,
__FILE__ << ":" << __LINE__
<< "Exception " << ex.what() <<
" Cmd:" << messageBlock->_messageHead.command );
if( _breakOnError ) throw;
}
catch( const std::exception& ex )
{
CDF_LOG_INFO( CWarning ,
__FILE__ << ":" << __LINE__
<< "exception " << ex.what() <<
" Cmd:" << messageBlock->_messageHead.command );
if( _breakOnError ) throw;
}
catch( ... )
{//if has unkown exception
CDF_LOG_INFO( CError ,
__FILE__ << ":" << __LINE__
<< "unkown exception" <<
" Cmd:" << messageBlock->_messageHead.command );
if( _breakOnError ) throw;
}
}
}
///////// to system /////////
if (toIds.empty())
{
IMessageHandlerPtr handler = NULL;
{
CAutoLightLock autoLock(_commandMessageHandlersLock);
//MessageHandlerCommandMap::const_iterator iter;
MessageHandlerCommandMap::iterator iter;
iter = _commandMessageHandlers.find(messageBlock->_messageHead.command);
if (_commandMessageHandlers.end() != iter)
{
handler = iter->second;
}
}
//if is not process by any handler
if (handler)
{
try
{
handler->onMessage(messageBlock);
}
catch (const CException& ex)
{
CDF_LOG_INFO(CWarning,
__FILE__ << ":" << __LINE__
<< "cdf::CException " << ex.what() <<
" Cmd:" << messageBlock->_messageHead.command);
if (_breakOnError) throw;
}
catch (const std::exception& ex)
{
CDF_LOG_INFO(CWarning,
__FILE__ << ":" << __LINE__
<< "std::exception " << ex.what() <<
" Cmd:" << messageBlock->_messageHead.command);
if (_breakOnError) throw;
}
catch (...)
{//if has unkown exception
CDF_LOG_INFO(CError,
__FILE__ << ":" << __LINE__
<< "unkown exception" <<
" Cmd:" << messageBlock->_messageHead.command);
if (_breakOnError) throw;
}
}
}
}
这段代码实现了一个消息队列(Message Queue),用于处理消息的发送和处理。
函数 sendMessage 接收一个 CMessageBlockPtr 类型的消息块,并将其发送给一个或多个消息处理器(Message Handler),或者发送给系统处理器(System Handler)。
首先,代码会遍历消息块中的接收者列表,然后在 _idMessageHandlers 中查找与接收者 ID 匹配的消息处理器。如果找到匹配的消息处理器,则调用其 onMessage 函数,将消息块传递给它进行处理。
如果找不到匹配的消息处理器,则将消息块发送给系统处理器。这时,代码会在 _commandMessageHandlers 中查找与消息块命令匹配的系统处理器。如果找到匹配的系统处理器,则同样调用其 onMessage 函数,将消息块传递给它进行处理。
代码中还包括异常处理机制。如果在消息处理器的 onMessage 函数中发生了异常,代码会将异常信息写入日志文件,并根据 _breakOnError 标志决定是否抛出异常。
推送消息至队列:
推送消息至队列头:
注意上面是往_messageWriteList的尾部插入消息
下面的是往 __messageReadList的头部插入消息
分发消息:
这段代码是一个无限循环,其目的是将消息块从一个队列中取出并进行处理。
首先,代码会从一个叫做 _messageReadList 的队列中读取消息块,如果该队列不为空,则将队首的消息块取出并从队列中删除。
然后,代码会调用 sendMessage 方法来处理该消息块,即根据消息头中的目标 ID(toIds)将消息发送到对应的消息处理器中。
如果读取的消息块为空,则继续循环,直到队列中有新的消息块可以处理。
如果读取的消息块不为空,但是在处理消息块期间抛出异常,则根据 _breakOnError 的值来决定是否将异常继续向上抛出。
最后,当 _messageReadList 队列为空时,代码会将 _messageReadList 和 _messageWriteList 进行交换,以便下一轮循环可以从新的 _messageReadList 队列中读取消息块。如果交换成功,则跳出循环。
看完cdf::CMessageQueueBase之后 可以开始看bigMessageQueue是何方神圣了~
bigMessageQueue
class CBigMessageQueue
:public cdf::CMessageQueueBase,
public cdf::IWakeMessageLoopHandler
{
public:
CBigMessageQueue();
/**
* to push message to chanel
*/
bool pushRemoteMessage(
int channelId ,
const cdf::CMessageBlockPtr& mb
);
/**
* to push message to chanel
*/
bool pushRemoteMessage(
int channelId ,
int command ,
const cdf::IMessageBasePtr& messageBase
);
bool pushRemoteMessage(
std::vector<int>& channelIds,
const cdf::CMessageBlockPtr& mb
);
bool onMessage(
cdf::CSerializeStream& is
);
/**
* to wake message loop
*/
virtual void wakeMessageLoop();
virtual void waitMessageLoop( int mill );
public:
static CBigMessageQueue* instance();
};
CBigMessageQueue::CBigMessageQueue() 构造函数用于初始化类对象,该函数将当前对象的指针作为参数传递给基类 CMessageQueueBase 的构造函数。
CBigMessageQueue::pushRemoteMessage() 方法用于将消息块推送到指定的通道中,方法的参数包括通道 ID 和消息块的指针。
CBigMessageQueue::pushRemoteMessage() 方法的另一个重载用于将消息推送到指定通道中,其参数包括通道 ID、命令号和消息的基类指针。
CBigMessageQueue::pushRemoteMessage() 方法的另一个重载用于将消息块推送到多个通道中。
CBigMessageQueue::onMessage() 方法用于接收从网络中接收到的消息,并将消息块推送到消息队列中等待处理。
CBigMessageQueue::wakeMessageLoop() 方法用于唤醒事件循环处理器。
CBigMessageQueue::waitMessageLoop() 方法不实现任何操作。
CBigMessageQueue::instance() 方法用于获取 CBigMessageQueue 类的静态实例。
构造函数:
上面介绍到,本质就是初始化CMessageQueueBase的两个list
推送远程消息
bool CBigMessageQueue::pushRemoteMessage(
int channelId ,
const cdf::CMessageBlockPtr& mb
)
{
if( channelId == 0 )
{
return false;
}
cde::CSessionPtr session =
cde::CChannelManager::instance()->get( channelId );
if( !session )
{
CDF_LOG_TRACE( "CBigMessageQueue::pushRemoteMessage", " session is NULL, channel id: " << channelId );
return false;
}
static Engine::RMI::SRMICall call;
cdf::CAutoSerializeStream stream( cdf::CSerializeStreamPool::instance()->newObject() );
static CContext contextNotUsed;
static CRMIObjectBindPtr objectBindNotUsed = new CRMIObjectBind;
cde::COutgoing::perpareInvoke(
contextNotUsed,
*stream,
call,
NULL,
objectBindNotUsed,
::Engine::RMI::MessageTypeMQ);
mb->__write( *stream );
cde::COutgoing::invokeAsyncNoBack(
session ,
call ,
*stream ,
::Engine::RMI::MessageTypeMQ
);
return true;
}
这个方法是将消息推送到远程的消息队列中。它首先获取指定通道的会话对象,然后创建一个序列化流并使用给定的消息块填充它。接下来,它使用COutgoing::invokeAsyncNoBack函数将消息通过远程调用异步地发送到会话的远程端点。在此过程中,它使用了SRMICall、CContext、CRMIObjectBind等对象来实现远程过程调用。最后,该方法返回一个布尔值表示是否成功将消息推送到远程队列中。
channelManager
这里出现了几个没出现过的对象
先介绍一下channelManager
这是一个单例模式的类 CChannelManager,它用于管理一组 CSession 对象。
在这个类中,_sessionMap 是一个映射表,它将一个整数的 id 映射到一个 CSessionPtr 智能指针。
通过 add() 方法,可以将一个 CSessionPtr 添加到 _sessionMap 中;通过 get() 方法,可以根据给定的 id 查找相应的 CSession 对象;
通过 remove() 方法,可以删除一个 id 对应的 CSession 对象;通过 clear() 方法,可以清空 _sessionMap 中的所有对象;
通过 flush() 方法,可以遍历 _sessionMap 中的所有 CSession 对象,然后调用每个 CSession 对象中的 CRMIConnection 对象的 flush() 方法来刷新数据。
类中还包括了一个 instance() 静态方法,它返回了这个类的单例对象。