Muduo_Day6(TcpConnection完善,发送数据以及shutdown)

TcpConnection发送数据

之前我们的Channel仅仅用到了ReadCallback,而并没有启用WriteCallback,在本节中会设置为在需要时关注可写事件,在TcpConnection中添加如下:

channel_->setWriteCallback(
      boost::bind(&TcpConnection::handleWrite, this));

相应的添加接收缓冲区与发送缓冲区:

Buffer inputBuffer_;            // 应用层接收缓冲区
Buffer outputBuffer_;           // 应用层发送缓冲区

在TcpServer::newConnection()中注册写完成回调函数:writeCompleteCallback_,即当所有数据都已经拷贝到内核缓冲区时回调该函数,即发送缓冲区被清空时.

conn->setWriteCompleteCallback(writeCompleteCallback_);

此函数的作用是如果我们向一个连接发送send()大流量的数据,发送频率不能太快,因为如果对等方接收不及时,则内核发送缓冲区会堆积数据,根据前面的分析,我们会将数据添加到outputBuffer_,导致outputBuffer_ 增长太快,对此可以关注WriteCompleteCallback_ ,当它被调用时表示outputBuffer_ 已经被清空,此时再次send(),可以有效的调整send()函数的发送频率.相应的还有一个highWaterMarkCallback_,可以当作是”高水位标“ 回调函数,即如果对等方接收不及时,outputBuffer_ 会一直增大,当增长到highWaterMark_ (具体数值)时,回调highWaterMarkCallback_ 函数,很可能在函数内主动shutdown.
除此之外,并在TcpConnection的接口中增加了send()函数,这两个函数都是可以跨线程调用的,因为其内部实现都增加了两个*InLoop函数(sendInloop函数),对应前新的接口函数,并使用Buffer作为输出缓冲区.
若在当前IO线程调用send()函数,它会把message复制一份,传给IO线程中的sendInLoop()来发送,这是通过当前线程处理doPendingFunctors 时被调用的sendInLoop函数.

void TcpConnection::sendInLoop(const StringPiece& message)
{
  sendInLoop(message.data(), message.size());
}

void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool error = false;
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // if no thing in output queue, try writing directly
  // 通道没有关注可写事件并且应用层发送缓冲区没有数据,直接write
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      // 写完了,回调writeCompleteCallback_
      if (remaining == 0 && writeCompleteCallback_)
      {
        loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      nwrote = 0;
      if (errno != EWOULDBLOCK)
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (errno == EPIPE) // FIXME: any others?
        {
          error = true;
        }
      }
    }
  }

  assert(remaining <= len);
  // 没有错误,并且还有未写完的数据(说明内核发送缓冲区满,要将未写完的数据添加到output buffer中)
  if (!error && remaining > 0)
  {
    LOG_TRACE << "I am going to write more data";
    size_t oldLen = outputBuffer_.readableBytes();
    // 如果超过highWaterMark_(高水位标),回调highWaterMarkCallback_
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
    if (!channel_->isWriting())
    {
      channel_->enableWriting();        // 关注POLLOUT事件
    }
  }

写的过程是先尝试直接往内核缓冲区内write,如果内核缓冲区满了则将未写完的数据添加到outputBuffer_中,并开始关注可写事件,即POLLOUT事件,当内核缓冲区不满时,调用handleWrite()函数,在TcpConnection中设置了可写回调函数,以发送剩余的数据.

// 内核发送缓冲区有空间了,回调该函数
void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0)    // 发送缓冲区已清空
      {
        channel_->disableWriting();     // 停止关注POLLOUT事件,以免出现busy loop
        if (writeCompleteCallback_)     // 回调writeCompleteCallback_
        {
          // 应用层发送缓冲区被清空,就回调用writeCompleteCallback_
          loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
        }
        if (state_ == kDisconnecting)   // 发送缓冲区已清空并且连接状态是kDisconnecting, 要关闭连接
        {
          shutdownInLoop();     // 关闭连接
        }
      }
      else
      {
        LOG_TRACE << "I am going to write more data";
      }
    }
    else
    {
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
      //   shutdownInLoop();
      // }
    }
  }
  else
  {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
  }
}

从outputBuffer中取出剩余的数据写入到内核缓冲区,当然有可能这一次还是不能完全的写入,但只要应用层的发送缓冲区还有数据就会一直关注POLLOUT事件,当内核缓冲区又有空间时,就再次回调此函数,继续写入发送.一旦数据发送完毕,立刻停止关注可写事件,以免造成busy loop.

TcpConnection中的shutdown()函数

TcpConnection中提供了shutdown()函数,没有提供close函数,就是为了保证数据收发的完整性.即当连接正在处于关闭状态,网络库会转而调用shutdownInloop()函数,继续执行关闭过程,该函数也是线程安全的.

void TcpConnection::shutdown()
{
  // FIXME: use compare and swap
  if (state_ == kConnected)
  {
    setState(kDisconnecting);
    // FIXME: shared_from_this()?
    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
  }
}

void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (!channel_->isWriting())
  {
    // we are not writing
    socket_->shutdownWrite();
  }
}
void Socket::shutdownWrite()
{
    sockets::shutdownWrite(sockfd_);
}

void sockets::shutdownWrite(int sockfd)
{
    int ret = ::shutdown(sockfd, SHUT_WR);
    // 检查错误
}

若在关闭连接的过程中,应用层缓冲区还有数据没有发完,即还在关注POLLOUT事件,那么shutdownInLoop()中会先判断isWriting() 为true,所以并不会直接执行shutdownWrite()函数.而当数据已经完全发完时,在
handleWrite() 函数中,会执行channel_->disableWriting();停止关注了POLLOUT事件,而且会判断是否为kDisconnecting状态,随即调用shutdownInLoop()函数关闭连接.

shutdown时序图.png

当我们这边也已经发送完数据了,于是我们调用shutdownInloop中的shutdownWrite,发送TCP FIN分节,对方会读到0 字节,然后对方通常会关闭连接(无论shutdownWrite() 还是close()),可读事件发生调用handleRead(),这样muduo 会读到0 字节,调用handleClose(),进而调用connectionCallback_(定义了默认的muduo::net::defaultConnectionCallback), 这样客户代码就知道对方断开连接了(判断是否connected()),最后调用closeCallback_ (TcpServer::removeConnection(),newConnection函数中对conn注册了)
测试:

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

class TestServer
{
 public:
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer")
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));

    message1_.resize(100);
    message2_.resize(200);
    std::fill(message1_.begin(), message1_.end(), 'A');
    std::fill(message2_.begin(), message2_.end(), 'B');
  }
   void start()
  {
      server_.start();
  }

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
      conn->send(message1_);
      conn->send(message2_);
      conn->shutdown();
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }
  void onMessage(const TcpConnectionPtr& conn,Buffer* buf, Timestamp receiveTime)
  {
    muduo::string msg1 = buf->retrieveAllAsString();
    muduo::string msg(msg1); 
    printf("onMessage(): received %zd bytes from connection [%s] at %s\n",msg.size(), conn->name().c_str(),receiveTime.toFormattedString().c_str());
    conn->send(msg);
  }

  EventLoop* loop_;
  TcpServer server_;

  muduo::string message1_;
  muduo::string message2_;
};
int main()
{
  printf("main(): pid = %d\n", getpid());

  InetAddress listenAddr(8888);
  EventLoop loop;

  TestServer server(&loop, listenAddr);
  server.start();
  loop.loop();
}

运行结果
main(): pid = 28316
20191013 06:25:26.252774Z 28316 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20191013 06:25:26.252905Z 28316 TRACE EventLoop EventLoop created 0x7FFDBEA4B3C0 in thread 28316 - EventLoop.cc:62
20191013 06:25:26.252926Z 28316 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20191013 06:25:26.253053Z 28316 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20191013 06:25:26.253075Z 28316 TRACE loop EventLoop 0x7FFDBEA4B3C0 start looping - EventLoop.cc:94
20191013 06:25:28.237390Z 28316 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 06:25:28.237708Z 28316 TRACE printActiveChannels {6: IN } - EventLoop.cc:257
20191013 06:25:28.237782Z 28316 INFO TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:39706 - TcpServer.cc:93
20191013 06:25:28.237824Z 28316 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0xE777D0 fd=8 - TcpConnection.cc:62
20191013 06:25:28.237843Z 28316 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20191013 06:25:28.237860Z 28316 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20191013 06:25:28.237875Z 28316 TRACE connectEstablished [3] usecount=6 - TcpConnection.cc:231
20191013 06:25:28.237888Z 28316 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:39706
20191013 06:25:28.238018Z 28316 TRACE connectEstablished [4] usecount=6 - TcpConnection.cc:236
20191013 06:25:28.238036Z 28316 TRACE newConnection [5] usecount=2 - TcpServer.cc:122
20191013 06:25:28.238095Z 28316 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 06:25:28.238133Z 28316 TRACE printActiveChannels {8: IN HUP } - EventLoop.cc:257
20191013 06:25:28.238147Z 28316 TRACE handleEvent [6] usecount=2 - Channel.cc:67
20191013 06:25:28.238183Z 28316 TRACE handleClose fd = 8 state = 3 - TcpConnection.cc:297
20191013 06:25:28.238194Z 28316 TRACE updateChannel fd = 8 events = 0 - EPollPoller.cc:104
onConnection(): connection [TestServer:0.0.0.0:8888#1] is down
20191013 06:25:28.238215Z 28316 TRACE handleClose [7] usecount=3 - TcpConnection.cc:305
20191013 06:25:28.238229Z 28316 INFO TcpServer::removeConnectionInLoop [TestServer] - connection TestServer:0.0.0.0:8888#1 - TcpServer.cc:153
20191013 06:25:28.238238Z 28316 TRACE removeConnectionInLoop [8] usecount=6 - TcpServer.cc:157
20191013 06:25:28.238253Z 28316 TRACE removeConnectionInLoop [9] usecount=5 - TcpServer.cc:159
20191013 06:25:28.238272Z 28316 TRACE removeConnectionInLoop [10] usecount=6 - TcpServer.cc:170
20191013 06:25:28.238284Z 28316 TRACE handleClose [11] usecount=3 - TcpConnection.cc:308
20191013 06:25:28.238295Z 28316 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20191013 06:25:28.238308Z 28316 TRACE removeChannel fd = 8 - EPollPoller.cc:147
20191013 06:25:28.238328Z 28316 DEBUG ~TcpConnection TcpConnection::dtor[TestServer:0.0.0.0:8888#1] at 0xE777D0 fd=8 - TcpConnection.cc:69
分析:程序使用nc命令进行测试,当程序建立之后,会回调TestServer::onConnection()函数,然后会依次会调用send函数发送message1与message2,然后shutdown()会调用shutdownInLoop函数,会一直等到outputBuffer_ 数据全部写到内核发送缓冲区才会真正关闭写端,客户端读到数据后最后read 返回0,客户端close导致服务端最终removeConnection.由最后可以看到,TcpConnection的析构会在handleEvent处理完了之后才会执行.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,717评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,501评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,311评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,417评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,500评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,538评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,557评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,310评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,759评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,065评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,233评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,909评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,548评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,172评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,420评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,103评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,098评论 2 352