一. NioUnsafe 接口
public interface NioUnsafe extends Unsafe {
/**
* Return underlying {@link SelectableChannel}
* 返回底层的NIO通道
*/
SelectableChannel ch();
/**
* 完成连接;
* 在 NioEventLoop 的 `processSelectedKey` 方法中调用,
* 当底层的NIO通道接收到连接事件 OP_CONNECT 时调用
*/
void finishConnect();
/**
* NIO的 SelectableChannel通道中获取到远端的数据
* 在 NioEventLoop 的 `processSelectedKey` 方法中调用,
* 当底层的NIO通道接收到读取事件 OP_READ 时调用。
*/
void read();
/**
* 强制刷新;
* 在 NioEventLoop 的 `processSelectedKey` 方法中调用,
* 当底层的NIO通道接收到可写事件 OP_WRITE 时调用。
*/
void forceFlush();
}
NioUnsafe
接口比 Unsafe
多了四个方法:
-
SelectableChannel ch()
返回底层的NIO通道 - 剩下三个方法都与
NioEventLoop
类中接收到的NIO
通道事件有关:-
finishConnect()
接收到连接事件OP_CONNECT
时调用。 -
read()
接收到可读事件OP_READ
时调用。 -
forceFlush()
收到可写事件OP_WRITE
时调用。
-
二. AbstractNioUnsafe 类
2.1 实现 NioUnsafe
接口中三个方法
2.1.1 ch()
// 在 AbstractNioChannel 中的方法
protected SelectableChannel javaChannel() {
return ch;
}
@Override
public final SelectableChannel ch() {
return javaChannel();
}
2.1.2 finishConnect()
@Override
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
// 注意,只有在连接尝试既没有被取消也没有超时时,事件循环才会调用此方法。
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
// 调用 AbstractNioChannel 的 doFinishConnect 方法进行完成连接操作
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// See https://github.com/netty/netty/issues/1770
// 检查是否为null,因为connectTimeoutFuture仅在超时时间 connectTimeoutMillis > 0时才创建
// 连接已经完成,取消连接超时任务
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
- NIO通道接收到连接事件
OP_CONNECT
,表示已经成功建立连接了。- 调用
AbstractNioChannel
的doFinishConnect
方法进行完成连接操作。- 调用
fulfillConnectPromise
方法,完成ChannelPromise
的通知,以及是否发送ChannelActive
事件和ChannelInactive
事件。- 最后因为连接已经完成,就需要取消连接超时任务。
2.1.3 forceFlush()
@Override
protected final void flush0() {
//只有当没有挂起的刷新时才立即刷新。
//如果有一个挂起的刷新操作,事件循环将在稍后调用forceFlush(),因此不需要现在调用它。
if (!isFlushPending()) {
super.flush0();
}
}
@Override
public final void forceFlush() {
// 直接调用super.flush0(),强制立即刷新
super.flush0();
}
/**
* 返回是否准备刷新
*/
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
// 选择键有效,且有可读事件 OP_WRITE时,返回 true
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
调用父类
flush0()
方法进行刷新。
还会复写父类的flush0()
的方法,只有当没有挂起的刷新时才立即刷新。
2.1.4 read()
这个是 AbstractNioUnsafe
中唯一没有实现NioUnsafe
的方法,它在 AbstractNioByteChannel
和 AbstractNioMessageChannel
中提供不同的实现,等后面再说。
2.2 实现AbstractUnsafe
中连接方法
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// 检查通道是否仍然打开
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
// 当 connectPromise 不为空,说明已经有人尝试连接了
// 防止重复尝试连接
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
// 调用 AbstractNioChannel 的 doConnect 方法进行连接
if (doConnect(remoteAddress, localAddress)) {
// 完成 ChannelPromise 的通知,
// 以及是否发送 ChannelActive 事件和 ChannelInactive 事件
fulfillConnectPromise(promise, wasActive);
} else {
/**
* 因为连接操作是一个异步操作,
* 是否连接成功,是由底层 NIO通道接收到连接事件 OP_CONNECT 为准的,
* 所以这里要设置一个超时任务,当规定时间内,还没有连接成功,
* 那么就要关闭通道和相关的通知操作。
*
* 还要考虑用户主动取消这次连接请求。
*/
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
// 设置一个超时任务,规定时间内,它没有被取消,就会 close(voidPromise()) 关闭通道
// 在 finishConnect() 和 doClose() 方法中,会取消这个超时任务
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
if (connectPromise != null && !connectPromise.isDone()
&& connectPromise.tryFailure(new ConnectTimeoutException(
"connection timed out: " + remoteAddress))) {
// 连接超时,关闭通道
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 用户主动取消这次连接请求, 要取消连接超时任务,以及关闭通道
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
/**
* 完成 ChannelPromise 的通知,以及是否发送 ChannelActive 事件和 ChannelInactive 事件
*/
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
return;
}
boolean active = isActive();
// 尝试设置 promise 为成功完成,
// 如果设置失败,即返回 false,表示用户取消了这次连接请求
boolean promiseSet = promise.trySuccess();
// 无论用户是否取消了这次连接请求,
// 都判断是否发送 ChannelActive 事件
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// 如果用户取消了这次连接请求,
// 则关闭通道,然后可能会发送 ChannelInactive 事件
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// 使用 tryFailure() 而不是 setFailure(),
// 来避免与cancel()的竞争。
promise.tryFailure(cause);
closeIfClosed();
}
方法流程:
- 调用
AbstractNioChannel
的doConnect
方法进行连接。如果返回
true
,表示一直阻塞等待连接成功。
如果返回false
,表示是一个非阻塞连接,需要等待底层NIO
通道接收到连接事件OP_CONNECT
,才代表连接成功。 - 阻塞连接成功
那么就调用
fulfillConnectPromise(...)
方法,完成ChannelPromise
的通知,以及是否发送ChannelActive
事件和ChannelInactive
事件。 - 非阻塞连接
- 需要创建一个超时任务,当规定时间内,还没有连接成功,那么就要关闭通道和相关的通知操作。
- 再考虑用户主动取消这次连接请求时,要取消连接超时任务,以及关闭通道。
三. AbstractNioChannel 中实现的方法
3.1 EventLoop
的兼容性
@Override
protected boolean isCompatible(EventLoop loop) {
// 与当前通道兼容的事件轮询器必须是 NioEventLoop 的子类
return loop instanceof NioEventLoop;
}
与
AbstractNioChannel
匹配的事件轮询器必须是NioEventLoop
的子类。
3.2 注册
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 通过NIO SelectableChannel 的register方法,
// 将NIO通道注册到事件轮询器的 Selector 上,
// 这样就可以监听NIO通道的 IO事件,包括接收,连接,可读,可写。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
通过
NIO SelectableChannel
的register
方法,将NIO
通道注册到事件轮询器的Selector
上。
这样就可以监听NIO
通道的IO
事件,包括接收,连接,可读,可写。
3.3 取消注册
@Override
protected void doDeregister() throws Exception {
// 将通道从已注册的事件轮询器中取消
eventLoop().cancel(selectionKey());
}
将通道从已注册的事件轮询器中取消。
3.4 开始读
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() 方法调用,
// 会调用到这里
final SelectionKey selectionKey = this.selectionKey;
// 当前选择键是否有效
if (!selectionKey.isValid()) {
return;
}
// 设置当前通道是 可读状态
readPending = true;
final int interestOps = selectionKey.interestOps();
/**
* 设置底层NIO通道读事件 OP_READ 或 OP_ACCEPT
* 与 AbstractNioUnsafe 的 removeReadOp() 方法正好相反。
*/
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
就是设置底层
NIO
通道读事件。
3.5 关闭
@Override
protected void doClose() throws Exception {
ChannelPromise promise = connectPromise;
if (promise != null) {
// 使用tryFailure()而不是setFailure() 方法,
// 来避免与取消 cancel()的竞争。
promise.tryFailure(new ClosedChannelException());
connectPromise = null;
}
// 关闭操作时,需要取消连接超时任务
Future<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
}
注意这个方法,没有调用底层
NIO
通道的关闭close
方法;也就是说子类一般都需要复写它。
3.6 小结
AbstractNioChannel
没有实现写操作相关的方法,以及连接操作相关方法 doConnect(...)
和 doFinishConnect()
。
四. 读数据操作
在 AbstractNioByteChannel
和 AbstractNioMessageChannel
类中,实现了两种方式的读数据操作。
4.1 AbstractNioByteChannel
中读数据
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 通过 allocHandle,在接收数据时分配缓存区 ByteBuf
byteBuf = allocHandle.allocate(allocator);
// 通过 doReadBytes(byteBuf) 方法,从底层 NIO 通道中读取数据到 ByteBuf 中,
// 并返回读取数据的大小;
// 通过 lastBytesRead 方法记录上次读操作已读取的字节。
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
// 没有可读数据了;释放缓冲区。
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
// 增加当前读循环中已读消息的数量
allocHandle.incMessagesRead(1);
readPending = false;
// 通过管道 pipeline 发送 ChannelRead 读取事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
// 通过allocHandle.continueReading()方法,
// 判断是否需要继续读取。
} while (allocHandle.continueReading());
// 这次读取已完成
allocHandle.readComplete();
// 通过管道 pipeline 发送 ChannelReadComplete 读取完成事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
- 通过
doReadBytes(byteBuf)
方法,从底层NIO
通道中读取数据到输入缓冲区ByteBuf
中。- 通过
pipeline.fireChannelRead(...)
方法,发送ChannelRead
读取事件。- 通过
allocHandle.continueReading()
判断是否需要继续读取。- 这次读取完成,调用
pipeline.fireChannelReadComplete()
方法,发送ChannelReadComplete
读取完成事件。
4.2 AbstractNioMessageChannel
中读数据
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 将消息读入给定数组并返回所读入的数量
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
// 小于 0,表示已经关闭
closed = true;
break;
}
// 增加当前读循环中已读消息的数量
allocHandle.incMessagesRead(localRead);
// 判断是否需要继续读取
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
// 遍历读取消息的数组readBuf
for (int i = 0; i < size; i ++) {
readPending = false;
// 通过管道 pipeline 发送 ChannelRead 读取事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
// 这次读取已完成
allocHandle.readComplete();
// 通过管道 pipeline 发送 ChannelReadComplete 读取完成事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
- 使用
readBuf
数组,一次读取操作所有的数据对象。- 通过
doReadMessages(readBuf)
方法,将消息读入给定数组readBuf
,并返回所读入的数量localRead
。- 通过
localRead
的值,判断是否读取完成,或者通道已经关闭。- 通过
continueReading(allocHandle)
方法,判断是否需要继续读取。- 遍历读取消息的数组
readBuf
, 通过管道pipeline
发送ChannelRead
读取事件;遍历完成,通过管道pipeline
发送ChannelReadComplete
读取完成事件。