了解分布式系统的童鞋肯定听过Paxos算法的大名。Paxos算法以晦涩难懂著称,其工程实现更难。目前,号称在工程上实现了Paxos算法的应该只有Google、阿里和腾讯。然而,只有腾讯的微信团队真正将代码开源出来,他们将Paxos算法的实现封装成了一个Paxos库,大家可以基于该库实现自己想要的功能,比如用于master选举,或者甚至利用它来实现一个分布式KV数据库等。
之前就对Paxos很感兴趣,但是一直没看过实现的代码,这次微信开源了PhxPaxos后终于有机会深入地了解Paxos的实现细节。在这里感谢微信团队。感谢PhxPaxos的作者。让我们一起来领略Paxos的魅力吧。
本次的源码分析先从网络部分开始。因为在分布式系统中不可避免会涉及到不同节点以及相同节点上不同进程之间的通信。因此网络部分也是至关重要,所以就先把网络单独拿出来看,接下来再去看Paxos算法的实现部分。
概览
源码的include/phxpaxos目录下是公共头文件。include/phpaxos/network.h 是网络模块的抽象函数,如果用户想使用自己的网络协议,可以通过重写这些函数实现网络模块的自定义。
我们先来看下network.h的内容:
namespace phxpaxos
{
//You can use your own network to make paxos communicate. :)
class Node;
class NetWork
{
public:
NetWork();
virtual ~NetWork() {}
//Network must not send/recieve any message before paxoslib called this funtion.
virtual void RunNetWork() = 0;
//If paxoslib call this function, network need to stop receive any message.
virtual void StopNetWork() = 0;
virtual int SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) = 0;
virtual int SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) = 0;
//When receive a message, call this funtion.
//This funtion is async, just enqueue an return.
int OnReceiveMessage(const char * pcMessage, const int iMessageLen);
private:
friend class Node;
Node * m_poNode;
};
}
这几个函数的作用从名字就可以看出来。而且都是虚函数,即需要重写这些函数。在PhxPaxos中,提供了一个默认的网络模块,就是继承了NetWork类。该类的名字叫DFNetWork,DF应该就是default的缩写了。如下:
namespace phxpaxos
{
class DFNetWork : public NetWork
{
public:
DFNetWork();
virtual ~DFNetWork();
int Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount);
void RunNetWork();
void StopNetWork();
int SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage);
int SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage);
private:
UDPRecv m_oUDPRecv;
UDPSend m_oUDPSend;
TcpIOThread m_oTcpIOThread;
};
}
该类的私有成员里有UDPRecv、UDPSend和TcpIOThread三个类的对象,这三个类分别用于接收UDP消息、发送UDP消息以及收发TCP消息。
Init方法就是将UDPRecv、UDPSend和TcpIOThread分别初始化:
int DFNetWork :: Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount)
{ //初始化UDPSend
int ret = m_oUDPSend.Init();
if (ret != 0)
{
return ret;
}
//初始化UDPRecv
ret = m_oUDPRecv.Init(iListenPort);
if (ret != 0)
{
return ret;
}
//初始化TCP
ret = m_oTcpIOThread.Init(sListenIp, iListenPort, iIOThreadCount);
if (ret != 0)
{
PLErr("m_oTcpIOThread Init fail, ret %d", ret);
return ret;
}
return 0;
}
具体的初始化过程就是调用socket的api。以UDPRecv为例,就是创建socket、设定端口、设置socket属性(如端口可重用)最后绑定端口。如下:
int UDPRecv :: Init(const int iPort)
{ //创建socket,获得socket fd
if ((m_iSockFD = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
{
return -1;
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(iPort); //设定端口
addr.sin_addr.s_addr = htonl(INADDR_ANY);
int enable = 1;
//设定socket属性,端口可重用
setsockopt(m_iSockFD, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
//绑定,用于监听
if (bind(m_iSockFD, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
return -1;
}
return 0;
}
RunNetWork就是将UDPRecv、UDPSend和TcpIOThread分别运行起来:
void DFNetWork :: RunNetWork()
{ //UDPSend和UDPRecv都是调用Thread的start方法
m_oUDPSend.start();
m_oUDPRecv.start();
//TCP的Start是封装过的
m_oTcpIOThread.Start();
}
TcpIOThread的Start()实际执行的代码如下,分别启动了TcpAcceptor、TcpWrite和TcpRead:
void TcpIOThread :: Start()
{
m_oTcpAcceptor.start();
for (auto & poTcpWrite : m_vecTcpWrite)
{
poTcpWrite->start();
}
for (auto & poTcpRead : m_vecTcpRead)
{
poTcpRead->start();
}
m_bIsStarted = true;
}
StopNetWork就是将UDPRecv、UDPSend和TcpIOThread停止。
SendMessageTCP就是将消息用TCP发送:
int DFNetWork :: SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage)
{
return m_oTcpIOThread.AddMessage(iGroupIdx, sIp, iPort, sMessage);
}
SendMessageUDP就是将消息用UDP发送:
int DFNetWork :: SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage)
{
return m_oUDPSend.AddMessage(sIp, iPort, sMessage);
}
UDP
UDPSend
前面SendMessageUDP调用了m_oUDPSend.AddMessage。这里的UDPSend维护了一个发送队列,如下:
Queue<QueueData *> m_oSendQueue;
m_oUDPSend.AddMessage就是将消息加入到UDP的m_oSendQueue中。
然后UDPSend在run方法中一直循环将m_oSendQueue中的消息发送出去:
void UDPSend :: run()
{
m_bIsStarted = true;
while(true)
{
QueueData * poData = nullptr;
//同步,线程安全
m_oSendQueue.lock();
bool bSucc = m_oSendQueue.peek(poData, 1000);
if (bSucc)
{ //取出队头消息
m_oSendQueue.pop();
}
m_oSendQueue.unlock();
if (poData != nullptr)
{ //将消息发送出去
SendMessage(poData->m_sIP, poData->m_iPort, poData->m_sMessage);
delete poData;
}
if (m_bIsEnd)
{
PLHead("UDPSend [END]");
return;
}
}
}
因此UDPSend就是把消息加入到消息队列,然后循环将消息队列里的消息发送出去。
UDPRecv
接下来看看UDPRecv。UDPRecv的初始化前面已经看过了,就是简单的获得socket fd,设定sockaddr_in,设置socket属性最后将socket fd和sockaddr_in绑定用于监听。
主要来看看UDPRecv的run方法。这里主要用了I/O多路复用中的poll,注册了一个pollfd,该pollfd的fd即之前创建的绑定了端口的socket fd,events为POLLIN,表示监听数据可读事件,如果有数据可读了,则调用recvfrom读入数据。最后调用OnReceiveMessage将消息添加到当前instance的IoLoop中:
void UDPRecv :: run()
{
m_bIsStarted = true;
char sBuffer[65536] = {0};
struct sockaddr_in addr;
socklen_t addr_len = sizeof(struct sockaddr_in);
memset(&addr, 0, sizeof(addr));
while(true)
{
if (m_bIsEnd)
{
PLHead("UDPRecv [END]");
return;
}
struct pollfd fd;
int ret;
fd.fd = m_iSockFD;
//注册POLLIN事件
fd.events = POLLIN;
//调用poll检查是否有数据可读
ret = poll(&fd, 1, 500);
if (ret == 0 || ret == -1)
{
continue;
}
//将接收到的数据放入sBuffer中
int iRecvLen = recvfrom(m_iSockFD, sBuffer, sizeof(sBuffer), 0,
(struct sockaddr *)&addr, &addr_len);
BP->GetNetworkBP()->UDPReceive(iRecvLen);
if (iRecvLen > 0)
{ //这里会依次调用Node和Instance的OnReceiveMessage方法,最后将消息加入到Instance的IoLoop中
m_poDFNetWork->OnReceiveMessage(sBuffer, iRecvLen);
}
}
}
TCP
TcpIOThread
接下来看看收发TCP消息的TcpIOThread:
class TcpIOThread
{
public:
TcpIOThread(NetWork * poNetWork);
~TcpIOThread();
//用于初始化TcpAcceptor以及iIOThreadCount个m_vecTcpRead和m_vecTcpWrite
int Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount);
//启动TcpAcceptor用于监听以及所有的m_vecTcpRead和m_vecTcpWrite用于读写消息
void Start();
//停止TcpAcceptor和所有的m_vecTcpRead及m_vecTcpWrite
void Stop();
//将消息加入到特定TcpWrite的消息队列中
int AddMessage(const int iGroupIdx, const std::string & sIP, const int iPort, const std::string & sMessage);
private:
NetWork * m_poNetWork;
TcpAcceptor m_oTcpAcceptor;
std::vector<TcpRead *> m_vecTcpRead;
std::vector<TcpWrite *> m_vecTcpWrite;
bool m_bIsStarted;
};
TcpRead类似于前面讲的UDPRecv,TcpWrite类似于于UDPSend。严格来讲,TcpAcceptor + TcpRead才是UDPRecv。这里把TcpAcceptor单独抽出来,专门用于监听连接请求并建立连接。TcpRead只需要负责读消息就行。
TcpAcceptor
我们来看看TcpAcceptor:
class TcpAcceptor : public Thread
{
public:
TcpAcceptor();
~TcpAcceptor();
//监听端口
void Listen(const std::string & sListenIP, const int iListenPort);
//一直while循环,监听连接事件并建立连接获得fd,然后添加事件到EventLoop中
void run();
void Stop();
void AddEventLoop(EventLoop * poEventLoop);
void AddEvent(int iFD, SocketAddress oAddr);
private:
//服务端的socket,用于监听
ServerSocket m_oSocket;
std::vector<EventLoop *> m_vecEventLoop;
private:
bool m_bIsEnd;
bool m_bIsStarted;
};
这里主要来看下run方法:
void TcpAcceptor :: run()
{
m_bIsStarted = true;
PLHead("start accept...");
m_oSocket.setAcceptTimeout(500);
m_oSocket.setNonBlocking(true);
while (true)
{
struct pollfd pfd;
int ret;
pfd.fd = m_oSocket.getSocketHandle();
//注册事件
pfd.events = POLLIN;
//等待事件到来
ret = poll(&pfd, 1, 500);
if (ret != 0 && ret != -1)
{
SocketAddress oAddr;
int fd = -1;
try
{
//建立连接,获得fd。这里的acceptfd对accept进行了简单的封装
fd = m_oSocket.acceptfd(&oAddr);
}
catch(...)
{
fd = -1;
}
if (fd >= 0)
{
BP->GetNetworkBP()->TcpAcceptFd();
PLImp("accepted!, fd %d ip %s port %d",
fd, oAddr.getHost().c_str(), oAddr.getPort());
//添加事件
AddEvent(fd, oAddr);
}
}
if (m_bIsEnd)
{
PLHead("TCP.Acceptor [END]");
return;
}
}
}
再看看AddEvent方法:
void TcpAcceptor :: AddEvent(int iFD, SocketAddress oAddr)
{
EventLoop * poMinActiveEventLoop = nullptr;
int iMinActiveEventCount = 1 << 30;
for (auto & poEventLoop : m_vecEventLoop)
{
int iActiveCount = poEventLoop->GetActiveEventCount();
if (iActiveCount < iMinActiveEventCount)
{
iMinActiveEventCount = iActiveCount;
poMinActiveEventLoop = poEventLoop;
}
}
oAddr.getPort());
poMinActiveEventLoop->AddEvent(iFD, oAddr);
}
即找到活跃数最少的EventLoop,将事件添加到该EventLoop中。这里应该是为了负载均衡,防止有些线程工作量很大,有些则很空闲。
具体EventLoop的AddEvent就是将事件加入到FDQueue中,如下:
void EventLoop :: AddEvent(int iFD, SocketAddress oAddr)
{
std::lock_guard<std::mutex> oLockGuard(m_oMutex);
m_oFDQueue.push(make_pair(iFD, oAddr));
}
到这里TcpAcceptor的作用及实现基本就很清晰了。
TcpRead
先来看看TcpRead类的定义:
class TcpRead : public Thread
{
public:
TcpRead(NetWork * poNetWork);
~TcpRead();
int Init();
void run();
void Stop();
EventLoop * GetEventLoop();
private:
EventLoop m_oEventLoop;
};
这里的成员变量是一个EventLoop对象。通过源码发现,Init、run、Stop方法其实都是调用了m_oEventLoop相应的方法,如下:
int TcpRead :: Init()
{
return m_oEventLoop.Init(20480);
}
void TcpRead :: run()
{
m_oEventLoop.StartLoop();
}
void TcpRead :: Stop()
{
m_oEventLoop.Stop();
join();
PLHead("TcpReadThread [END]");
}
因此主要来看下EventLoop。
首先说下Event。PhxPaxos在TCP这块主要用了I/O多路复用中的epoll。这里主要将数据和通知等都封装成Event,然后由TcpWrite和TcpRead的EventLoop去执行。PhxPaxos中的Event包含两个子类,分别是MessageEvent和Notify。其中MessageEvent主要用于数据的读写;而Notify主要用于通知事件发生。这里的Notify基于管道pipe和EPOLLIN事件来实现,可以通过Notify的Init方法看出:
int Notify :: Init()
{ //m_iPipeFD是一个长度为2的int数组,用于存放管道两端的socket fd
int ret = pipe(m_iPipeFD);
if (ret != 0)
{
PLErr("create pipe fail, ret %d", ret);
return ret;
}
fcntl(m_iPipeFD[0], F_SETFL, O_NONBLOCK);
fcntl(m_iPipeFD[1], F_SETFL, O_NONBLOCK);
AddEvent(EPOLLIN);
return 0;
}
继续回到EventLoop。首先看下EventLoop的Init方法:
int EventLoop :: Init(const int iEpollLength)
{
//创建epoll句柄,iEpollLength为监听的fd数
m_iEpollFd = epoll_create(iEpollLength);
if (m_iEpollFd == -1)
{
PLErr("epoll_create fail, ret %d", m_iEpollFd);
return -1;
}
m_poNotify = new Notify(this);
assert(m_poNotify != nullptr);
//初始化Notify:创建pipe,设置m_iPipeFD并添加EPOLLIN事件
int ret = m_poNotify->Init();
if (ret != 0)
{
return ret;
}
return 0;
}
接着来看下最重要的StartLoop:
void EventLoop :: StartLoop()
{
m_bIsEnd = false;
while(true)
{
BP->GetNetworkBP()->TcpEpollLoop();
int iNextTimeout = 1000;
DealwithTimeout(iNextTimeout);
//PLHead("nexttimeout %d", iNextTimeout);
OneLoop(iNextTimeout);
CreateEvent();
if (m_poTcpClient != nullptr)
{
m_poTcpClient->DealWithWrite();
}
if (m_bIsEnd)
{
PLHead("TCP.EventLoop [END]");
break;
}
}
}
主循环是OneLoop:
void EventLoop :: OneLoop(const int iTimeoutMs)
{ //调用epoll_wait等待事件发生
int n = epoll_wait(m_iEpollFd, m_EpollEvents, MAX_EVENTS, 1);
if (n == -1)
{
if (errno != EINTR)
{
PLErr("epoll_wait fail, errno %d", errno);
return;
}
}
//逐一处理发生的epoll事件
for (int i = 0; i < n; i++)
{
int iFd = m_EpollEvents[i].data.fd;
auto it = m_mapEvent.find(iFd);
if (it == end(m_mapEvent))
{
continue;
}
int iEvents = m_EpollEvents[i].events;
Event * poEvent = it->second.m_poEvent;
int ret = 0;
if (iEvents & EPOLLERR)
{
OnError(iEvents, poEvent);
continue;
}
try
{
//如果是EPOLLIN事件,表明由数据可读,则调用poEvent的OnRead方法处理
if (iEvents & EPOLLIN)
{
ret = poEvent->OnRead();
}
//如果是EPOLLOUT事件,表明由数据可写,则调用poEvent的OnWrite方法处理
if (iEvents & EPOLLOUT)
{
ret = poEvent->OnWrite();
}
}
catch (...)
{
ret = -1;
}
if (ret != 0)
{
OnError(iEvents, poEvent);
}
}
}
其他具体的细节这里就不再赘述了,有兴趣的可以自己去看看源码。
TcpWrite
看完了TcpRead,再来看看TcpWrite。首先还是看它的定义:
class TcpWrite : public Thread
{
public:
TcpWrite(NetWork * poNetWork);
~TcpWrite();
int Init();
void run();
void Stop();
int AddMessage(const std::string & sIP, const int iPort, const std::string & sMessage);
private:
TcpClient m_oTcpClient;
EventLoop m_oEventLoop;
};
Init、run、Stop跟TcpRead中对应方法的作用一致。AddMessage则是调用了m_oTcpClient的AddMessage方法。发现TcpWrite的成员变量比TcpRead多了一个TcpClient对象,因此主要来看看这个TcpClient是干嘛的。
刚刚说TcpWrite的AddMessage调用了m_oTcpClient的AddMessage方法。在m_oTcpClient的AddMessage方法中,则是先创建了一个指向MessageEvent对象的指针poEvent,然后再调用poEvent的AddMessage方法:
int TcpClient :: AddMessage(const std::string & sIP, const int iPort, const std::string & sMessage)
{
//PLImp("ok");
MessageEvent * poEvent = GetEvent(sIP, iPort);
if (poEvent == nullptr)
{
PLErr("no event created for this ip %s port %d", sIP.c_str(), iPort);
return -1;
}
return poEvent->AddMessage(sMessage);
}
因此继续看看MessageEvent的AddMessage方法:
int MessageEvent :: AddMessage(const std::string & sMessage)
{
m_llLastActiveTime = Time::GetSteadyClockMS();
std::unique_lock<std::mutex> oLock(m_oMutex);
if ((int)m_oInQueue.size() > TCP_QUEUE_MAXLEN)
{
BP->GetNetworkBP()->TcpQueueFull();
//PLErr("queue length %d too long, can't enqueue", m_oInQueue.size());
return -2;
}
if (m_iQueueMemSize > MAX_QUEUE_MEM_SIZE)
{
//PLErr("queue memsize %d too large, can't enqueue", m_iQueueMemSize);
return -2;
}
QueueData tData;
//将消息封装成QueueData后放入队列
tData.llEnqueueAbsTime = Time::GetSteadyClockMS();
tData.psValue = new string(sMessage);
m_oInQueue.push(tData);
m_iQueueMemSize += sMessage.size();
oLock.unlock();
//退出EpollWait,实际是调用SendNotify发送了一个通知
JumpoutEpollWait();
return 0;
}
可以看到这里将消息加上入队时间后封装成一个QueueDate,然后放入m_oInQueue队列中。最后调用EventLoop的SendNotify发送了一个通知(利用之前创建的pipe)退出EpollWait。
说完了消息怎么入队,那消息是怎么发送出去的呢?
这里主要涉及到MessageEvent的OnWrite函数:
int MessageEvent :: OnWrite()
{
int ret = 0;
//只要发送队列不为空或者还有上次未发送完的数据,就调用DoOnWrite执行真正的发送操作
while (!m_oInQueue.empty() || m_iLeftWriteLen > 0)
{
ret = DoOnWrite();
if (ret != 0 && ret != 1)
{
return ret;
}
else if (ret == 1)
{
//need break, wait next write
return 0;
}
}
WriteDone();
return 0;
}
DoOnWrite:
int MessageEvent :: DoOnWrite()
{
//上一次的消息还未发送完毕,将剩下的发送完
if (m_iLeftWriteLen > 0)
{
return WriteLeft();
}
m_oMutex.lock();
if (m_oInQueue.empty())
{
m_oMutex.unlock();
return 0;
}
//从队列中取出一条新消息,准备发送
QueueData tData = m_oInQueue.front();
m_oInQueue.pop();
m_iQueueMemSize -= tData.psValue->size();
m_oMutex.unlock();
std::string * poMessage = tData.psValue;
//如果该消息入队太久没有被处理,则抛弃,不发送
uint64_t llNowTime = Time::GetSteadyClockMS();
int iDelayMs = llNowTime > tData.llEnqueueAbsTime ? (int)(llNowTime - tData.llEnqueueAbsTime) : 0;
BP->GetNetworkBP()->TcpOutQueue(iDelayMs);
if (iDelayMs > TCP_OUTQUEUE_DROP_TIMEMS)
{
//PLErr("drop request because enqueue timeout, nowtime %lu unqueuetime %lu",
//llNowTime, tData.llEnqueueAbsTime);
delete poMessage;
return 0;
}
//计算发送缓冲区长度,需要加上4字节用于表示消息长度
int iBuffLen = poMessage->size();
int niBuffLen = htonl(iBuffLen + 4);
int iLen = iBuffLen + 4;
//申请缓冲区
m_oWriteCacheBuffer.Ready(iLen);
//将消息长度及消息内容拷贝到缓冲区
memcpy(m_oWriteCacheBuffer.GetPtr(), &niBuffLen, 4);
memcpy(m_oWriteCacheBuffer.GetPtr() + 4, poMessage->c_str(), iBuffLen);
m_iLeftWriteLen = iLen;
m_iLastWritePos = 0;
delete poMessage;
//PLImp("write len %d ip %s port %d", iLen, m_oAddr.getHost().c_str(), m_oAddr.getPort());
//开始发送消息,有可能消息太大一次发送不完
int iWriteLen = m_oSocket.send(m_oWriteCacheBuffer.GetPtr(), iLen);
if (iWriteLen < 0)
{
PLErr("fail, write len %d ip %s port %d",
iWriteLen, m_oAddr.getHost().c_str(), m_oAddr.getPort());
return -1;
}
//需要下次再发送
if (iWriteLen == 0)
{
//need wait next write
AddEvent(EPOLLOUT);
return 1;
}
//PLImp("real write len %d", iWriteLen);
//发送成功
if (iWriteLen == iLen)
{
m_iLeftWriteLen = 0;
m_iLastWritePos = 0;
//write done
}
//没有一次性全部发送完,剩下的需要下次发送
else if (iWriteLen < iLen)
{
//m_iLastWritePos和m_iLeftWriteLen分别用来表示上次写的位置以及剩下需要发送的长度
m_iLastWritePos = iWriteLen;
m_iLeftWriteLen = iLen - iWriteLen;
PLImp("write buflen %d smaller than expectlen %d", iWriteLen, iLen);
}
else
{
PLErr("write buflen %d large than expectlen %d", iWriteLen, iLen);
}
return 0;
}
结语
先介绍这么多吧,接下去会有更多相关的文章,特别是PhxPaxos中实现Paxos算法的那部分,相信看过Paxos相关论文的童鞋会对这块很感兴趣。
最后,附上PhxPaxos源码的地址:https://github.com/Tencent/phxpaxos
可进入我的博客查看原文