多线程与并发服务器:
(1)循环式(iterative)服务器:每处理一次请求,就关闭一次,使用的是TCP短连接,只使用单线程模式,没法充分发挥多核CPU的优势.
(2)惊群现象:当一个客户端连接过来时,有多个子进程处于accept状态,多个进程都有返回,但只有一个进程返回成功.
TCP网络编程的本质是处理三个半事件:
- 连接建立:服务器accept(被动)接受连接,客户端connect(主动)发起连接
- 连接断开:主动断开(close、shutdown),被动断开(read返回0)
- 消息到达:文件描述符可读
-
消息发送完毕:这算半个。对于低流量的服务,可不必关心这个事件;这里的发送完毕是指数据写入操作系统缓冲区,将由TCP协议栈负责数据的发送与重传,不代表对方已经接收到数据。
Eventloop
1.one loop per thread意思是说每个线程最多只能有一个Eventloop对象;Eventloop对象在构造的时候,会检查当前线程是否已经创建了其他Eventloop对象,如果已经创建了,则会终止程序;Eventloop构造函数会记住本对象所属的线程(threadId);创建了eventloop对象的线程成为IO线程,其功能是运行时间循环(Eventloop::loop).
2.EventLoop中添加了一个runInLoop函数接口,该接口用于执行某个用户任务回调,即EventLoop::runInLoop(const Functor& cb),Functor为boost::function<void>.如果用户在当前IO线程调用这个函数,回调会同步执行;如果用户在其他的线程调用runInLoop,则会紧接着调用queueInLoop函数,将回调加入队列,同时通过wakeup()唤醒poll()调用容器内的回调.
3.由于IO线程平时阻塞在时间循环Eventloop::loop()中的poll调用上,为了让IO线程能够立刻执行用户回调,我们需要设法唤醒IO线程,这里用的是eventfd(2)函数,可以高效的实现唤醒,不必管理缓冲区.
TimerQueue定时器类
TimerQueue是EventLoop的组件之一,其功能是提供定时任务和周期任务.
TimerQueue数据结构的选择,能快速根据当前时间找到已到期的定时器,也要高效的添加和删除Timer,因而可以用二叉搜索树,用map或者set
typedef std::pair<Timestamp, Timer> Entry;
typedef std::set<Entry> TimerList;
在该类中,使用timerfd_来实现定时任务,其原因是:
(1)sleep / alarm / usleep 在实现时有可能用了信号 SIGALRM,在多线程程序中处理信号是个相当麻烦的事情,应当尽量避免.
(2)nanosleep 和 clock_nanosleep 是线程安全的,但是在非阻塞网络编程中,绝对不能用让线程挂起的方式来等待一段时间,程序会失去响应。正确的做法是注册一个时间回调函数.
(3)getitimer 和 timer_create 也是用信号来 deliver 超时,在多线程程序中也会有麻烦.
timer_create 可以指定信号的接收方是进程还是线程,算是一个进步,不过在信号处理函数(signal handler)能做的事情实在很受限.
(4)timerfd_create 把时间变成了一个文件描述符,该“文件”在定时器超时的那一刻变得可读,这样就能很方便地融入到 select/poll 框架中,用统一的方式来处理 IO 事件和超时事件,这也正是 Reactor 模式的长处.
在该类中仅对外提供了两个接口:
- addTimer():用于添加一个定时器超时执行回调函数;
- cancel():取消一个定时器;
私有成员函数相对复杂:
void addTimerInLoop(Timer* timer);
void cancelInLoop(TimerId timerId);
// called when timerfd alarms
void handleRead();
// move out all expired timers
std::vector<Entry> getExpired(Timestamp now);
void reset(const std::vector<Entry>& expired, Timestamp now);
bool insert(Timer* timer);
- 添加定时器
addTimer接口使用了EventLoop::runInLoop接口来执行TimerQueue::addTimerInLoop()函数的,在后者中会调用私有成员函数insert()函数插入定时器;如果添加的定时器是最早触发的那一个,则在addTimerInLoop()会调用resetTimerfd->timerfd_settime启动该定时器. -
更新定时器
void handleRead(); //channel事件处理器中注册的回调函数,该函数获取已经到期的事件并处理随后更新列表
std::vector<Entry> getExpired(Timestamp now);//获取一组过期的定时器,并从TimerList列表中删除
void reset(const std::vector<Entry>& expired, Timestamp now);//如果是周期型任务则重新配置时间插入,随后调用resetTimerfd更新下一定时任务.
过程:TimerQueue 中会有多个定时器,一次性的和重复的,事件循环开始EventLoop::loop(),当最早到期定时器超时时,poll() 返回timerfd_ 的可读事件(timerfdChannel_),调用Channel::handleEvent(),调用readCallback_(receiveTime); 进而调用Channel::setReadCallback 注册的TimerQueue::handleRead(), 在函数内先通过readTimerfd()中的read掉timerfd_数据,避免一直触发可读事件,接着遍历TimerQueue中此时所有超时的定时器,调用每个定时器构造时传递的回调函数.
buffer类的设计:
1.Muduo中TcpConnection类中没有提供close(),而只提供shutdown(),这么做是为了服务端半关闭的状态,关闭了写方向上的连接,只能读,保证了数据收发的完整性.
2.Muduo EventLoop使用的epoll的LT模式,原因是:一是为了与传统的poll兼容,在文件描述符数目较少,而活动文件描述符比较高时,epoll并不比poll见得高效,必要时可以切换到poll.二是LT模式编程更为简单.三是读写的时候不用等到出现EAGAIN,可以节省系统调用次数,降低延迟.
3.Muduo中是如何设计并使用缓冲区的?(Muduo服务端编程p208)
为每个连接建立分配一个50kb或者其它固定大小的的缓冲区的话,那么如果有10000个并发连接,那么将耗费巨大的内存.在Muduo中,其设计结合栈上的空间巧妙的解决了这个问题,利用了临时栈上空间,在栈上准备一个65536字节的extrabuf,然后利用readv()读取数据,iovec结构体有两块,一块指向muduo buffer中的writable字节,另一个指向extrabuf.
4.当接收到数据时,从内核缓冲区读到应用层的input buffer内,然后通知上层的应用程序,OnMessage(Buffer* buf)回调,根据应用层协议判定是否是一个完整的包,(解码编码codec),如果不是一条完整的消息,不会取走数据,直到收到一条完整的消息.
5.input和output是针对客户代码而言的,客户代码从input中读,而网络库TcpConnection则是从socket读取数据之后写入到input buffer,利用Buffer::readfd()实现的,里面通过readv函数来读取数据(readv和writev函数用于在一次函数调用中读、写多个非连续缓冲区).其次,客户代码会把数据写入output buffer,这一步是有TcpConnection::send函数实现的.而TcpConnection则是从output buffer读取数据并写入了socket.网络库与客户代码相对于buffer的读写正好相反.
Reactor的关键结构
channel类:
Reactor中最核心的为事件分发机制,即将IO 复用拿到的IO事件分发给各个文件描述符的事件处理函数.
每个channel对象自始至终只属于一个EventLoop,因此每个channel对象也只属于某一个IO线程,每个channel对象自始至终只负责一个文件描述符fd的IO事件分发.channel的生命周期不属于持有它的Eventloop,也不属于epoll或者poll,一般是由TcpConnection,Connector或者自己定义的类(内含channel对象)来管理.
channel是Acceptor、Connector、EventLoop、TimerQueue、TcpConnection的成员,生命期由后者控制.
- Channel::handleEvent()是channel的核心,它由EventLoop::loop()调用,它的功能是根据revents_值分别调用不同的用户回调.
- Channel::update()它会调用Eventloop::updateChannel(),后者又会调用Poller::updateChannel(),在该函数中会注册IO事件.
Poller类:
Poller类是IO复用的封装,在muduo中是为数不多的抽象基类,因为为了支持poll和epoll两种IO复用的机制.Poller是EventLoop的间接成员,只供其owner EventLoop在IO线程中调用,因此无需加锁.Poller并不拥有管理文件描述符事件的Channel, Channel在析构之前必须自己.unregister(EventLoop::removeChannel(),避免空悬指针
连接过程:EventLoop中的loop函数实际上调用了poller中的poll函数,其会返回活跃的channel;之后会调用这些channel的handleEvent()函数,就会调用一些用户定义的回调函数;而且有可能会有另一个channelB,同样会调用handleEvent()函数和回调函数.
Acceptor类
Acceptor类的数据成员包括Socket,Channel对象.Acceptor类中的socket对象是正在监听的套接字,当Channel对象观察到该套接字的可读事件,处于活跃的状态,poller::poll函数就返回这活跃的通道,并调用Channel::handleEvent函数进行处理事件,由于是可读事件,它又会调用Acceptor::handleRead函数;在handleReactor中,调用accept继续接受新连接,并回调用户的callback.
TcpConnection类:
TcpConnection类是muduo里默认使用shared_ptr管理的类,也是唯一一个继承enable_shared_from_this的class.其中enable_shared_from_this的作用是使得即使该对象已经被 shared_ptr 管理着, 也不会造成对象被两个独立的智能指针管理,以免二次析构.
enable_shared_from_this:
其常见的使用场合:当类A被share_ptr管理,且在类A的成员函数里需要把当前类对象作为参数传给其他函数时,就需要传递一个指向自身的share_ptr。我们就使类A继承enable_share_from_this,然后通过其成员函数share_from_this()返回当指向自身的share_ptr.
shared_from_this() 会用当前对象的裸指针构造一个临时智能指针对象,引用计数加1,但马上会被析构,又减1,故无论调用多少次,对引用计数都没有影响.而若直接使用this指针传递对象(shared_ptr(this)),可能会构建一个新的shared_ptr对象,并不是直接将我们之前管理的对象的shared_ptr拷贝过去,两个非共享的shared_ptr指向同一个对象,未增加引用计数,反而会被析构两次.
#include <iostream>
#include <memory>
using namespace std;
class hhyA : public enable_shared_from_this<hhyA>
{
public:
shared_ptr<hhyA>getptr()
{
//return shared_ptr<hhyA>(shared_from_this());
return shared_ptr<hhyA>(this);
};
~hhyA()
{
cout << "析构" << endl;
}
};
int main()
{
shared_ptr<hhyA>B(new hhyA);
shared_ptr<hhyA>C = B->getptr();
cout << B.use_count() << endl;
cout << C.use_count() << endl;
}
输出结果为:
1
1
析构
析构
被析构两次,将return shared_ptr<hhyA>(this)改为return shared_ptr<hhyA>(shared_from_this())之后,运行结果为
2
2
析构
shared_from_this构建一个临时对象,会自动析构.
boost::any
可以存放任何类型的上下文对象;其比void*的优点在于,对于任意类型都可以安全的存储和安全的取回;可以在标准库容器中存放不用类型的方法,比如说vector<boost::any>.
muduo中TcpServer接收新连接的时序图:
过程:当一个连接到来loop函数->返回一个活跃的channel通道->返回之后调用channel的handlEvent()来处理该事件->连接到来属于可读事件->又回调了accept中的handleRead()函数来处理该可读事件->在该函数中又调用了accept函数来处理新连接->又回调了TcpServer中的newconnection函数(在TcpServer构造函数中就设定好了)->在该函数中新建了一个TcpConnection对象,通过该对象调用了connectEstablished()->在该函数中回调了一个用户注册的函数connCb(),该函数是在TcpServer中的setconnectioncallback()的函数注册的.
TcpConnectionPtr conn(
new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_)
conn->setMessageCallback(messageCallback_);
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1));
conn->connectEstablished();
实际上我们的应用程序不会实际上去调用TcpConnection中的setConnectionCallback,setMessageCallback函数,只会调用TcpServer中的setConnectionCallback函数,并把设置好的connectionCallback_传递给TcpConnection对象conn.
muduo中TcpConnection断开连接的时序图:
muduo中只有一种关闭连接的方式:被动关闭,即对方客户先关闭,通过本地的TcpConnection中的handleRead函数中的read,当read()函数返回0时,触发关闭逻辑.
简要的过程:当连接断开时,TcpConnection中的通道会处于活跃的状态,然后loop会返回该活跃的通道,并调用handleEvent函数处理,在该函数中连接关闭也属于可读事件,于是会调用handleRead函数,当read返回值是否为0,若为0则调用handleClose函数,该函数的主要功能是调用closeCallback_,但这个对象是给TcpServer和TcpClient用的,用于通知它们移除持有的TcpConnection,而不是给用户使用的.
void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpConnection::handleClose state = " << state_;
assert(state_ == kConnected);
// we don't close fd, leave it to dtor, so we can find leaks easily.
channel_->disableAll();
// must be the last line
TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis);
closeCallback_(guardThis); //会调用Tcpserver的removeConnection
}
在该函数中又会调用TcpServer中的removeConnection函数,并用erase从这个连接对象移除时,并不会马上销毁析构,因为使用shared_ptr管理,引用计数并不为0.此时不能立即销毁的原因,因为TcpConnection包含channel对象,否则channel对象也会一并销毁,而我们正在调用channel中的handleEvent函数.
因此为了保证TcpConnection对象的生存期长于channel对象,我们要使用shared_ptr来管理对象.当连接到来时,创建一个TcpConnection对象,立刻用shared_ptr对象来管理.并在removeConnection函数中要用EventLoop::queueInLoop().避免channel对象过早销毁,这里的boost::bind让TcpConnection的生命期长到调用connectDestroy的时刻.
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnection [" << name_
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name());
assert(n == 1); (void)n;
loop_->queueInLoop(
boost::bind(&TcpConnection::connectDestroyed, conn));
}