Sending Data with TcpConnection
So far our Channel has only used the read callback; the write callback was never enabled. In this section we register it so that writable events can be watched when needed. In TcpConnection, add:
channel_->setWriteCallback(
boost::bind(&TcpConnection::handleWrite, this));
Add the corresponding receive and send buffers:
Buffer inputBuffer_;   // application-level receive buffer
Buffer outputBuffer_;  // application-level send buffer
In TcpServer::newConnection(), register the write-complete callback writeCompleteCallback_. It is invoked once all data has been copied into the kernel send buffer, i.e. when the application-level send buffer has been drained:
conn->setWriteCompleteCallback(writeCompleteCallback_);
These callbacks matter when send() is used to push a large volume of data over one connection. If the peer does not read fast enough, the sending rate must be throttled: once the kernel send buffer fills up, the analysis below shows that the excess is appended to outputBuffer_, which can then grow without bound. writeCompleteCallback_ addresses this: when it fires, outputBuffer_ has been drained, so that is a safe point to call send() again, which effectively matches the sending rate to the peer. There is also a highWaterMarkCallback_, a "high-water mark" callback: if the peer keeps falling behind, outputBuffer_ keeps growing, and once its size reaches highWaterMark_ (a configured byte count) the callback fires; a typical implementation shuts the connection down there.
In addition, send() overloads were added to TcpConnection's public interface. They are safe to call from any thread, because each is implemented in terms of a corresponding *InLoop function (sendInLoop()) that runs in the connection's IO thread, with Buffer serving as the output buffer.
If send() is called on the IO thread itself, sendInLoop() runs directly. If it is called from another thread, send() copies the message and hands it to sendInLoop() in the IO thread; the copy is what makes the cross-thread call safe, and sendInLoop() then runs when the IO thread processes its pending functors (doPendingFunctors()).
void TcpConnection::sendInLoop(const StringPiece& message)
{
  sendInLoop(message.data(), message.size());
}

void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool error = false;
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // If the channel is not watching writable events and the application-level
  // send buffer is empty, try writing directly.
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      // Everything was written: schedule writeCompleteCallback_.
      if (remaining == 0 && writeCompleteCallback_)
      {
        loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      nwrote = 0;
      if (errno != EWOULDBLOCK)
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (errno == EPIPE) // FIXME: any others?
        {
          error = true;
        }
      }
    }
  }

  assert(remaining <= len);
  // No error, but some data is left over: the kernel send buffer is full,
  // so append the remainder to the output buffer.
  if (!error && remaining > 0)
  {
    LOG_TRACE << "I am going to write more data";
    size_t oldLen = outputBuffer_.readableBytes();
    // If this append crosses highWaterMark_, schedule highWaterMarkCallback_.
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
    if (!channel_->isWriting())
    {
      channel_->enableWriting(); // start watching POLLOUT events
    }
  }
}
The flow is: first try to write directly into the kernel buffer; if the kernel buffer fills up, append the unwritten remainder to outputBuffer_ and start watching writable (POLLOUT) events. When the kernel buffer has room again, the poller reports the channel writable and handleWrite(), the write callback TcpConnection registered, is invoked to send the remaining data.
// The kernel send buffer has room again; the poller invokes this callback.
void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0) // the send buffer is drained
      {
        channel_->disableWriting(); // stop watching POLLOUT to avoid a busy loop
        if (writeCompleteCallback_)
        {
          // The application-level send buffer is empty:
          // schedule writeCompleteCallback_.
          loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
        }
        if (state_ == kDisconnecting) // drained while disconnecting:
        {
          shutdownInLoop();           // finish closing the connection
        }
      }
      else
      {
        LOG_TRACE << "I am going to write more data";
      }
    }
    else
    {
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
      //   shutdownInLoop();
      // }
    }
  }
  else
  {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
  }
}
handleWrite() takes the remaining data out of outputBuffer_ and writes it into the kernel buffer. A single call may still not drain everything, but as long as the application-level send buffer holds data the channel keeps watching POLLOUT, so the callback fires again whenever the kernel buffer has room and sending continues. Once all data has been sent, the channel stops watching writable events immediately to avoid a busy loop.
The shutdown() Function in TcpConnection
TcpConnection provides shutdown() but deliberately no close(), to guarantee that buffered data is fully delivered before the connection goes away. While the connection is closing, the library forwards to shutdownInLoop() in the IO thread to carry out the rest of the shutdown, which also makes shutdown() thread safe.
void TcpConnection::shutdown()
{
  // FIXME: use compare and swap
  if (state_ == kConnected)
  {
    setState(kDisconnecting);
    // FIXME: shared_from_this()?
    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
  }
}

void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (!channel_->isWriting())
  {
    // we are not writing
    socket_->shutdownWrite();
  }
}

void Socket::shutdownWrite()
{
  sockets::shutdownWrite(sockfd_);
}

void sockets::shutdownWrite(int sockfd)
{
  int ret = ::shutdown(sockfd, SHUT_WR);
  // check for errors
}
If the application-level buffer still holds unsent data when the connection is being closed, the channel is still watching POLLOUT, so isWriting() is true inside shutdownInLoop() and shutdownWrite() is not called yet. Once all data has been flushed, handleWrite() calls channel_->disableWriting() to stop watching POLLOUT, notices that the state is kDisconnecting, and then calls shutdownInLoop() to complete the shutdown.
When our side has finished sending, shutdownWrite() inside shutdownInLoop() sends a TCP FIN segment. The peer then reads 0 bytes, after which it usually closes the connection too (whether via shutdownWrite() or close()). That makes our socket readable, so handleRead() runs; muduo reads 0 bytes and calls handleClose(), which invokes connectionCallback_ (by default muduo::net::defaultConnectionCallback), letting user code detect the disconnect by checking connected(). Finally it invokes closeCallback_, i.e. TcpServer::removeConnection(), which was registered on the connection in newConnection().
Test:
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <algorithm>
#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

class TestServer
{
 public:
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer")
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));

    message1_.resize(100);
    message2_.resize(200);
    std::fill(message1_.begin(), message1_.end(), 'A');
    std::fill(message2_.begin(), message2_.end(), 'B');
  }

  void start()
  {
    server_.start();
  }

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
      conn->send(message1_);
      conn->send(message2_);
      conn->shutdown();
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }

  void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime)
  {
    muduo::string msg(buf->retrieveAllAsString());
    printf("onMessage(): received %zd bytes from connection [%s] at %s\n",
           msg.size(),
           conn->name().c_str(),
           receiveTime.toFormattedString().c_str());
    conn->send(msg);
  }

  EventLoop* loop_;
  TcpServer server_;
  muduo::string message1_;
  muduo::string message2_;
};

int main()
{
  printf("main(): pid = %d\n", getpid());

  InetAddress listenAddr(8888);
  EventLoop loop;
  TestServer server(&loop, listenAddr);
  server.start();

  loop.loop();
}
Output:
main(): pid = 28316
20191013 06:25:26.252774Z 28316 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20191013 06:25:26.252905Z 28316 TRACE EventLoop EventLoop created 0x7FFDBEA4B3C0 in thread 28316 - EventLoop.cc:62
20191013 06:25:26.252926Z 28316 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20191013 06:25:26.253053Z 28316 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20191013 06:25:26.253075Z 28316 TRACE loop EventLoop 0x7FFDBEA4B3C0 start looping - EventLoop.cc:94
20191013 06:25:28.237390Z 28316 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 06:25:28.237708Z 28316 TRACE printActiveChannels {6: IN } - EventLoop.cc:257
20191013 06:25:28.237782Z 28316 INFO TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:39706 - TcpServer.cc:93
20191013 06:25:28.237824Z 28316 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0xE777D0 fd=8 - TcpConnection.cc:62
20191013 06:25:28.237843Z 28316 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20191013 06:25:28.237860Z 28316 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20191013 06:25:28.237875Z 28316 TRACE connectEstablished [3] usecount=6 - TcpConnection.cc:231
20191013 06:25:28.237888Z 28316 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:39706
20191013 06:25:28.238018Z 28316 TRACE connectEstablished [4] usecount=6 - TcpConnection.cc:236
20191013 06:25:28.238036Z 28316 TRACE newConnection [5] usecount=2 - TcpServer.cc:122
20191013 06:25:28.238095Z 28316 TRACE poll 1 events happended - EPollPoller.cc:65
20191013 06:25:28.238133Z 28316 TRACE printActiveChannels {8: IN HUP } - EventLoop.cc:257
20191013 06:25:28.238147Z 28316 TRACE handleEvent [6] usecount=2 - Channel.cc:67
20191013 06:25:28.238183Z 28316 TRACE handleClose fd = 8 state = 3 - TcpConnection.cc:297
20191013 06:25:28.238194Z 28316 TRACE updateChannel fd = 8 events = 0 - EPollPoller.cc:104
onConnection(): connection [TestServer:0.0.0.0:8888#1] is down
20191013 06:25:28.238215Z 28316 TRACE handleClose [7] usecount=3 - TcpConnection.cc:305
20191013 06:25:28.238229Z 28316 INFO TcpServer::removeConnectionInLoop [TestServer] - connection TestServer:0.0.0.0:8888#1 - TcpServer.cc:153
20191013 06:25:28.238238Z 28316 TRACE removeConnectionInLoop [8] usecount=6 - TcpServer.cc:157
20191013 06:25:28.238253Z 28316 TRACE removeConnectionInLoop [9] usecount=5 - TcpServer.cc:159
20191013 06:25:28.238272Z 28316 TRACE removeConnectionInLoop [10] usecount=6 - TcpServer.cc:170
20191013 06:25:28.238284Z 28316 TRACE handleClose [11] usecount=3 - TcpConnection.cc:308
20191013 06:25:28.238295Z 28316 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20191013 06:25:28.238308Z 28316 TRACE removeChannel fd = 8 - EPollPoller.cc:147
20191013 06:25:28.238328Z 28316 DEBUG ~TcpConnection TcpConnection::dtor[TestServer:0.0.0.0:8888#1] at 0xE777D0 fd=8 - TcpConnection.cc:69
Analysis: the test uses nc as the client. Once the connection is established, TestServer::onConnection() is called; it sends message1 and message2, then calls shutdown(). shutdown() forwards to shutdownInLoop(), but the write end is not actually shut down until everything in outputBuffer_ has been written to the kernel send buffer. The client reads the data, its read eventually returns 0, and its close makes the server finally call removeConnection. The last log lines show that TcpConnection's destructor runs only after handleEvent has finished.