完成例程和完成端口
注:重叠IO有三种实现方式:完成事件、完成例程、完成端口,完成事件和WSAEventSelect模型差不多,也都限制于64个事件(可用多个线程扩展),这里就不再叙述
项目地址
1.完成例程和完成端口都是重叠IO模型,它们都是异步模型。
用一个形象的比喻:打印店打印纸张。同步非阻塞模型(例如Select)是轮到你了就叫你过来,其他时间你可以去干别的事情。异步模型是店员帮你打印好了再叫你过来,其他时间任你安排。
2.不同点:
2.1完成例程需要自己控制线程的执行顺序,而完成端口是由操作系统帮你完成。
完成例程的流程一般是:
2.1.1使用一个线程专门用于接收连接(WSAAccept),WSAAccept是阻塞函数。
2.1.2接收到Socket之后开启一个线程开始接收数据(WSARecv)。完成例程一般使用SleepEx函数休眠,以等待OS执行完毕,完成事件使用WSAWaitForMultipleEvents函数。这里使用线程池来减少创建销毁线程的次数。
2.2完成端口不需要自己管理线程的执行顺序。
2.2.1完成端口首先需要使用CreateIoCompletionPort函数创建一个CompletionPort,FileHandle参数可以为文件句柄或者Socket,这里首次创建时使用INVALID_HANDLE_VALUE。调用该函数操作系统会将该设备句柄添加到设备列表。返回的句柄需要保存,接下来会使用到。
2.2.2接下来创建一些工作线程,一般为cput的两倍即可,CreateIoCompletionPort最后一个参数可以指定线程数,默认为cpu数。每个线程都是一个死循环,在循环内第一条语句调用GetQueuedCompletionStatus函数,该函数会让操作系统将该线程压入到等待线程队列中。该队列是LIFO。当I/O完成队列非空,且工作线程并未超出总的并发数时,系统从等待线程队列中取出线程。该线程将从CreateIoCompletionPort函数后继续执行。
2.2.3接受连接可以使用WSAAccept或者AcceptEx(异步),和其他IO模型一样,接受到连接后使用WSARecv接受数据,这里就不需要使用SleepEx休眠了。当某个IO事件完成时OS会选择一个线程执行,所以使用WSARecv时需要传入一些参数表示这个是Read类型。
2.2.4GetQueuedCompletionStatus函数的lpCompletionKey参数可供用户自身使用,LPOVERLAPPED作为重叠IO必须使用的结构体也可利用传参。LPOVERLAPPED结构体的hEvent参数是事件模型所需要的,可作为一个指针传参。这里还有一个技巧,使用CONTAINING_RECORD可以根据一个结构体的第一个参数参数找到这个结构体,但这个参数必须是该结构体的第一个参数。
2.2.5除了当IO操作完成时OS时投递完成包,用户还可使用PostQueuedCompletionStatus在IOCP上投寄一个完成包。当需要释放所有线程时只需要使用PostQueuedCompletionStatus函数传入一个约定好的信号,即可让线程退出。
完成端口核心代码
void TService::InitCompletionPort()
{
SYSTEM_INFO sys;
GetSystemInfo(&sys);
int process = sys.dwNumberOfProcessors;
threadNum = 2 * process;
m_portWorks = new HANDLE[threadNum]; //两倍cpu数线程
//将该设备句柄添加到设备列表中
m_completionPortHandle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, threadNum);
if (m_completionPortHandle == nullptr)
{
OnError("create completion port error");
return;
}
for (int i = 0; i < threadNum; i++)
{
m_portWorks[i] = ::CreateThread(0, 0, CompletionPortWord, m_completionPortHandle, 0, nullptr);//端口作为线程参数
}
ThreadPool::GetInstance().commit([](void* p)
{
while (true) //接收连接线程,可使用AcceptEx异步接收连接
{
TService& service = *Word::GetInstance().GetManager<TService>();
service.clientSock = WSAAccept(service.listenSocket, (sockaddr*)&service.clientAddr, &service.addrSize, nullptr, NULL);
IOContext* ioContext = new IOContext(service.clientSock);
service.AcceptComplete(*ioContext);
}
},nullptr);
}
DWORD __stdcall TService::CompletionPortWord(LPVOID IpParm) //工作线程
{
HANDLE portHandle = IpParm; //重叠端口
DWORD lpNumberOfBytesRecvd; //
OVERLAPPED* IpOverlapped;
TService* service;
IOContext* iOContext;
while (true)
{
BOOL bReturn =GetQueuedCompletionStatus(portHandle, &lpNumberOfBytesRecvd, (LPDWORD)&service, &IpOverlapped, INFINITE); //加入就绪栈,内核会选择一个线程执行(FIFO)
//先计算出IpOverlapped地址的偏移量,然后根据特定的变量地址找到结构体地址,IpOverlapped需要作为第一个参数
iOContext = (IOContext*)CONTAINING_RECORD(IpOverlapped, IOContext, m_overlapped);
if (iOContext->m_Type==OperationType::ExitPosted) //线程退出标志
{
break;
}
if (bReturn == false) //出现错误
{
break;
}
if (lpNumberOfBytesRecvd == 0)
{
CloseHandle((HANDLE)iOContext->m_sockAccept);
service->RemoveChannel(iOContext->m_sockAccept);
continue;
}
switch (iOContext->m_Type)
{
case OperationType::AcceptPosted:
service->AcceptComplete(*iOContext);
case OperationType::RecvPosted:
service->RecvComplete(*iOContext);
break;
case OperationType::SendPosted:
service->SendComplete(*iOContext);
break;
}
}
return 0;
}
void TService::PostRecv(IOContext & ioContext)
{
ioContext.m_Type = OperationType::RecvPosted;
WSARecv(ioContext.m_sockAccept, &ioContext.m_wsaBuf, 1, &ioContext.m_numOfBytes, &ioContext.m_flags, &ioContext.m_overlapped, nullptr);
}
void TService::PostExit()
{
IOContext context(listenSocket);
context.m_Type = OperationType::ExitPosted;
for (int i = 0; i < threadNum; i++)
{
//每个线程都会收到一个释放信号
PostQueuedCompletionStatus(m_completionPortHandle, 1, (ULONG_PTR)this, (OVERLAPPED*)&context);
}
}
void TService::AcceptComplete(IOContext & ioContext)
{
SOCKET sock = this->clientSock;
HANDLE handle = CreateIoCompletionPort((HANDLE)sock, this->m_completionPortHandle, (DWORD)this, 0); //加入端口
TChannel* channel = (TChannel*)this->AddChannel(this->clientSock);
PostRecv(ioContext);
}