EventLoop中时序深究:
过程:
1.在程序中我们一般会先构造一个EventLoop对象(muduo::EventLoop loop),EventLoop构造函数初始化列表,构造poller_,timerQueue_,wakeupFd_和wakeupChannel_等成员,在函数中具体表现为:
poller_(new Poller(this)),
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_))
其中wakeupFd_通过createEventfd()函数来初始化,在后者的函数中通过eventfd()实现线程高效的唤醒,wakeupFd_将用于后面的wakeup()函数,用来唤醒线程.
wakeupChannel_->setReadCallback(
boost::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
对于wakeupChannel_,调用setReadCallback()函数注册wakeupChannel回调函数为EventLoop::handleRead,后者会调用::read来read掉weakupFd_上的可读事件,避免一直触发,后面会说.另外wakeupChannel会调用enableReading,后者接着会调用Channel::update();进而调用EventLoop::UpdataChannel();最后调用Poller::updataChannel(),在此函数内会添加一个新的Channel或者更新已经存在的此channel关注的事件.在一开始会将wakeupChannel_添加进Poller::channels_中,其类型为(typedef std::map<int, Channel*> ChannelMap;);并使用wakeupChannel_.fd和wakeupChannel_.events构造一个struct pollfd pfd,然后将pfd push_back到Poller::pollfds_,以后会关注wakeupChannel_(wakeupFd_)上的可读事件.
2.在程序中事件的循环开始于EventLoop::loop(),其内部通过调用poller_->poll()(假设是采用的poll而不是epoll).::poll()函数会阻塞返回,即有事件发生,如socket上有数据可读写;Poller::poll中会调用fillActiveChannels()函数,该函数中会将pollfds_中的pollfd结构体中的revents设置为此channel的revents_,然后将次channel压入activeChannels中,然后将其作为EventLoop::activeChannels_返回.接着在EventLoop::loop中遍历activeChannels_,对每个channel调用Channel::handleEvent(),进而调用每个channel注册的读写回调函数.
3.EventLoop::runInLoop(const Functor& cb)函数是为了在IO线程中执行某个回调函数,其主要功能是可以跨线程调用.EventLoop::doPendingFunctors(),用于处理被挂起的事件,该函数只会在当前IO线程中调用.
测试程序
#include "EventLoop.h"
#include <stdio.h>
muduo::EventLoop* g_loop;
int g_flag = 0;
void run4()
{
printf("run4(): pid = %d, flag = %d\n", getpid(), g_flag);
g_loop->quit();
}
void run3()
{
printf("run3(): pid = %d, flag = %d\n", getpid(), g_flag);
g_loop->runAfter(5, run4);
g_flag = 3;
printf("%d\n",g_flag);
}
void run2()
{
printf("run2(): pid = %d, flag = %d\n", getpid(), g_flag);
g_loop->queueInLoop(run3);
}
void run1()
{
g_flag = 1;
printf("run1(): pid = %d, flag = %d\n", getpid(), g_flag);
g_loop->runInLoop(run2);
g_flag = 2;
}
int main()
{
printf("main(): pid = %d, flag = %d\n", getpid(), g_flag);
muduo::EventLoop loop;
g_loop = &loop;
loop.runAfter(2, run1);
loop.loop();
printf("main(): pid = %d, flag = %d\n", getpid(), g_flag);
}
执行结果:
main(): pid = 28847, flag = 0
run1(): pid = 28847, flag = 1
run2(): pid = 28847, flag = 1
run3(): pid = 28847, flag = 2
3
run4(): pid = 28847, flag = 3
main(): pid = 28847, flag = 3
此为单线程程序,在loop内定义了一个2秒之后执行的回调函数run1(),在run1()内,由于就是在当前IO线程,那么就会马上执行g_loop->runInLoop(run2),此时flag还是为1,接着在run2()中,执行g_loop->queueInLoop(run3);即把run3()函数加入到队列中,run2()返回,此时flag=2;此时主线程中的loop已经处理完事件了,之后就处理执行挂起事件,即执行doPendingFunctors(),就执行了run3(), run3()内设置另一个5s定时器,run3()执行完回到loop继续poll, 3s后超时执行run4(),此时flag=3.
Acceptor类
Acceptor用于accept(2)新TCP连接,并通过回调通知使用者.它是内部的class,供TcpServer使用,生命期由后者控制.
Acceptor的数据成员包括acceptSocket_、acceptChannel_,Acceptor的acceptSocket_是正在监听的socket(即server socket).acceptChannel_是用于观察acceptSocket_上的可读事件,万一有可读事件发生,那么Channel::handleEvent函数便会被调用,在其中回调Acceptor::handleRead().
acceptChannel_.setReadCallback( boost::bind(&Acceptor::handleRead, this)); //构造函数中
然后在Acceptor::handleRead()函数中,使用accept(2)来接受新连接,并且可以回调用户的callback,可以看到回调函数中的第一个参数为accept返回的connfd.
void setNewConnectionCallback(const NewConnectionCallback& cb)
{
newConnectionCallback_ = cb;
}
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr(0);
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0) {
if (newConnectionCallback_) {
newConnectionCallback_(connfd, peerAddr);
} else {
sockets::close(connfd);
}
}
}
在该类中还有一个成员函数:listen,其作用是用于监听.
void Acceptor::listen()
{
loop_->assertInLoopThread();
listenning_ = true;
acceptSocket_.listen();
acceptChannel_.enableReading();
}
TcpServer与TcpConnection:
在上层应用程序中,我们一般不会直接调用Acceptor类,而是借助于TcpServer,把Acceptor作为数据成员使用.
该类的功能是管理accept(2)获得TcpConnection,是给用户直接使用的,生命期由用户来控制.
TcpServer的Acceptor与TcpConnection数据成员:
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::map<std::string, TcpConnectionPtr> ConnectionMap
boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
ConnectionCallback connectionCallback_;
TcpConnection类使用Channel来获得socket上的IO事件,他会自己处理可写事件,而把可读事件通过MessageCallback传达给客户,其没有直接发起连接的功能,其构造函数的参数是已经建立好的socket.因此其中有两个重要的数据成员:
boost::scoped_ptr<Socket> socket_;
boost::scoped_ptr<Channel> channel_;
时序分析
在程序中我们创建一个TcpServer对象,在其构造函数中我们会初始化TcpServer::acceptor_对象,并且设置acceptor_中的handleRead中回调TcpServer::newConnection()函数.
acceptor_(new Acceptor(loop, listenAddr));
//setNewConnectionCallback函数在Acceptor中用来设置newConnectionCallback_,而此时将它设置为TcpServer::newConnection.
acceptor_->setNewConnectionCallback(
boost::bind(&TcpServer::newConnection, this, _1, _2));
然后该TcpServer对象调用start(),在该函数中会调用Acceptor::listen()函数来启动监听;若有活动连接,即TcpServer::acceptor_中的acceptChannel_ 对象可读,则poll返回,调用Channel::handleEvent()来处理活动通道,在该函数中又被注册了Acceptor::handleRead()回调函数,在该函数中,使用accept(2)来接受新连接,并且会回调TcpServer构造函数中注册的TcpServer::newConnection()函数,在该函数中:
TcpConnectionPtr conn(
new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1));
conn->connectEstablished();
会创建一个新的TcpConnectionPtr变量conn,在该变量的构造函数中我们设置了
// 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间
channel_->setReadCallback(
boost::bind(&TcpConnection::handleRead, this, _1));
并且在该函数中我们对conn设置了连接到来的回调函数和消息回调函数(这是在TcpServer构造函数中初始化的).最后我们调用TcpConnection::connectEstablished(),会将该TcpConnection所对应的通道加入到Poller关注.至此我们已经成功建立了一个连接,当对方发送数据到connfd,内核接受缓冲区不在为空,触发可读事件,
TcpConnection::channel_ 可读事件发生,poll返回,调用Channel::handleEvent()处理活动通道,调用TcpConnection::handleRead(),在handleRead()中会回调messageCallback_函数.
void TcpConnection::handleRead(Timestamp receiveTime)
{
ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
messageCallback_(shared_from_this(), buf, n);
}
测试程序
#include "TcpServer.h"
#include "EventLoop.h"
#include "InetAddress.h"
#include <stdio.h>
void onConnection(const muduo::TcpConnectionPtr& conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toHostPort().c_str());
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
void onMessage(const muduo::TcpConnectionPtr& conn,
muduo::Buffer* buf,
muduo::Timestamp receiveTime)
{
printf("onMessage(): received %zd bytes from connection [%s] at %s\n",
buf->readableBytes(),
conn->name().c_str(),
receiveTime.toFormattedString().c_str());
printf("onMessage(): [%s]\n", buf->retrieveAsString().c_str());
}
int main()
{
printf("main(): pid = %d\n", getpid());
muduo::InetAddress listenAddr(9981);
muduo::EventLoop loop;
muduo::TcpServer server(&loop, listenAddr);
server.setConnectionCallback(onConnection);
server.setMessageCallback(onMessage);
server.start();
loop.loop();
}
运行结果:
main(): pid = 5983
20191008 08:48:51.871004Z 5983 INFO TcpServer::newConnection [0.0.0.0:9981] - new connection [0.0.0.0:9981#1] from 127.0.0.1:41400 - TcpServer.cc:58
20191008 08:48:51.871146Z 5983 DEBUG TcpConnection TcpConnection::ctor[0.0.0.0:9981#1] at 0x10624a0 fd=14 - TcpConnection.cc:36
onConnection(): new connection [0.0.0.0:9981#1] from 127.0.0.1:41400
onMessage(): received 4 bytes from connection [0.0.0.0:9981#1] at 20191008 08:49:05.568592
onMessage(): [jij
]
20191008 08:49:09.272725Z 5983 INFO TcpServer::removeConnection [0.0.0.0:9981] - connection 0.0.0.0:9981#1 - TcpServer.cc:76
onConnection(): connection [0.0.0.0:9981#1] is down
20191008 08:49:09.272830Z 5983 DEBUG ~TcpConnection TcpConnection::dtor[0.0.0.0:9981#1] at 0x10624a0 fd=14 - TcpConnection.cc:50