在netty开发中,当调用pipeline的write方法时,并不会将数据直接写入到底层channel通道发送出去,而是先添加到缓冲区中;只有当调用flush方法,才会真正将数据从缓冲区写入到channel并发送出去。netty还提供了一个简便的方法,结合两者的功能writeAndFlush。
在之前的文章说过,应用程序开发中,主动调用IO操作,比如write、bind等等,触发的IO操作在pipeline上会从尾部的出站handler传播到头部出站handler,中间可能会经过各种编码器等等,但最终都会经过netty内置在pipeline上的HeadContext,而HeadContext的IO操作方法又是委托给内部的unsafe。
因此本文就来讲述下unsafe的write和flush方法的处理逻辑。
write源码
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
try {
ReferenceCountUtil.release(msg);
} finally {
safeSetFailure(promise, t);
}
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
write方法主要有两个步骤
- 会先对msg进行校验,校验msg是不是ByteBuf或者FileRegion类型的,非这两种类型直接抛出异常
- 将消息添加到缓冲区outboundBuffer中
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
其实,netty底层维护的是一个链表,链表的每个元素是一个Entry对象,而待发送的消息msg和promise就是维护在entry对象中的。
将消息添加到链表后,还会将此条消息占用的字节数(实际发送的+entry本身的开销)维护到channel的一个全局变量totalPendingSize中,表示待发送的总字节数。并且与高水位值比较,若比较大,那么会触发handler的unwritable方法。这个后续再用专门的章节来说明。
至此,write方法主要是将消息封装成entry对象,然后添加到channel对应的outboundBuffer维护的链表当中,并且会判断待发送的总字节数是否超过高水位值,若超出了,则触发handler的unwritable方法
接下来,来看下flush方法处理逻辑,调用这方法会真正的将数据从buffer写入到channel通道去
flush源码
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
addFlushed()方法主要是将outboundBuffer底层的链表的首指针flushedEntry指向链表第一个元素,然后unflushedEntry置为null,再算下待发送的entry数量。
flush0()会先判断channel注册到的selector有没正在监控write事件,没有的话才会去处理数据,有的话,在NioEventLoop的事件循环中会处理,这里就无需处理了
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
下面为真正的处理数据发送逻辑
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
//此次写操作,调用channel的write方法次数上限,默认值为16
int writeSpinCount = config().getWriteSpinCount();
do {
//若输出缓冲区是空的,也就是底层entry链表没有元素了,一般是已经写完数据了,那么取消selector的OP_WRITE事件,然后直接return
if (in.isEmpty()) {
clearOpWrite();
return;
}
// 每次写请求,gathering的最大字节数
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
//将outboundBuffer的entry链表的消息转成nio的ByteBuffer数组
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
//ByteBuffer的数量
int nioBufferCnt = in.nioBufferCount();
//将ByteBuffer写入到channel,分为三种情况处理
switch (nioBufferCnt) {
case 0:
//为0的情况正常不会出现
writeSpinCount -= doWrite0(in);
break;
case 1: {
//只有1个ByteBuffer,通常是write后便直接flush的,只有一个buffer的情况,直接用普通的
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// 有多个ByteBuffer,一般是调用多次write方法后,再调flush方法的
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
//若这此次的循环中未将所有数据写入到channel中,那么注册selector的OP_WRITE事件,再下次的事件循环中处理
incompleteWrite(writeSpinCount < 0);
}
上述switch块内有几个重要点
- 不同nioBufferCnt数量的不同处理逻辑,主要有当为1时,直接用底层channel的write方法,写入单个ByteBuffer;当于1,一次性写入多个ByteBuffer,使用的是gathering技术
- 当调用write方法返回后,如实际写入的字节数<=0,可能是此时channel的socket缓冲区已经满了,不允许写入数据了,所以需要向selector注册OP_WRITE事件,待下次事件循环中再处理。
- 会根据试图写入的数据和实际写入的数据动态调整maxBytesPerGatheringWrite
- 已写入的数据需要从缓冲区移除,不然下次循环会重复写入数据
public void removeBytes(long writtenBytes) {
for (;;) {
//获取当前flushedEntry对象
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
//比较当前entry对应的消息字节数和已写入字节数,
//当写入的较大或者相等时,表示这个entry对象消息已经全部写入到channel了
//因此,会调用remove方法(这方法主要作用是将entry对象从链表移除,且通知ChannelPromise的监听器)
//当写入的字节数<这个entry对象的消息字节数时,说明只写入了一部分,那么只需要移动readerIndex的索引值
..
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
//将ByteBuffer数组的所有ByteBuffer值置为null
clearNioBuffers();
}
在上面这个方法内部的remove方法,主要逻辑是将entry对象从链表移除,并且通知promise的监听器
public boolean remove() {
Entry e = flushedEntry;
//当前链表为空了,那么将线程的ByteBuffer数组的所有元素都置为null
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
//当前entry对象从链表移除
removeEntry(e);
if (!e.cancelled) {
// 每个entry对象维护的msg是ByteBuf类型的,因为已经写入到channel了,所以将引用计数-1
ReferenceCountUtil.safeRelease(msg);
//promise结果值置为成功,并通知promise的监听器
safeSuccess(promise);
//扣掉缓冲区待发送的字节数,并且判断是否小于低水位值了,若小于,则触发channelWritabilityChanged方法
decrementPendingOutboundBytes(size, false, true);
}
//回收entry对象
e.recycle();
return true;
}