Muduo_Day4(EventLoop,Acceptor与TcpServer,TcpConnection深究)

EventLoop中时序深究:

时序1.png
时序2.png

过程:
1.在程序中我们一般会先构造一个EventLoop对象(muduo::EventLoop loop),EventLoop构造函数初始化列表,构造poller_,timerQueue_,wakeupFd_和wakeupChannel_等成员,在函数中具体表现为:

poller_(new Poller(this)),
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_))

其中wakeupFd_通过createEventfd()函数来初始化,在后者的函数中通过eventfd()实现线程高效的唤醒,wakeupFd_将用于后面的wakeup()函数,用来唤醒线程.

wakeupChannel_->setReadCallback(
         boost::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
wakeupChannel_->enableReading();

对于wakeupChannel_,调用setReadCallback()函数注册wakeupChannel回调函数为EventLoop::handleRead,后者会调用::read来read掉weakupFd_上的可读事件,避免一直触发,后面会说.另外wakeupChannel会调用enableReading,后者接着会调用Channel::update();进而调用EventLoop::UpdataChannel();最后调用Poller::updataChannel(),在此函数内会添加一个新的Channel或者更新已经存在的此channel关注的事件.在一开始会将wakeupChannel_添加进Poller::channels_中,其类型为(typedef std::map<int, Channel*> ChannelMap;);并使用wakeupChannel_.fd和wakeupChannel_.events构造一个struct pollfd pfd,然后将pfd push_back到Poller::pollfds_,以后会关注wakeupChannel_(wakeupFd_)上的可读事件.

2.在程序中事件的循环开始于EventLoop::loop(),其内部通过调用poller_->poll()(假设是采用的poll而不是epoll).::poll()函数会阻塞返回,即有事件发生,如socket上有数据可读写;Poller::poll中会调用fillActiveChannels()函数,该函数中会将pollfds_中的pollfd结构体中的revents设置为此channel的revents_,然后将次channel压入activeChannels中,然后将其作为EventLoop::activeChannels_返回.接着在EventLoop::loop中遍历activeChannels_,对每个channel调用Channel::handleEvent(),进而调用每个channel注册的读写回调函数.

3.EventLoop::runInLoop(const Functor& cb)函数是为了在IO线程中执行某个回调函数,其主要功能是可以跨线程调用.EventLoop::doPendingFunctors(),用于处理被挂起的事件,该函数只会在当前IO线程中调用.
测试程序

#include "EventLoop.h"
#include <stdio.h>

muduo::EventLoop* g_loop;
int g_flag = 0;

void run4()
{
  printf("run4(): pid = %d, flag = %d\n", getpid(), g_flag);
  g_loop->quit();
}

void run3()
{
  printf("run3(): pid = %d, flag = %d\n", getpid(), g_flag);
  g_loop->runAfter(5, run4);
  
  g_flag = 3;
  printf("%d\n",g_flag);
}

void run2()
{
  printf("run2(): pid = %d, flag = %d\n", getpid(), g_flag);
  g_loop->queueInLoop(run3);
}

void run1()
{
  g_flag = 1;
  printf("run1(): pid = %d, flag = %d\n", getpid(), g_flag);
  g_loop->runInLoop(run2);
  g_flag = 2;
}

int main()
{
  printf("main(): pid = %d, flag = %d\n", getpid(), g_flag);

  muduo::EventLoop loop;
  g_loop = &loop;

  loop.runAfter(2, run1);
  loop.loop();
  printf("main(): pid = %d, flag = %d\n", getpid(), g_flag);
}

执行结果:

main(): pid = 28847, flag = 0
run1(): pid = 28847, flag = 1
run2(): pid = 28847, flag = 1
run3(): pid = 28847, flag = 2
3
run4(): pid = 28847, flag = 3
main(): pid = 28847, flag = 3

此为单线程程序,在loop内定义了一个2秒之后执行的回调函数run1(),在run1()内,由于就是在当前IO线程,那么就会马上执行g_loop->runInLoop(run2),此时flag还是为1,接着在run2()中,执行g_loop->queueInLoop(run3);即把run3()函数加入到队列中,run2()返回,此时flag=2;此时主线程中的loop已经处理完事件了,之后就处理执行挂起事件,即执行doPendingFunctors(),就执行了run3(), run3()内设置另一个5s定时器,run3()执行完回到loop继续poll, 3s后超时执行run4(),此时flag=3.

Acceptor类

Acceptor用于accept(2)新TCP连接,并通过回调通知使用者.它是内部的class,供TcpServer使用,生命期由后者控制.
Acceptor的数据成员包括acceptSocket_、acceptChannel_,Acceptor的acceptSocket_是正在监听的socket(即server socket).acceptChannel_是用于观察acceptSocket_上的可读事件,万一有可读事件发生,那么Channel::handleEvent函数便会被调用,在其中回调Acceptor::handleRead().

  acceptChannel_.setReadCallback( boost::bind(&Acceptor::handleRead, this)); //构造函数中

然后在Acceptor::handleRead()函数中,使用accept(2)来接受新连接,并且可以回调用户的callback,可以看到回调函数中的第一个参数为accept返回的connfd.

void setNewConnectionCallback(const NewConnectionCallback& cb)
  { 
  newConnectionCallback_ = cb; 
}
void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr(0);
  //FIXME loop until no more
  int connfd = acceptSocket_.accept(&peerAddr);
  if (connfd >= 0) {
    if (newConnectionCallback_) {
      newConnectionCallback_(connfd, peerAddr);
    } else {
      sockets::close(connfd);
    }
  }
}

在该类中还有一个成员函数:listen,其作用是用于监听.

void Acceptor::listen()
{
  loop_->assertInLoopThread();
  listenning_ = true;
  acceptSocket_.listen();
  acceptChannel_.enableReading();
}

TcpServer与TcpConnection:

在上层应用程序中,我们一般不会直接调用Acceptor类,而是借助于TcpServer,把Acceptor作为数据成员使用.
该类的功能是管理accept(2)获得TcpConnection,是给用户直接使用的,生命期由用户来控制.
TcpServer的Acceptor与TcpConnection数据成员:

typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::map<std::string, TcpConnectionPtr> ConnectionMap
boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
ConnectionCallback connectionCallback_;

TcpConnection类使用Channel来获得socket上的IO事件,他会自己处理可写事件,而把可读事件通过MessageCallback传达给客户,其没有直接发起连接的功能,其构造函数的参数是已经建立好的socket.因此其中有两个重要的数据成员:

boost::scoped_ptr<Socket> socket_;
boost::scoped_ptr<Channel> channel_;

时序分析

时序分析.png

在程序中我们创建一个TcpServer对象,在其构造函数中我们会初始化TcpServer::acceptor_对象,并且设置acceptor_中的handleRead中回调TcpServer::newConnection()函数.

acceptor_(new Acceptor(loop, listenAddr));
//setNewConnectionCallback函数在Acceptor中用来设置newConnectionCallback_,而此时将它设置为TcpServer::newConnection.
acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection, this, _1, _2));

然后该TcpServer对象调用start(),在该函数中会调用Acceptor::listen()函数来启动监听;若有活动连接,即TcpServer::acceptor_中的acceptChannel_ 对象可读,则poll返回,调用Channel::handleEvent()来处理活动通道,在该函数中又被注册了Acceptor::handleRead()回调函数,在该函数中,使用accept(2)来接受新连接,并且会回调TcpServer构造函数中注册的TcpServer::newConnection()函数,在该函数中:

 TcpConnectionPtr conn(
 new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr));
 connections_[connName] = conn;
 conn->setConnectionCallback(connectionCallback_);
 conn->setMessageCallback(messageCallback_);
 conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection, this, _1));
conn->connectEstablished();

会创建一个新的TcpConnectionPtr变量conn,在该变量的构造函数中我们设置了

// 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间
channel_->setReadCallback(
    boost::bind(&TcpConnection::handleRead, this, _1));

并且在该函数中我们对conn设置了连接到来的回调函数和消息回调函数(这是在TcpServer构造函数中初始化的).最后我们调用TcpConnection::connectEstablished(),会将该TcpConnection所对应的通道加入到Poller关注.至此我们已经成功建立了一个连接,当对方发送数据到connfd,内核接受缓冲区不在为空,触发可读事件,
TcpConnection::channel_ 可读事件发生,poll返回,调用Channel::handleEvent()处理活动通道,调用TcpConnection::handleRead(),在handleRead()中会回调messageCallback_函数.

void TcpConnection::handleRead(Timestamp receiveTime)
{
    ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
    messageCallback_(shared_from_this(), buf, n);
}

测试程序

#include "TcpServer.h"
#include "EventLoop.h"
#include "InetAddress.h"
#include <stdio.h>

void onConnection(const muduo::TcpConnectionPtr& conn)
{
  if (conn->connected())
  {
    printf("onConnection(): new connection [%s] from %s\n",
           conn->name().c_str(),
           conn->peerAddress().toHostPort().c_str());
  }
  else
  {
    printf("onConnection(): connection [%s] is down\n",
           conn->name().c_str());
  }
}

void onMessage(const muduo::TcpConnectionPtr& conn,
               muduo::Buffer* buf,
               muduo::Timestamp receiveTime)
{
  printf("onMessage(): received %zd bytes from connection [%s] at %s\n",
         buf->readableBytes(),
         conn->name().c_str(),
         receiveTime.toFormattedString().c_str());

  printf("onMessage(): [%s]\n", buf->retrieveAsString().c_str());
}

int main()
{
  printf("main(): pid = %d\n", getpid());

  muduo::InetAddress listenAddr(9981);
  muduo::EventLoop loop;

  muduo::TcpServer server(&loop, listenAddr);
  server.setConnectionCallback(onConnection);
  server.setMessageCallback(onMessage);
  server.start();

  loop.loop();
}

运行结果:

main(): pid = 5983
20191008 08:48:51.871004Z  5983 INFO  TcpServer::newConnection [0.0.0.0:9981] - new connection [0.0.0.0:9981#1] from 127.0.0.1:41400 - TcpServer.cc:58
20191008 08:48:51.871146Z  5983 DEBUG TcpConnection TcpConnection::ctor[0.0.0.0:9981#1] at 0x10624a0 fd=14 - TcpConnection.cc:36
onConnection(): new connection [0.0.0.0:9981#1] from 127.0.0.1:41400
onMessage(): received 4 bytes from connection [0.0.0.0:9981#1] at 20191008 08:49:05.568592
onMessage(): [jij
]
20191008 08:49:09.272725Z  5983 INFO  TcpServer::removeConnection [0.0.0.0:9981] - connection 0.0.0.0:9981#1 - TcpServer.cc:76
onConnection(): connection [0.0.0.0:9981#1] is down
20191008 08:49:09.272830Z  5983 DEBUG ~TcpConnection TcpConnection::dtor[0.0.0.0:9981#1] at 0x10624a0 fd=14 - TcpConnection.cc:50
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容

  • 多线程与并发服务器: (1)循环式(iterative)服务器:每处理一次请求,就关闭一次,使用的是TCP短连接,...
    angel_贝贝阅读 674评论 1 2
  • 名称 libev - 一个 C 编写的功能全面的高性能事件循环。 概要 示例程序 关于 libev Libev 是...
    hanpfei阅读 15,240评论 0 5
  • 2018-11-06 这一块操作系统主要分为两个部分,一个部分是书本上操作系统的知识,还有一部门是linux的相关...
    zuoerfeng阅读 2,220评论 0 1
  • NIO(Non-blocking I/O,在Java领域,也称为New I/O),是一种同步非阻塞的I/O模型,也...
    Demon_code阅读 390评论 0 0
  • 金融科技行业向来被视为培育独角兽企业的优良土壤,投资者热情持续高涨。根据 CB Insights 近年发布的独角兽...
    translator阅读 159评论 0 0