io.netty.channel.Channel是Netty网络操作抽象类,它聚合了一组功能,包括但不限于网路的读,写,客户端发起连接,主动关闭连接,链路关闭,获取通信双方的网络地址等。它也包含了Netty框架相关的一些功能,包括获取该Channel的EventLoop,获取缓冲分配器ByteBufAllocator和pipeline等。
Unsafe是个内部接口,聚合在Channel中协助进行网络读写相关的操作,因为它的设计初衷就是Channel的内部辅助类,不应该被Netty框架的上层使用者调用,所以被命名为Unsafe。这里不能仅从字面理解认为它是不安全的操作,而要从这个架构的设计层面体会它的设计初衷和职责。
1、Channel功能说明
1.1、netty自定义Channel的原因
- JDK的SocketChannel和ServerSocketChannel没有统一的Channel接口供业务开发者使用,对于用户而言,没有统一的操作视图,使用起来不方便。
- JDK的SocketChannel和ServerSocketChannel的主要职责就是网络I/O操作,由于它们是SPI类接口,由具体的虚拟机厂家来提供,所以通过继承SPI功能类来扩展其功能的难度很大;直接实现ServerSocketChannel和SocketChannel抽象类,其工作量和重新开发一个新的Channel功能类是差不多的。
- Netty的Channel需要能够跟Netty的整体架构融合在一起,例如I/O模型,基于ChannelPipeline的定制模型,以及基于元数据描述配置化的TCP参数等,这些JDK的SocketChannel和ServerSocketChanel都没有提供,需要重新封装。
- 自定义的Channel,功能实现更加灵活。
1.2、netty自定义Channel的设计理念
- 在Channel接口层,采用Facade模式进行统一封装,将网络I/O操作,网络I/O相关的其他操作封装起来,统一对外提供。
- Channel接口的定义尽量大而全,为SocketChannel和ServerSocketChannel提供统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度地实现功能和接口的重用。
- 具体实现采用聚合而非包含的方式,将相关的功能类聚合在Channel中,由Channel统一分配和调度,功能实现更加灵活。
1.3、Channel主要API
接口名 | 描述 |
---|---|
EventLoop eventLoop() | Channel需要注册到EventLoop的多路复用器上,用于处理I/O事件,通过eventLoop()方法可以获取到Channel注册的EventLoop。EventLoop本质上就是处理网络读写事件的Reactor线程。在Netty中,它不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义NioTask等任务。 |
ChannelPipeline pipeline() | 返回channel分配的ChannelPipeline |
boolean isActive() | 判断channel是否激活。激活的意义取决于底层的传输类型。例如,一个Socket传输一旦连接到了远程节点便是活动的,而一个Datagram传输一旦被打开便是活动的 |
boolean isOpen() | 判断Channel是否已经打开 |
boolean isRegistered() | 判断Channel是否已经在对应的EventLoop中注册 |
SocketAddress localAddress() | 返回本地的socket地址 |
SocketAddress remoteAddress() | 返回远程的socket地址 |
Channel flush() | 将之前已写的数据冲刷到底层Channel上去 |
boolean isWritable() | 当且仅当I/O线程可以立即处理写请求时,返回true;当本方法返回false时,任何写操作将进行入队,直到i/o线程准备好处理队列中的写请求 |
ChannelMetadata metadata() | 熟悉TCP协议的读者可能知道,当创建Socket的时候需要指定TCP参数,例如接收和发送的TCP缓冲区大小,TCP的超时时间。是否重用地址等。在Netty中,每个Channel对应一个物理链接,每个连接都有自己的TCP参数配置。所以,Channel会聚合一个ChannelMetadata用来对TCP参数提供元数据描述信息,通过metadata()方法就可以获取当前Channel的TCP参数配置。 |
Channel read() | 从Channel中读取数据到第一个inBound缓冲区,当读取完毕,触发Handler的channelRead()事件,同时触发Handler的channelReadComplete()事件,以让Handler决定是否继续进行数据读取。如果有正在读取的操作,则此方法不做任何操作。 |
ChannelFuture closeFuture() | 当Channel关闭时,通知对应的ChannelFuture。此方法总是返回同一个实例 |
Unsafe unsafe() | 提供一个内部使用的类,此类实现了Unsafae相关接口 |
Channel parent() | 对于服务端Channel而言,它的父Channel为空;对于客户端Channel,它的父Channel就是创建它的ServerSocketChannel。 |
ChannelId id() | 返回ChannelId对象,ChannelId是Channel的唯一标识。 |
ChannelConfig config() | 获取当前Channel的配置信息,例如CONNECT_TIMEOUT_MILLS。 |
long bytesBeforeUnwritable() | isWritable()返回false时,其返回可写的字节数;否则返回0 |
long bytesBeforeWritable() | isWritable()返回true时,其返回底层缓存未写的的字节数;否则返回0 |
ByteBufAllocator alloc() | 返回内存分配器 |
2、Channel类继承图
Channel类继承图如下:
从类继承图可以看出:
(1)Channel是所有通用类的基础接口,ServerChannel是所有服务端Channel的通用接口;
(2)AbstractChannel为Channel的基础抽象类,其对Channel的一些通用功能做了简单实现;
(3)从AbstractChannel继承出基于不同协议及I/O类型的Channel实现类;
- NioSocketChannel:异步I/O的客户端 TCP Socket 实现
- NioServerSocketChannel:异步I/O的服务端 TCP Socket 实现
- NioDatagramChannel:异步I/O的 UDP Socket 实现
- NioSctpChannel:异步I/O的客户端 Sctp Socket 实现
- NioSctpServerChannel:异步I/O的服务端 Sctp Socket 实现
- EpollSocketChannel:基于linux的Epoll实现的事件驱动的客户端TCP Socket实现;
- EpollServerSocketChannel:基于linux的Epoll实现事件驱动的服务端TCP Socket实现;
- OioSocketChannel:同步I/O的客户端 TCP Socket 实现
- OioServerSocketChannel:同步I/O的服务端 TCP Socket 实现
- OioDatagramChannel:同步I/O的 UDP Socket 实现
- OioSctpChannel:同步I/O的客户端 Sctp Socket 实现
- OioSctpServerChannel:同步I/O的服务端 Sctp Socket 实现
3、AbstractChannel源码分析
AbstractChannel 是Channel的部分实现,维护了一个通道相关的资源,如channel id, pipeline等;而且实现了对该套接字的IO操作,以及设置interestOps;这里还没有牵扯到底层的细节,只是这个框架的结构。
3.1、成员变量
private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;
private boolean closeInitiated;
/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;
- FLUSH0_CLOSED_CHANNEL_EXCEPTION:当Channel已关闭时调用AbstractUnsafe的flush0(),设置此异常;
- ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION :当Channel已关闭时调用AbstractUnsafe的ensureOpen(),设置此异常;
- CLOSE_CLOSED_CHANNEL_EXCEPTION :当Channel已关闭时调用AbstractUnsafe的close(),设置此异常;
- WRITE_CLOSED_CHANNEL_EXCEPTION:当Channel已关闭时调用AbstractUnsafe的write(),设置此异常;
- FLUSH0_NOT_YET_CONNECTED_EXCEPTION :当Channel未连接时调用AbstractUnsafe的flush0(),设置此异常;
- parent:父Channel;
- id:Channel对应的全局唯一ID;
- unsafe:Unsafe实例;
- pipeline:当前Channel对应的DefaultChannelPipeline;
- unsafeVoidPromise :异常通知,默认不使用。当我们不需要异常在pipeline中传播时,无需设置;
- closeFuture:Channel关闭通知;
- localAddress:本地地址;
- remoteAddress:远程地址;
- eventLoop:当前Channel注册的EventLoop;
- registered:是否已经注册到EventLoop;
- closeInitiated:关闭时的基础参数是否已设置;
- strValActive:是否已经缓存Channel变成active之后的toString()值;
- strVal:Channel的toString()值的缓存;
3.2、构造函数
Channel构造函数如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
- id:通过newId()接口实现,默认实现方式为DefaultChannelId的newInstance()实现,DefaultChannelId的ID生成规则为:机器ID+处理器ID+序列号+时间戳+随机码;newId()为protected类型接口,可由子类重载;
- unsafe:通过newUnsafe()接口实现,此方法为抽象方法,具体实现由子类实现;
- pipeline:通过newChannelPipeline()接口实现,默认实现类为DefaultChannelPipeline,其设定了pipeline的头处理器(head)和尾处理器(tail)等基本信息;newChannelPipeline()接口未protected类型,可由子类重载;
3.3、核心方法
AbstractChannel的接口实现都比较简单,其具体实现基本都交由pipeline和unsafe进行处理。AbstractUnsafe是AbstractChannel的对应Unsafe接口实现。此处对AbstractUnsafe的核心接口实现进行分析。
3.3.1、register事件框架
register()实现源码:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
此处对一些基本信息进行检查,如是否已经注册,EventLoop是否兼容等;同时判断当前线程是否与EventLoop在同一线程中,如果是则进行注册(register0()),否则将以任务方式将注册事件放入EventLoop的执行队列中,以防止多线程多线程并发情况。具体注册处理交由register0()。
register0()实现源码:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
注册流程如下:
- 设置不可取消表示,并确保Channel已经打开;
- 具体注册细节由子类的doRegister()实现;
- 确保用户的handler已经添加到pipeline中;
- 异步设置注册成功通知,并调用fireChannelRegistered()方法异步通知register事件;
- 对于服务端接受的客户端连接,如果首次注册,触发Channel的Active事件,如果已设置autoRead,则调用beginRead()开始读取数据。
beginRead()实现源码:
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
对EventLoop及Channel是否为active进行检查,调用doBeginRead()模板方法执行具体的处理;若处理异常,则异步调用fireExceptionCaught()方法,进行异常通知。
3.3.2、bind事件框架
bind()实现源码:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
处理流程:
- 对基础的EventLoop、Channel是否打开等进行检查;
- 广播检测、地址检查、平台相关检查、权限检查等;
- 调用doBind()进行实际的绑定,具体有由子类实现;
- 如果是首次Active,则异步进行fireChannelActive()通知;
3.3.3、disconnect事件框架
disconnect()实现源码:
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect();
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel
}
处理流程:
- EventLoop检查,设置不可取消等;
- 调用doDisconnect()进行实际的断开连接处理;
- Channel从非Inactive变为Active时,异步调用fireChannelInactive()进行Inactive通知;
3.3.4、close事件框架
close()实现源码:
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) {
if (closeFuture.isDone()) {
// Closed already.
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise);
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// Fail all the queued messages
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
处理流程:
- 判断关闭的初始化信息是否设置,若未设置,当Channel已经关闭时,设置关闭成功通知;否则添加异步关闭完成监听器;
- 获取输出缓冲区ChannelOutboundBuffer及关闭执行线程;
- 若执行线程不为空,则在执行线程中添加任务,进行实际的关闭处理,并处理输出缓冲区的冲刷数据失败处理及fireChannelInactiveAndDeregister()事件处理;
- 若执行线程为空,则直接在本线程中执行关闭Channel的处理,并处理输出缓冲区的冲刷数据失败处理;
- 当inFlush0为true,即当前输出缓冲区正在冲刷数据,则异步进行fireChannelInactiveAndDeregister()事件通知,否则直接进行通知;
doColse0()实现源码:
private void doClose0(ChannelPromise promise) {
try {
doClose();
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
doClose()由具体子类实现。
3.3.5、deregister事件框架
deregister()实现源码:
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
safeSetSuccess(promise);
return;
}
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
//
// See:
// https://github.com/netty/netty/issues/4435
invokeLater(new Runnable() {
@Override
public void run() {
try {
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
registered = false;
pipeline.fireChannelUnregistered();
}
safeSetSuccess(promise);
}
}
});
}
处理流程:
- 如果还未注册,则直接设置成功通知;否则异步进行注销处理;
- 异步调用doDeregister()进行注销处理,进行inactive事件通知处理,进行unregister的事件通知处理;
3.3.6、write事件框架
write()实现源码:
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
如果outboundBuffer为空,表示Channel正在关闭,则不进行写处理,直接设置写失败,并释放msg;filterOutboundMessage()为消息过滤器,由子类实现;outboundBuffer.addMessage()将消息添加到输出缓冲区中;
3.3.7、flush事件框架
flush()实现源码:
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
对输出缓冲区器进行检查,为空则不进行处理;冲刷unflushedEntry中的数据;冲刷输出缓冲区的数据;
flush0()实现源码:
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
inFlush0 = false;
}
}
处理流程:
- 若正在冲刷数据,则不进行处理;
- 若输出缓冲区为空,则不进行处理;
- 设置inFlush0为true,表示正在冲刷数据;
- 若Channel为Inactive状态,则设置冲刷数据失败;
- 执行doWrite()进行实际的写数据,若写异常,设置相应的写失败;
- 最后设置inFlush0为false,表示完成数据冲刷;
4、AbstractNioChannel源码分析
AbstractNioChannel类继承图:
4.1、成员变量
private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractNioChannel.class, "doClose()");
private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey;
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
clearReadPending0();
}
};
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
DO_CLOSE_CLOSED_CHANNEL_EXCEPTION:调用doClose()时错误的异常;
ch:NioSocketChannel和ServerSocketChannel的公共父类,用于设置参数和进行I/O操作;
readInterestOp:Read事件,服务端OP_ACCEPT,其他OP_READ
selectionKey:Channel注册到EventLoop后返回的选择键;
readPending:底层读事件标记
clearReadPendingRunnable:清除底层读事件标记任务
connectPromise:链接的异步结果
connectTimeoutFuture:连接超时检测任务异步结果
requestedRemoteAddress:连接的远端地址
4.2、核心方法
AbstractNioChannel的方法实现很简单,其主要是通过NioUnsafe接口进行实现的,而AbstractNioUnsafe为NioUnsafe的具体实现类,其接口继承图如下:
故只需要分析AbstractNioUnsafe的核心方法即可。
4.2.1、connect处理
connect()实现源码:
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
处理流程:
- 设置不可取消标志,确保Channel已经打开,检查连接异步监听是否已存在等;
- 调用doConnect()进行实际连接处理,此方法为抽象方法,由子类实现;
- 如果连接成功,则设置连接成功的异步通知;
- 如果设置连接超时时间,则添加超时任务,进行连接超时的异步通知;
- 添加异步连接通知的监听器,若异步结果被取消,则取消连接超时任务,清除连接的异步结果;
fulfileConnectPromise()实现源码:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(cause);
closeIfClosed();
}
fulfileConnectPromise()主要设置异步结果为成功,并出发Channel的Active事件。
finishConnect()实现源码:
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
finishConnect()只由EventLoop处理就绪selectionKey的OP_CONNECT事件时调用,从而完成连接操作。注意:连接操作被取消或者超时不会使该方法被调用。
4.2.2、doRegister处理
doRegister()实现源码:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
对于Register事件,将Channel注册到给定NioEventLoop的selector上即可。注意,其中第二个参数0表示注册时不关心任何事件,第三个参数为Netty的NioChannel对象本身。如果抛出CancelledKeyException 表示当前的Channel对应的SelectionKey已经被取消,此时立即对Selector进行select操作,其原因如下:
当select( )中的任意一种被调用时,如下步骤将被执行:
(1)已取消的键的集合将会被检查。如果它是非空的,每个已取消的键的集合中的键将从另外两个集合中移除,并且相关的通道将被注销。这个步骤结束后,已取消的键的集合将是空的。
(2)已注册的键的集合中的键的interest集合将被检查。在这个步骤中的检查执行过后,对interest集合的改动不会影响剩余的检查过程。
对于Deregister事件,选择键执行cancle()操作,选择键表示JDK Channel和selctor的关系,调用cancle()终结这种关系,从而实现从NioEventLoop中Deregister。需要注意的是:cancle操作调用后,注册关系不会立即生效,而会将cancle的key移入selector的一个取消键集合,当下次调用select相关方法或一个正在进行的select调用结束时,会从取消键集合中移除该选择键,此时注销才真正完成。一个Cancle的选择键为无效键,调用它相关的方法会抛出CancelledKeyException。
4.2.3、doRead处理
doRead()实现源码:
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
doRead()操作只是简单将read事件添加到SelectionKey中。
5、AbstractNioByteChannel源码分析
AbstractNioByteChannel类继承图如下:
其对应的Unsafe类继承图如下:
AbstractNioByteChannel的实现很简单,本处主要分析其对应的NioByteUnsafe类。其是 NioSocketChannel 的父类,只有一个成员变量 flushTask,负责写半包消息。
5.1、核心方法
5.1.1、read处理
read()实现源码:
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
处理流程:
- 检查输入流是否已关闭或远端已经关闭,是则清除读处理;
- 调用doReadBytes()抽象方法读取数据;
- 若数据长度小于等于0,表示无数据可读,释放缓冲区;若读取的数据长度小于0表读出错,设置关闭相关标志;
- 数据读取完成后,pipeline进行read事件通知;
- 若需要关闭Channel,则调用closeOnRead()进行关闭处理;
- 若读处理异常,则调用handleReadException()进行读异常处理;
closeOnRead()实现源码:
private void closeOnRead(ChannelPipeline pipeline) {
if (!isInputShutdown0()) {
if (isAllowHalfClosure(config())) {
shutdownInput();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
} else {
inputClosedSeenErrorOnRead = true;
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
}
}
当Input流已未关闭,且Channel参数ALLOW_HALF_CLOSURE为True时,会触发用户事件ChannelInputShutdownEvent,否则,直接关闭该Channel;否则会触发用户事件ChannelInputShutdownReadComplete。
handleReadException()实现源码:
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
当缓存区有数据时,会触发read事件,触发读完成时间,触发异常时间;
5.1.2、write处理
doWrite()实现源码:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
Object msg = in.current();
//无数据需要写,则清除SelectionKey的写标志
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
//调用doWriteInternal做实际的写处理
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
此处处理比较简单,如果无数据可写,则清除SelectionKeyd的写事件;否则调用doWriteInternal()进行数据的写入;之后调用incompleteWrite()对是否写完成进行处理。
doWriteInternal()实现源码:
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
//msg类型为ByteBuf类型?
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
in.remove();
return 0;
}
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
//msg类型为FileRegion类型?表示写文件数据
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}
此处主要对两种缓冲区进行了区分,ByteBuf和FileRegion;如果为ByteBuf,则最终调用doWriteBytes()进行数据的写入;如果为fileRegion则最终调用doWriteFileRegion()对文件进行写入。两个写入方法都为抽象方法,由子类实现。
incompleteWrite()实现源码:
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
// setOpWrite 为 true表示数据未写完,设置 SelectionKey 写标志位
if (setOpWrite) {
setOpWrite();
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
// 清除写标志
clearOpWrite();
// Schedule flush again later so other tasks can be picked up in the meantime
// 启动清除flushTask任务,继续写半包消息
eventLoop().execute(flushTask);
}
}
6、NioSocketChannel源码分析
NioSocketChannel实现源码:
NioSocketChannel为Netty的NIO底层客户端的具体实现类,一些具体的底层处理在此处实现。
6.1、核心方法
6.1.1、doBind操作
doBind()实现源码:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
doBind0(localAddress);
}
private void doBind0(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
SocketUtils.bind(javaChannel(), localAddress);
} else {
SocketUtils.bind(javaChannel().socket(), localAddress);
}
}
此处调用底层JDK进行Channel的绑定操作。
6.1.2、doConnect操作
doConnect()实现源码:
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
@Override
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
JDK中的Channel在非阻塞模式下调用connect()方法时,会立即返回结果:成功建立连接返回True,操作还在进行时返回False。返回False时,需要在底层OP_CONNECT事件就绪时,调用finishConnect()方法完成连接操作。
6.1.3、doWrite操作
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
// Ensure the pending writes are made of ByteBufs only.
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
本段代码做的优化是:当输出缓冲区中有多个buffer时,采用Gathering Writes将数据从这些buffer写入到同一个channel。同时动态调整一次写的最大缓冲区大小。
7、AbstractNioMessageChannel源码分析
AbstractNioMessageChannel类继承图如下:
7.1、核心方法
AbstractNioMessageChannel 是 NioServerSocketChannel、NioDatagramChannel 的父类。其主要方法也是 doWrite,功能和 AbstractNioByteChannel 的 doWrite 也类似,区别只是后者只处理 ByteBuf 和 FileRegion,前者无此限制,处理所有 Object。
7.1.1、doWrite
doWrite实现源码:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
//无数据可写则清除SelectionKey的写事件标志
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
//doWriteMessage为抽象方法,由子类实现
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
//未写完则设置SelectionKey的写标志位,等待底层可写时继续写入
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (Exception e) {
if (continueOnWriteError()) {
in.remove(e);
} else {
throw e;
}
}
}
}
doWriteMessage 方法在 NioServerSocketChannel 中实现如下所示,是因为 NioServerSocketChannel 只是用来监听端口,接收客户端请求,不负责传输实际数据。
NioServerSocketChannel中doWriteMessage()实现:
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
7.1.2、read实现
read()实现源码:
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//doReadMessages为抽象方法,由子类实现
int localRead = doReadMessages(readBuf);
//无数据可读?
if (localRead == 0) {
break;
}
//读取数据出错?
if (localRead < 0) {
closed = true;
break;
}
//增加消息数
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//触发pipeline的read事件,将读取到的数据交由应用层handler进行处理
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
//触发pipeline的读取完成事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
//触发pipeline的异常事件
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
8、NioServerSocketChannel源码分析
NioServerSocketChannel类继承图:
NioServerSocketChannel 是服务端 Channel 的实现类,有一个用于配置 TCP 参数的 ServerSocketChannelConfig。
8.1、核心方法
NioServerSocketChannel为Netty的服务端NIO的实现,其只支持bind、read和close操作。
8.1.1、doBind实现
doBind()实现源码:
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
bind()的实现都是基于底层JDK实现的。
8.1.2、doReadMessage实现
doReadMessage()实现源码:
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
NioServerSocketChannel用于服务端的连接监听,此处的doReadMessages()方法每次最多返回一个消息(客户端连接),由此可知NioServerSocketChannel的read操作一次至多处理的连接数为config.getMaxMessagesPerRead(),也就是参数值MAX_MESSAGES_PER_READ。
8.1.3、doClose实现
doClose()实现源码:
protected void doClose() throws Exception {
javaChannel().close();
}
调用底层JDK的Channel进行连接的关闭。
9、其他实现类
通过Channel的相关类继承图知道,Netty还有其他不同实现的Channel,除了NIO类型的,还有基于OIO的OioSocketChannel和OioServerSocketChannel,基于Epoll的EpollSocketChannel和EpollServerSocketChannel等,在此不多赘述。
相关阅读:
Netty源码愫读(一)ByteBuf相关源码学习 【https://www.jianshu.com/p/016daa404957】
Netty源码愫读(三)ChannelPipeline、ChannelHandlerContext相关源码学习【https://www.jianshu.com/p/be82d0fcdbcc】
Netty源码愫读(四)ChannelHandler相关源码学习【https://www.jianshu.com/p/6ee0a3b9d73a】
Netty源码愫读(五)EventLoop与EventLoopGroup相关源码学习【https://www.jianshu.com/p/05096995d296】
Netty源码愫读(六)ServerBootstrap相关源码学习【https://www.jianshu.com/p/a71a9a0291f3】
参考书籍:
《Netty权威指南》第二版
参考博客:
https://www.jianshu.com/p/9258af254e1d
https://blog.csdn.net/vonzhoufz/article/details/39159193
https://hk.saowen.com/a/6cb46b200931fa05dbb222736de625a1d31e664e336a139f1f4794ff83d038ad
https://blog.csdn.net/lz710117239/article/details/77651209
https://juejin.im/post/5bda9ed6f265da3913474110