一. AbstractChannel
1.1 构造方法
/**
* 创建一个新实例。
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 创建
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
/**
* 创建一个新实例。
*/
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
可以看出在构造方法中,就绑定了这个通道的四个成员变量
parent
,id
,unsafe
,pipeline
。
protected ChannelId newId() {
return DefaultChannelId.newInstance();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
/**
* 由子类来实现,创建对应的 Unsafe 类型实例
*/
protected abstract AbstractUnsafe newUnsafe();
id
和pipeline
都是直接创建,默认是DefaultChannelId
和DefaultChannelPipeline
类型。newUnsafe()
是抽样方法,有子类才能创建对应的Unsafe
类型实例。
1.2 ChannelOutboundInvoker
接口方法
Channel
还继承了ChannelOutboundInvoker
接口,也就是说通道是可以发送出站IO
操作的。
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}
@Override
public ChannelFuture close() {
return pipeline.close();
}
@Override
public ChannelFuture deregister() {
return pipeline.deregister();
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
........
你会发现基本上都是调用 ChannelPipeline
对应的方法。
- 也就是说直接调用通道
Channel
的发送出站IO
事件的方法,和调用管道pipeline()
发送出站IO
事件的方法是一样的。- 根据 DefaultChannelPipeline 的分析,我们知道这些出站
IO
事件最后都会调用到该通道的Unsafe
属性对应方法进行处理。
1.3 抽样方法
AbstractChannel
还有几个需要子类实现抽样方法,由子类提供不同的处理逻辑:
-
AbstractUnsafe newUnsafe()
不同类型的
Channel
有自己特定的Unsafe
类型。 -
boolean isCompatible(EventLoop loop)
判断给定的事件轮询器
EventLoop
和当前的通道类型是不是兼容。每种类型的通道Channel
都有自己特定的事件轮询器。 -
SocketAddress localAddress0()
和SocketAddress remoteAddress0()
通道绑定的本地地址和通道连接的远程地址。
-
void doBind(SocketAddress localAddress)
进行绑定操作,每种类型的通道绑定处理是不一样的。
-
void doDisconnect()
进行连接操作。
-
void doClose()
进行关闭连接操作。
-
void doBeginRead()
将通道设为开始读操作。
-
void doWrite(ChannelOutboundBuffer in)
进行写操作。
AbstractChannel
真正重点的操作都是在 AbstractUnsafe
中实现的啊,下面讲解 AbstractUnsafe
。
二. AbstractUnsafe 类
2.1 成员属性
// 写缓冲区 ChannelOutboundBuffer
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
// 用于在接收数据时分配缓存区 ByteBuf
private RecvByteBufAllocator.Handle recvHandle;
// 当前是否正在刷新数据,防止重复刷新数据
private boolean inFlush0;
// 如果通道从未被注册,则为true,否则为false
private boolean neverRegistered = true;
2.2 注册 register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
// 当前通道已经注册,失败,调用 promise 的setFailure方法进行通知
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
// 这个事件轮询器和当前通道不兼容,失败,调用 promise 的setFailure方法进行通知
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
// 当前线程就是通道 事件轮询器线程,直接调用 register0 方法
register0(promise);
} else {
try {
// 通过 eventLoop.execute 方法,
// 保证 register0 方法在通道事件轮询器线程中调用
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
// 发生异常,要关闭通道,并进行相关通知
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// 检查通道是否仍然打开
// 当注册操作在 eventLoop 线程之外调用的话,
// 有可能这时通道被别的线程关闭了
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// 第一次注册
boolean firstRegistration = neverRegistered;
// 调用 AbstractChannel 的 doRegister 方法,进行注册操作
doRegister();
neverRegistered = false;
// 设置 AbstractChannel 的 registered 成员属性,表示已经注册
registered = true;
// 确保在通道未注册前添加到管道上的 ChannelHandler 的 handlerAdded(…) 也会被调用
// 这是必需的,因为用户可能已经通过ChannelFutureListener中的管道触发了事件。
pipeline.invokeHandlerAddedIfNeeded();
// 注册成功的通知
safeSetSuccess(promise);
// 发送注册 入站IO事件
pipeline.fireChannelRegistered();
// 只有在通道从未注册的情况下才触发 channelActive 事件。
// 这可以防止在通道被取消注册和重新注册时触发多个通道 channelActive 事件。
if (isActive()) {
if (firstRegistration) {
// 第一次注册时,才会发送 channelActive 事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 此通道在注册之前,设置了 autoRead()。
// 这意味着我们需要重新设置开始读取操作,以便介绍入站数据。
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// 发生异常,要关闭通道,并进行相关通知
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
将通道注册到事件轮询器EventLoop
上:
- 如果当前通道已经注册,或者当前通道和事件轮询器不兼容,那么注册失败,调用
promise
的setFailure
方法进行通知。- 保证在事件轮询器线程调用实际注册
register0
方法。- 调用
AbstractChannel
的doRegister
方法,进行注册操作,发送注册事件。- 如果通道已活跃,第一次注册的时候,就会发送
channelActive
事件;- 如果不是,那么就可能设置开始读的操作。
- 如果这期间发生异常,就关闭通道,并进行相关通知。
2.3 取消注册 deregister
@Override
public final void deregister(final ChannelPromise promise) {
assertEventLoop();
deregister(promise, false);
}
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
// 如果 promise不是 不可取消的,那么直接返回
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
// 当前通道没有注册,那么也表示取消注册成功,进行成功通知
safeSetSuccess(promise);
return;
}
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
// See:
// https://github.com/netty/netty/issues/4435
// 通过 invokeLater 方法,将 doDeregister() 方法放在下一个事件轮询周期进行
invokeLater(new Runnable() {
@Override
public void run() {
try {
// 调用 AbstractChannel 的 doDeregister 方法,进行取消注册操作
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
// 如果通道之前是注册成功了,
// 这里才发送取消注册的 IO 事件
registered = false;
pipeline.fireChannelUnregistered();
}
// 取消绑定成功通知
safeSetSuccess(promise);
}
}
});
}
重点就是调用
AbstractChannel
的doDeregister
方法,进行取消注册操作。
如果fireChannelInactive == true
,将发送ChannelInactive
事件。
2.4 绑定 bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
// 检查通道是否仍然打开
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
// 调用 AbstractChannel 的 doBind 方法,进行绑定操作
doBind(localAddress);
} catch (Throwable t) {
// 绑定失败的通知
safeSetFailure(promise, t);
// doBind(localAddress) 方法有可能关闭这个通道,
// 就可能需要进行关闭通道的通知
closeIfClosed();
return;
}
// 如果绑定操作后,通道从不活跃变成活跃,就要发送 ChannelActive 事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
// 绑定成功的通知
safeSetSuccess(promise);
}
- 这个方法逻辑比较简单,重点就是调用
AbstractChannel
的doBind
方法,进行绑定操作。- 如果绑定操作后,通道从不活跃变成活跃,就要发送
ChannelActive
事件。
2.5 取消连接 disconnect
@Override
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
// 如果 promise不是 不可取消的,那么直接返回
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect();
// 重置 remoteAddress and localAddress
remoteAddress = null;
localAddress = null;
} catch (Throwable t) {
safeSetFailure(promise, t);
// doDisconnect() 方法有可能关闭这个通道,
// 就可能需要进行关闭通道的通知
closeIfClosed();
return;
}
// 如果取消连接后,通道从活跃变成不活跃,就要发送 ChannelInactive 事件
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
// doDisconnect() 方法有可能关闭这个通道,
// 就可能需要进行关闭通道的通知
closeIfClosed();
}
- 这个方法逻辑比较简单,重点就是调用
AbstractChannel
的doDisconnect()
方法,进行取消连接操作。- 如果取消连接操作成功后,通道从活跃变成不活跃,就要发送
ChannelInactive
事件。
2.6关闭 close
public void close(final ChannelPromise promise) {
assertEventLoop();
ClosedChannelException closedChannelException =
StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
close(promise, closedChannelException, closedChannelException, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
// 如果 promise不是 不可取消的,那么直接返回
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) {
// closeInitiated == true,已经调用过关闭操作了,就要return 返回了。
if (closeFuture.isDone()) {
// 已经通道已经关闭了,通知 promise
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// 当前通道正在关闭,那么就添加一个监听器,当关闭成功后,再通知 promise
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
// 保证关闭方法只调用一次,不能重复调用
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 禁止再向写缓冲区 outboundBuffer 添加任何消息和刷新操作。
this.outboundBuffer = null;
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise);
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// 使写缓冲区中所有排队的消息失败
outboundBuffer.failFlushed(cause, notify);
// 关闭写缓冲区
outboundBuffer.close(closeCause);
}
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// 使写缓冲区中所有排队的消息失败
outboundBuffer.failFlushed(cause, notify);
// 关闭写缓冲区
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
// 如果正在刷新操作,那么就让 fireChannelInactiveAndDeregister 操作,
// 放到下一个事件轮询周期中处理
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
// 取消注册和可能发送 ChannelInactive 事件
fireChannelInactiveAndDeregister(wasActive);
}
}
}
private void doClose0(ChannelPromise promise) {
try {
doClose();
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
deregister(voidPromise(), wasActive && !isActive());
}
方法流程:
- 通过
closeInitiated
成员属性保证关闭方法只调用一次,不能重复调用。 - 因为关闭连接,需要考虑写缓冲区
ChannelOutboundBuffer
中的待写入数据的问题。 - 通过
prepareToClose()
方法,返回一个关闭通道的事件执行器。- 如果不为空,那么就在这个事件执行器中进行接下来的关闭操作。
- 如果为空,那么就在当前线程进行接下来的关闭操作。
- 调用
doClose0(promise)
方法,进行关闭以及操作成功或失败的相关通知。 - 处理写缓冲区
outboundBuffer
中的数据,并关闭写缓冲区。 - 最后调用
fireChannelInactiveAndDeregister
方法,取消管道注册,以及可能会发送ChannelInactive
事件。如果在
doClose()
方法之后,通道从活跃变成不活跃的情况下,才会发送ChannelInactive
事件。
2.7 shutdownOutput
@UnstableApi
public final void shutdownOutput(final ChannelPromise promise) {
assertEventLoop();
shutdownOutput(promise, null);
}
/**
* 关闭相应通道的输出部分。
* 例如,这将清理ChannelOutboundBuffer并不再允许任何写操作。
*/
private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
if (!promise.setUncancellable()) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
promise.setFailure(new ClosedChannelException());
return;
}
// 禁止再向写缓冲区 outboundBuffer 添加任何消息和刷新操作。
this.outboundBuffer = null;
final Throwable shutdownCause = cause == null ?
new ChannelOutputShutdownException("Channel output shutdown") :
new ChannelOutputShutdownException("Channel output shutdown", cause);
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 调用 AbstractChannel 的 doShutdownOutput 方法,
// 进行 shutdown 操作
doShutdownOutput();
// 操作成功通知
promise.setSuccess();
// 操作失败通知
} catch (Throwable err) {
promise.setFailure(err);
} finally {
// Dispatch to the EventLoop
eventLoop().execute(new Runnable() {
@Override
public void run() {
// 在 Shutdown 的时候,关闭写缓冲区 ChannelOutboundBuffer,
// 并发送用户通知事件
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
});
}
}
});
} else {
try {
// 调用 AbstractChannel 的 doShutdownOutput 方法,
// 进行 shutdown 操作
doShutdownOutput();
// 操作成功通知
promise.setSuccess();
} catch (Throwable err) {
// 操作失败通知
promise.setFailure(err);
} finally {
// 在 Shutdown 的时候,关闭写缓冲区 ChannelOutboundBuffer,
// 并发送用户通知事件
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
}
}
/**
* 在 Shutdown 的时候,关闭写缓冲区 ChannelOutboundBuffer,
* 并发送用户通知事件
*/
private void closeOutboundBufferForShutdown(
ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
// 使写缓冲区中所有排队的消息失败
buffer.failFlushed(cause, false);
// 关闭写缓冲区
buffer.close(cause, true);
// 发送一个通道 Shutdown 的用户通知事件
pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
}
shutdown
的方法流程和 close
很像,区别点:
shutdown
是调用AbstractChannel
的doShutdownOutput()
方法进行相关操作,而close
是调用AbstractChannel
的doClose()
方法。close
最后会取消注册,以及可能会发送ChannelInactive
事件。- 而
shutdown
会发送一个ChannelOutputShutdownEvent.INSTANCE
用户自定义的通知事件。
2.8 强制关闭 closeForcibly
@Override
public final void closeForcibly() {
assertEventLoop();
try {
doClose();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
}
你会发现只调用了
AbstractChannel
的doClose()
方法进行关闭操作,不触发任何事件,也不处理写缓冲区。只可能在某些特殊情况下调用,例如尝试注册失败的时候。
2.9 开始读 beginRead
@Override
public final void beginRead() {
assertEventLoop();
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
调用
AbstractChannel
的doBeginRead()
方法设置通道开始读取数据。
2.10 写操作 write
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// 写缓冲区为 null,
try {
// 现在释放资源,以防止资源泄漏
ReferenceCountUtil.release(msg);
} finally {
// 如果outboundBuffer为空,我们就知道通道被关闭了,所以立即进行失败通知。
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise,
newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
}
return;
}
int size;
try {
// 进行消息的转换,例如将堆缓冲区变成直接缓冲区
msg = filterOutboundMessage(msg);
// 估算数据的大小
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
try {
// 失败时需要释放资源,以防止资源泄漏
ReferenceCountUtil.release(msg);
} finally {
// 进行操作失败的通知
safeSetFailure(promise, t);
}
return;
}
// 将数据添加到写缓冲区 outboundBuffer 中
outboundBuffer.addMessage(msg, size, promise);
}
方法流程
- 先判断写缓冲区
outboundBuffer
是不是为null
,为空说明通道已关闭,进行失败通知。 - 通过
filterOutboundMessage(msg)
方法进行数据转换,例如将堆缓冲区变成直接缓冲区。 - 估算数据大小。
- 通过
outboundBuffer.addMessage(...)
方法,将数据添加到写缓冲区outboundBuffer
中。 - 如果发送异常,记得释放数据
msg
的引用,防止内存泄露,并进行操作失败通知。
2.11 刷新 flush
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 写缓冲区为空,直接返回
if (outboundBuffer == null) {
return;
}
// 将写缓冲区中的消息都标记成待刷新
outboundBuffer.addFlush();
// 进行刷新操作
flush0();
}
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// 避免重复刷新
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 当前写缓冲区没有数据,那么直接返回
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
// 避免重复刷新
inFlush0 = true;
// 如果通道处于非活动状态,则将所有挂起的写请求标记为失败。
if (!isActive()) {
try {
// Check if we need to generate the exception at all.
if (!outboundBuffer.isEmpty()) {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
}
}
} finally {
// 刷新操作完成,将 inFlush0重新设置为 false,以便下次刷新。
inFlush0 = false;
}
return;
}
try {
// 将给定缓冲区的内容刷新到远端
doWrite(outboundBuffer);
} catch (Throwable t) {
handleWriteError(t);
} finally {
// 刷新操作完成,将 inFlush0重新设置为 false,以便下次刷新。
inFlush0 = false;
}
}
- 通过
inFlush0
成员属性,来避免重复刷新。- 如果通道处于非活动状态,则将所有挂起的写请求标记为失败。
- 通过
AbstractChannel
的doWrite(outboundBuffer)
方法,将缓冲区的内容刷新到远端。
2.12 小结
对比 Unsafe
的方法,你会发现 AbstractUnsafe
中没有实现 connect(...)
连接方法。
对比发送入站IO
事件:
-
ChannelRegistered
和ChannelUnregistered
-
register
方法会发送ChannelRegistered
事件。 -
deregister
方法只有在通道之前已经注册之后,才会发送ChannelUnregistered
事件。
-
-
ChannelActive
和ChannelInactive
- 一般都是通道
Channel
从不活跃变成活跃,要发送ChannelActive
事件;可能引起这个变化的操作有bind
和connect
操作。 - 通道
Channel
从活跃变成不活跃,就要发送ChannelInactive
事件;可能引起这个变化的操作有disconnect
,close
和shutdown
。 - 最后如果第一次注册时,且当前通道是活跃状态,也会发送
ChannelActive
事件。
- 一般都是通道
三. ChannelOutboundBuffer
在 AbstractChannel.Unsafe
中看到用户调用write(...)
方法写的数据,会先添加到写缓冲区 ChannelOutboundBuffer
中,然后调用 flush()
方法,才将写缓冲区中的数据发送到远端。
3.1 重要成员属性
// 在链表结构中第一个被刷新的节点
private Entry flushedEntry;
// 在链表结构中第一个未刷新的节点
private Entry unflushedEntry;
// 表示链表中最后一个节点
private Entry tailEntry;
// 等待刷新节点的数量
private int flushed;
写缓冲区通过链表来储存数据(依靠 Entry.next
来实现链表),链表形式 Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
flushedEntry
表示第一个被刷新的节点,在链表头,当然也是通过addFlush()
方法设置的。unflushedEntry
表示第一个未刷新的节点,表示还没有被标记刷新的第一个节点。tailEntry
最后一个节点。flushed
刷新节点的数量,这个属性很重要,靠它来标记刷新节点,也就是说从flushedEntry
开始,flushed
数量的节点都被标记为刷新节点了。
3.2 重要方法
3.2.1 添加数据
这个方法一般在 AbstractChannel.AbstractUnsafe
的 write(...)
方法中调用。
/**
* 将给定的消息 msg 添加到ChannelOutboundBuffer中。
* 一旦消息写入,给定的ChannelPromise将被通知。
*/
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 将给定消息封装成一个节点
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
// 将新消息节点添加到队列尾
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
// 如果未刷新节点为空,说明队列节点都变成刷新节点了,
// 那么这个新添加的节点,就是未刷新节点的头了。
unflushedEntry = entry;
}
// See https://github.com/netty/netty/issues/1619
// 向未刷新的数组添加消息后,增加挂起的字节数。
incrementPendingOutboundBytes(entry.pendingSize, false);
}
- 先将数据
msg
封装成一个节点entry
,并将节点添加到链表尾。- 如果
unflushedEntry
是null
,那么这个节点就是第一个未刷新节点。incrementPendingOutboundBytes(...)
方法,增加挂起的字节数,看是否需要改变通道的 可写属性。
3.2.2 标记刷新
这个方法一般在 AbstractChannel.AbstractUnsafe
的 flush()
方法中调用。
/**
* 向此ChannelOutboundBuffer添加刷新。
* 这意味着所有以前添加的消息都被标记为刷新,因此您将能够处理它们。
*/
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
// 未刷新节点后面的链表示新添加的节点列表,都是要加入到刷新中
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
// 将所有要刷新的节点变成不可取消的
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
// 挂起消息被取消,所以确保我们释放内存并通知释放的字节
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// 节点都变成已刷新的了,未刷新节点就设置为 null
unflushedEntry = null;
}
}
- 将从
unflushedEntry
未刷新节点开始到链表尾的所有节点都标记为刷新。通过flushed++
来增加刷新节点数量。- 调用
setUncancellable(...)
要写入的节点是不可取消的,如果设置失败,就要取消挂起数据,并调用decrementPendingOutboundBytes(...)
减少挂起字节数,看是否需要改变通道的 可写属性。
3.2.3 删除节点
/**
* 将删除当前消息,将其ChannelPromise标记为success并返回true。
* 如果在调用此方法时不存在刷新的消息,则返回false,表示没有准备好处理的消息。
*/
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}
// recycle the entry
e.recycle();
return true;
}
private void removeEntry(Entry e) {
if (-- flushed == 0) {
// flushed == 0, 表示所有刷新节点都被处理了
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
// 将下一个节点变成刷新节点
flushedEntry = e.next;
}
}
当缓存区当前刷新节点数据被写入到远端了,那么调用这个
remove()
方法,移除当前节点,得到下一个刷新节点。