一个 NIO 服务端启动需要哪些要素
一个典型的 NIO 服务端应该有哪些东西来支撑他的服务呢?
ServerSocketChannel
首先要有一个 ServerSocketChannel,就像流操作都要基于 Stream 对象一样, NIO 中的所有 I/O 操作都基于 Channel 对象。
一个 Channel 代表着和某一实体的连接,这个实体可以是硬件设备、文件或者是网络套接字,通过 Channel 可以读写数据。
NIO 中的 ServerSocketChannel 相当于普通 IO 中的 ServerSocket,而客户端的 SocketChannel 则相当于普通 IO 中的 Socket。
Selector
另外还需要有一个 Selector,用来获取 Channel 的事件,Channel 有4种事件,分别是:
Accept:有可以接受的连接
Connect:已经连接成功
Read:有数据可以读取
Write:可以进行数据写入
如果我们直接从 Channel 中读取数据时,很可能会有问题,因为这时 Channel 中可能根本就没有数据可以读,而强行进行读取的话,会使线程挂起一直等待着数据的到来,直到有数据到达才被唤醒,那该线程在数据到达这段时间内将不做任何事情。
通过 Selector 来检查 Channel 的状态变化,当具体的状态满足条件时向外发出通知即可,就跟监听器一样,我们将 Channel 注册到 Selector 上,然后告诉 Selector 我这个 Channel 所感兴趣的事件列表,接下来 Selector 就会负责把满足条件的事件通知到 Channel,这样的话 Channel 就不需要每次傻傻的来读取数据了。
那 Channel 怎样才能知道他感兴趣的事件已经发生了呢,当 Channel 把自己感兴趣的事件注册到 Selector 上之后,只需要通过 Selector 提供的 select 方法去查询就好了。
所以一个典型的 NIO 服务端是这样的:
// 初始化服务端TCP连接通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置为非阻塞模式
serverChannel.configureBlocking(false);
// 创建一个selector
Selector selector = Selector.open();
// 把Channel注册到selector上
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 绑定端口
serverChannel.bind(new InetSocketAddress(8864));
了解了 NIO 是怎么玩的之后,我们来分析下 Netty 服务端是怎么启动的,首先看一个最简单的 EchoServer 的启动代码:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 1
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 2
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)) // 3
.childHandler(new ChannelInitializer<SocketChannel>() { // 4
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync(); // 5
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
整个过程主要有五个部分组成,如下图所示:
简单解释一下每个步骤的作用:
1、初始化两个 NioEventLoopGroup,这两个对象可以看做是传统 IO 编程模型中的线程组,bossGroup 主要进行监听端口,accept 新连接,并将新连接转交给 worker 线程去执行,workerGroup 主要是处理客户端请求的。
2、通过指定一个 Channel 的 Class 来创建一个 Channel 工厂,后续通过该工厂来创建 Channel,实际上这里就统一了服务端的 IO 模型了,通过统一的 api 就能轻松的指定服务端的 IO 模型是 NIO 还是 BIO,只需要指定不同的 Channel 类型即可。
3、添加一个 Server 端专属的 ChannelHandler。
4、添加一个自定义的用来处理客户端请求的 ChannelHandler,主要用来进行编解码、数据读写、逻辑处理等。
5、绑定端口并启动服务端。
下面就对这五个步骤进行详细的分析,不过上面还有一个很重要的类没有说到,就是 ServerBootstrap 引导,他主要就是负责把所有的对象聚集在一起,然后把服务启动起来,所以不进行具体的描述。
Netty 是怎样启动的
初始化 NioEventLoopGroup
NioEventLoopGroup 简单点理解就是线程组,他会持有一组线程,这里的线程就是 EventLoop。
整个 NioEventLoopGroup 的初始化的过程如下图所示,具体的流程可以查看 NioEventLoopGroup 的构造方法的执行过程,这里不贴具体的代码了:
在 NioEventLoopGroup 初始化时,依次初始化了三个对象,分别是虚线框中黄色部分对应的对象。
其中有一个对象 SelectorProvider 会在后续使用它来创建 Selector 对象。
在这些对象都初始化好之后,NioEventLoopGroup 会依次调用父类的构造方法,最终调用到父类 MultithreadEventExecutorGroup 中,然后再父类的构造方法中完成剩下的初始化工作,这些工作又可以分为四个部分:
我们来看这四个部分具体的实现。
初始化 Executor
首先是初始化一个 Executor 对象,代码如下所示:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 1.初始化 executor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 省略部分代码
}
executor 从 NioEventLoopGroup 中未进行初始化,一直到这里才进行初始化,这里默认是初始化的一个 ThreadPerTaskExecutor 对象,从类名中我们可以发现,这个 Executor 在接收到新的任务时,会为每个任务都创建一个线程来执行。
创建 EventExecutor 数组
第二步是创建一个 EventExecutor 数组,如下图所示:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 1.初始化 executor
// 2.创建 EventExecutor 数组
children = new EventExecutor[nThreads];
// 省略部分代码
}
初始化 EventExecutor 数组
第三步就是对这个数组进行实例化,如下图所示:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 1.初始化 executor
// 2.创建 EventExecutor 数组
// 3.初始化每一个 EventExecutor
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
// 如果当前对象初始化失败,则将之前初始化的所有对象都关闭
}
}
}
// 省略部分代码
}
这里需要注意的是,初始化 EventExecutor 对象的方法是 newChild,而这个方法的实现是在 NioEventLoopGroup 类中,具体的代码如下所示:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
可以看到最终创建的是一个 NioEventLoop 对象,并且该对象持有了 SelectorProvider 和 Executor 以及其他对象。
但是我们的 children 数组中需要的 EventExecutor 对象,为什么这里 new 出来的是一个 NioEventLoop 对象呢?我们可以看一下 NioEventLoop 类的结构,如下图所示:
从类的结构图中可以看到, NioEventLoop 类继承自 SingleThreadEventLoop,而 SingleThreadEventLoop 类继承自 SingleThreadEventExecutor 类,并且实现了 EventLoop 接口。然后 SingleThreadEventExecutor 又继承自 AbstractScheduledEventExecutor 类。
AbstractScheduledEventExecutor 类又继承自 AbstractEventExecutor,该类直接实现了 EvenExecutor 接口,所以 NioEventLoop 也就是成了一个 EventExecutor。
从图中还可以看出,EventExecutor 还是继承自 ScheduleExecutorService 接口,这就说明了,EventExecutor 除了有线程池的能力,还具备调度的能力。
所以 NioEventLoop 是一个具有线程池功能的事件循环器,并且还具有调度任务的功能,为什么要拥有调度的功能呢,因为 Netty 中有很多任务,包括需要定时执行的调度任务,和一次性执行的任务。
但是 NioEventLoop 中负责执行具体任务的线程,是在 SingleThreadEventLoop 中创建的,并且只创建了一个线程,这一点从类名中就可以看出来。
那为什么只创建一个线程呢,线程池如果只有一个线程的话,那意义不就很小了吗?其实这正是 Netty 设计的精美之处,通过一个线程来支撑所有的事件循环,从而避免了多线程之间的并发问题,也减少了线程切换所带来的性能损耗。Netty 通过充分压榨这一个线程的能力,实现了一种无锁化的高效的编程模型。
初始化 EventExecutorChooser
当上面的三个步骤都完成之后,最后一个步骤就是初始化一个 EventExecutorChooser,如下所示:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 1.初始化 executor
// 2.创建 EventExecutor 数组
// 3.初始化每一个 EventExecutor
// 4.初始化 EventExecutorChooser
chooser = chooserFactory.newChooser(children);
// 省略部分代码
}
这步主要是通过一个 chooserFactory 创建了一个 EventExecutorChooser,传递进去的参数则是 EventExecutor 数组,而 EventExecutorChooser 的作用就是从 EventExecutor 数组中选择一个可用的 EventExecutor。
需要注意的是 newChooser 创建的 chooser 对象有两种类型,这取决于 children 的个数,如果个数是偶数,则选择 PowerOfTwoEventExecutorChooser,该类型的 chooser 在选择 EventExecutor 时采用位运算,效率非常高;如果个数是奇数则选择 GenericEventExecutorChooser,该类型的 chooser 在选择 EventExecutor 时采用取余运算,效率较低,这充分体现了 Netty 在性能优化上的考虑。
现在我们来总结一下初始化完 NioEventLoopGroup 之后一共创建了哪些对象:
Executor:创建了一个 Executor 对象,具体实例为:ThreadPerTaskExecutor
EventExecutor 数组:创建了一个大小为 nThread 的 EventExecutor 数组,每个实例都是一个 NioEventLoop
NioEventLoop:一个同时具备 execute 和 schedule 能力的 EventExecutor,并且每一个 NioEventLoop 只有一个 Thread 来支撑其运行
EventExecutorChooser:一个可以获取一个可用的 EventExecutor 的选择器
指定 Channel 类型
通过 .channel() 方法指定了一个 Channel 的 Class,该方法会创建一个 ChannelFactory,后续创建新的 Channel 则由该 ChannelFactory 来创建。
具体是如何创建 Channel 的,只需要看下 ChannelFactory 是如何实现的即可,实际的 ChannelFactory 实例,是一个 ReflectiveChannelFactory:
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
可以很清楚的知道,是通过反射获取到该类的构造方法,然后创建了一个实际。
这里的 Channel 都是服务端的 Channel,另外该方法除了是创建了一个 Channel 工厂之外,更重要的是他指定了 IO 模型。
我们使用 Netty 来进行编写网络应用时,一般都是使用它的 NIO 模型,因为这样才能够发挥出他最大的价值。但是如果你想使用 BIO 模型的话,也是支持的,只需要指定 Class 为:OioServerSocketChannel 即可。
添加服务端 ChannelHandler
通过 .handler() 方法可以添加一个服务端的 ChannelHandler,该 ChannelHandler 是用来处理服务端的一些逻辑的,比如纪录一些日志等等。
添加客户端 ChannelHandler
通过 .childHandler() 方法就可以添加一个用来处理 Client 端请求的 ChannelHandler,该 ChannelHandler 就是所有实际业务逻辑处理的核心部分。
包括对客户端发送过来的数据进行解码,对数据进行逻辑处理,然后生成响应数据后写回客户端。
绑定端口并启动
当以上的所有准备工作都执行完毕之后,服务端启动过程中最重要的部分就要开始执行了:那就是绑定端口,也就是执行 ServerBootstrap 的 bind 方法。该方法也可以拆分成两个独立的部分:
初始化 Channel
initAndRegister 方法,主要做的就是初始化并注册 Channel 对象,这个 Channel 的类型就是开始的时候通过 .channel() 方法指定的类型。
首先通过 ChannelFactory 创建一个我们所需类型的 Channel 对象,然后对这个 Channel 进行初始化,具体的代码如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 创建一个 channel
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
// 省略部分代码
}
}
创建 Channel
我们先看下创建 Channel 的过程,上面我们已经分析过了 channelFactory.newChannel() 是通过反射创建了一个 Channel 的实例,而该 Channel 的 Class 是我们指定的 NioServerSocketChannel,现在我们需要知道该 Channel 是如何被创建的,整个过程也分为四个部分:
第一步,创建一个 ServerSocketChannel:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
这里就是通过 JDK 底层的 SelectorProvider 创建了一个 ServerSocketChannel,后续都是通过该 Channel 对客户端请求进行 accept 操作。
PS:在创建 NioEventLoopGroup 时也创建了一个 SelectorProvider,不过该 provider 是为了在某个条件下重新创建一个新的 Selector ,通过 rebuildSelector 的方式来解决 JDK 的 epoll 空转的bug。
第二步,将第一步创建的 ServerSocketChannel 以及一个感兴趣的事件传入并调用父类构造方法:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
这里的第二行代码,是将当前 NioServerSocketChannel 和 一个 ServerSocket 封装成一个 NioServerSocketChannelConfig。
第三步,调用父类构造方法初始化一些变量:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
// 省略部分代码
}
}
这一步主要是将一些变量保存起来,并设置了 channel 的阻塞模式为非阻塞模式。
第四步,创建核心对象,包括 channelId,unsafe,pipeline:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
因为我们本篇文章是对服务器启动流程的分析,所以这里不对每一个对象的创建过程进行分析了,只需要知道是在创建 NioServerSocketChannel 的时候,创建了 channelId,unsafe 和 pipeline 对象。后面的操作中会用到这些对象。
初始化 Channel
初始化 Channel 的 init 方法 在 AbstractBootstrap 中是一个抽象方法,具体的实现在 ServerBootstrap 中,我们来看下具体的实现:
void init(Channel channel) throws Exception {
// 获取Channel的pipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
// 该childHandler对象是在创建ServerBootstrap对象时,通过childHandler方法创建的
final ChannelHandler currentChildHandler = childHandler;
// 省略部分代码
// 把在创建ServerBootstrap对象时,创建的channelHandler和childHandler对象都添加到
// channel的pipeline中去
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
这里初始化最主要的工作就是为 NioServerSocketChannel 的 pipeline 添加了一个 ChannelHandler。
该 ChannelHandler 就是来处理客户端连接的接入的,通过 ServerBootstrapAcceptor 来实现。
其中的 currentChildGroup 就是我们创建的 workerGroup,currentChildHandler 则是我们通过 childHandler 方法指定的处理客户端请求的 ChannelHandler。
所以 ServerBootstrapAcceptor 主要的工作就是接受客户端的请求,并将请求转发给 workGroup 去处理,具体的处理逻辑由用户自定义的 ChannelHandler 确定。
注册 Channel
对 Channel 初始化完了之后,现在就需要对 Channel 进行注册了,如下所示:
final ChannelFuture initAndRegister() {
Channel channel = null;
// 创建并初始化 channel
ChannelFuture regFuture = config().group().register(channel);
}
config().group() 返回的是 bootstrap 的 group 属性,也就是我们创建 ServerBootstrap 时传入的 bossGroup,他是一个 NioEventLoopGroup 实例。
整个注册的过程也分为四个部分:
第一步,调用 NioEventLoopGroup 的注册方法,如下所示:
public abstract class MultithreadEventLoopGroup
extends MultithreadEventExecutorGroup
implements EventLoopGroup {
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
}
其中 next() 的方法实现是在父类 MultithreadEventExecutorGroup 中,也就是第二步的操作。
第二步,获取一个可用的 EventExecutor,如下所示:
public abstract class MultithreadEventExecutorGroup
extends AbstractEventExecutorGroup {
@Override
public EventExecutor next() {
return chooser.next();
}
}
看到 chooser 我们应该马上反应出来,这个对象是在初始化 NioEventLoopGroup 的时候创建的,同时还通过 newChild 方法创建了 nThreads 个 EventExecutor,并将这些 EventExecutor 都保存在了一个叫 children 的数组中,这里的 EventExecutor 实例是一个 NioEventLoop 。
chooser 选择器的 next 方法其实就是从 children 数组中获取一个可用的 EventExecutor,也就是获取一个可用的 NioEventLoop,所以 next().register(channel) 的方法,实际上是执行的 NioEventLoop 的 register(channel) 方法。
第三步,执行 NioEventLoop 的 register 方法,该方法是在 NioEventLoop 的父类 SingleThreadEventLoop 中实现的,如下所示:
public abstract class SingleThreadEventLoop
extends SingleThreadEventExecutor
implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
}
第四步,执行 unsafe 的 register 方法,看到 unsafe 我们也应该马上就想到,该对象是在初始化 NioServerSocketChannel 的时候,在父类 AbstractChannel 中通过 newUnsafe() 方法初始化的,所以我们回到 AbstractChannel 中查看 newUnsafe 方法创建的 unsafe 对象到底是什么。
但是很可惜,该方法是一个抽象方法,如下所示:
public abstract class AbstractChannel
extends DefaultAttributeMap
implements Channel {
protected abstract AbstractUnsafe newUnsafe();
}
所以需要到 AbstractChannel 的子类中寻找该方法的实现,并且该子类也要满足是 NIOServerSocketChannel 的父类,所以很容易找到,该实现类是:AbstractNioMessageChannel,现在看下 newUnsafe 方法创建的 unsafe 对象是什么,如下所示:
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
private final class NioMessageUnsafe
extends AbstractNioUnsafe {
}
所以现在我们得到了 unsafe 对象的实例是 NioMessageUnsafe,接下来就看 NioMessageUnsafe 的 register 方法了,该方法是在父类 AbstractUnsafe 中实现的,如下所示:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 省略部分代码
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 省略部分代码
}
}
}
但是该方法仍然不是最终的注册方法,又调用了一个叫 register0 的方法,但是该方法调用在两个分支内:
如果 eventLoop.inEventLoop 为 true 则直接调用
否则将该方法封装成一个 Runnable 交给 eventLoop 去执行
这里我们需要注意的是 Netty 的 EventLoop 底层是使用的一个单线程来支撑他的工作的,很多操作都会看到对于当前线程的判断,这正是 Netty 线程模型高效的原因所在。
让该线程执行某个方法之前,要先判断,当前是否在 EventLoop 线程之中,如果在的话就直接执行,否则将需要执行的方法封装成一个 Runnable 交给 EventLoop 去调度,EventLoop 会在下个时间点来执行该任务,并且是在 EventLoop 线程中执行。
该方法可以拆分成两个部分,如下图所示:
第一步调用的是 doRegister 方法,如下所示:
private void register0(ChannelPromise promise) {
try {
// 省略部分代码
// 执行具体的注册
doRegister();
} catch (Throwable t) {
// 省略部分的代码
}
}
doRegister 方法在 AbstractChannel 中是一个空实现,所以我们需要到他的子类中去寻找具体的实现,很容易我们在 AbstractNioChannel 中找到了该方法的实现,如下所示:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
到这里就是最底层的调用了,这里的 JavaChannel() 方法获取到的就是我们创建 NioServerSocketChannel 时通过 provider.openServerSocketChannel() 方法创建的 ServerSocketChannel。
而 eventLoop().unwrappedSelector() 方法获取到的则是通过 provider.openSelector() 创建的一个原始的 Selector 对象。
换一个写法这步操作实际上就是:
ServerSocketChannel.register(Selector, interest set);
就是通过这一步操作把 ServerSocketChannel 感兴趣的事件注册到 JDK 底层的一个 Selector 上。
但是有可能这个这个时候,注册事件有可能失败,所以需要立即执行一次 selector 的 selectNow 方法,因为这个被“取消”的SelectionKey 可能还缓存着没有被移除。然后尝试进行第二次注册,如果成功的话就直接返回,如果还是失败的话就抛出异常。
第二步,通过 pipeline 触发 handler 中的某些回调方法,如下所示:
private void register0(ChannelPromise promise) {
try {
// 省略部分代码
// 执行具体的注册
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// 省略部分代码
}
}
首先通过 pipeline 的 invokeHandlerAddedIfNeeded 方法来触发调用那些通过 pipeline.addLast() 方法添加的 ChannelHandler 的 channelAdded() 方法。
然后通知 promise 已经成功了,现在可以执行监听器的 operationComplete 方法了。
最后 isActive() 方法默认是返回的 false,因为到现在我们还没有绑定端口呢。
绑定端口
当 Channel 已经创建、初始化、并成功注册好之后,最后就需要执行端口绑定了,现在回到 ServerBootstrap 的 doBind 方法中,如下所示:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化Channel并进行注册
// 省略部分代码
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
当注册的结果 regFuture 已经完成了,那么就可以直接执行绑定操作了,否则需要在 regFuture 上增加一个监听器,当注册完成时再执行绑定操作,不管怎么样,具体的绑定操作都是在 doBind0 方法中,如下所示:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
doBind0 的方法可以拆分成以下几个部分:
中间的几步调用过程这里就不再赘述了,大家可以从源码中跟一下,我们直接跳到最后一步调用 unsafe 的 bind 方法,不出意外就是在这一步进行了 JDK 底层 Channel 的端口绑定了,最终代码如下:
public class NioServerSocketChannel
extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
}
看到这里一切就都清楚了,终于将端口绑定到了 ServerSocketChannel 上去了。
Netty 启动过程总结
下面我们总结下 Netty 是如何在整个启动过程中,把 NIO 那一套启动步骤完美的分散到各个地方去的。
ServerSocketChannel 在哪创建的
首先需要知道 Netty 是在哪里创建的 ServerSocketChannel,是在初始化 NioServerSocketChannel 对象的时候在 newSocket 方法中,通过 provider.openServerSocketChannel() 创建的。
非阻塞模式在哪设置的
同样是在初始化 NioServerSocketChannel 对象的时候,在调用到父类 AbstractNioChannel 的构造方法的时候,执行的 ch.configureBlocking(false)。
Selector 在哪创建的
在 NioEventLoop 中通过 provider.openSelector() 创建的,并将该 Selector 对象存放在一个叫 unwrappedSelector 的变量中。
而 NioEventLoop 中的 provider,是在初始化 NioEventLoopGroup 时,通过 SelectorProvider.provider() 创建的,并最终传递给了 NioEventLoop。
Channel 是什么时候注册到 Selector 上去的
在 initAndRegister 方法中最终执行到了 AbstractNioChannel 中的 doRegister 方法,在该方法中将 Channel 注册到 Selector 上去的。
端口是什么时候绑定的
在 ServerBootstrap 的 doBind 方法中会先执行 initAndRegister 方法,执行完之后就会执行 doBind0 来进行端口绑定,最终会执行到 NioServerSocketChannel 类中的 doBind 方法,在该方法中完成了 JDK 底层端口的绑定。