作者: 一字马胡
转载标志 【2017-11-03】
更新日志
日期 | 更新内容 | 备注 |
---|---|---|
2017-11-03 | 添加转载标志 | 持续更新 |
线程模型与并发
什么是线程模型呢?线程模型指定了线程管理的模型。在进行并发编程的过程中,我们需要小心的处理多个线程之间的同步关系,而一个好的线程模型可以大大减少管理多个线程的成本。在阅读本文之前,你可以选择性的阅读下面列出的文章,来快速了解和回顾java中的并发编程内容:
- Java线程池详解(一)
- Java线程池详解(二)
- Java调度线程池ScheduleExecutorService
- Java调度线程池ScheduleExecutorService(续)
- Java中的ThreadLocal和 InheritableThreadLocal
- Java AQS
- Java可重入锁详解
Reactor线程模型
Reactor是一种经典的线程模型,Reactor线程模型分为单线程模型、多线程模型以及主从多线程模型。下面分别分析一下各个Reactor线程模型的优缺点。首先是Reactor单线程模型,下面的图片展示了这个线程模型的结构:
Reactor单线程模型仅使用一个线程来处理所有的事情,包括客户端的连接和到服务器的连接,以及所有连接产生的读写事件,这种线程模型需要使用异步非阻塞I/O,使得每一个操作都不会发生阻塞,Handler为具体的处理事件的处理器,而Acceptor为连接的接收者,作为服务端接收来自客户端的链接请求。这样的线程模型理论上可以仅仅使用一个线程就完成所有的事件处理,显得线程的利用率非常高,而且因为只有一个线程在工作,所有不会产生在多线程环境下会发生的各种多线程之间的并发问题,架构简单明了,线程模型的简单性决定了线程管理工作的简单性。但是这样的线程模型存在很多不足,比如:
- 仅利用一个线程来处理事件,对于目前普遍多核心的机器来说太过浪费资源
- 一个线程同时处理N个连接,管理起来较为复杂,而且性能也无法得到保证,这是以线程管理的简洁换取来的事件管理的复杂性,而且是在性能无 法得到保证的前提下换取的,在大流量的应用场景下根本没有实用性
- 根据第二条,当处理的这个线程负载过重之后,处理速度会变慢,会有大量的事件堆积,甚至超时,而超时的情况下,客户端往往会重新发送请求,这样的情况下,这个单线程的模型就会成为整个系统的瓶颈
- 单线程模型的一个致命缺钱就是可靠性问题,因为仅有一个线程在工作,如果这个线程出错了无法正常执行任务了,那么整个系统就会停止响应,也就是系统会因为这个单线程模型而变得不可用,这在绝大部分场景(所有)下是不允许出现的
介于上面的种种缺陷,Reactor演变出了第二种模型,也就是Reactor多线程模型,下面展示了这种模型:
可以发现,多线程模型下,接收链接和处理请求作为两部分分离了,而Acceptor使用单独的线程来接收请求,做好准备后就交给事件处理的handler来处理,而handler使用了一个线程池来实现,这个线程池可以使用Executor框架实现的线程池来实现,所以,一个连接会交给一个handler线程来复杂其上面的所有事件,需要注意,一个连接只会由一个线程来处理,而多个连接可能会由一个handler线程来处理,关键在于一个连接上的所有事件都只会由一个线程来处理,这样的好处就是消除了不必要的并发同步的麻烦。Reactor多线程模型似乎已经可以很好的工作在我们的项目中了,但是还有一个问题没有解决,那就是,多线程模型下任然只有一个线程来处理客户端的连接请求,那如果这个线程挂了,那整个系统任然会变为不可用,而且,因为仅仅由一个线程来负责客户端的连接请求,如果连接之后要做一些验证之类复杂耗时操作再提交给handler线程来处理的话,就会出现性能问题。
Reactor多线程模型对Reactor单线程模型做了一些改进,但是在某些场景下任然有所缺陷,所以就有了第三种Reactor模型,Reactor主从多线程模型,下面展示了这种模型的架构:
Reactor多线程模型解决了Reactor单线程模型和Reactor多线程模型中存在的问题,解决了handler的性能问题,以及Acceptor的安全以及性能问题,Netty就使用了这种线程模型来处理事件。
Netty线程模型
在了解了线程模型以及Reactor线程模型之后,我们来看一下Netty的线程模型是怎么样的。首先,Netty使用EventLoop来处理连接上的读写事件,而一个连接上的所有请求都保证在一个EventLoop中被处理,一个EventLoop中只有一个Thread,所以也就实现了一个连接上的所有事件只会在一个线程中被执行。一个EventLoopGroup包含多个EventLoop,可以把一个EventLoop当做是Reactor线程模型中的一个线程,而一个EventLoopGroup类似于一个ExecutorService,当然,这只是为了更好的理解Netty的线程模型,它们之间是没有等价关系的,后面的分析中会详细讲到。下面的图片展示了Netty的线程模型:
首先看一下Netty服务端启动的代码:
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(your_handler_name, your_handler_instance);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
Netty的服务端使用了两个EventLoopGroup,而第一个EventLoopGroup通常只有一个EventLoop,通常叫做bossGroup,负责客户端的连接请求,然后打开Channel,交给后面的EventLoopGroup中的一个EventLoop来负责这个Channel上的所有读写事件,一个Channel只会被一个EventLoop处理,而一个EventLoop可能会被分配给多个Channel来负责上面的事件,当然,Netty不仅支持NI/O,还支持OI/O,所以两者的EventLoop分配方式有所区别,下面分别展示了NI/O和OI/O的分配方式:
在NI/O非阻塞模式下,Netty将负责为每个Channel分配一个EventLoop,一旦一个EventLoop呗分配给了一个Channel,那么在它的整个生命周期中都使用这个EventLoop,但是多个Channel将可能共享一个EventLoop,所以和Thread相关的ThreadLocal的使用就要特别注意,因为有多个Channel在使用该Thread来处理读写时间。在阻塞IO模式下,考虑到一个Channel将会阻塞,所以不太可能将一个EventLoop共用于多个Channel之间,所以,每一个Channel都将被分配一个EventLoop,并且反过来也成立,也就是一个EventLoop将只会被绑定到一个Channel上来处理这个Channel上的读写事件。无论是非阻塞模式还是阻塞模式,一个Channel都将会保证一个Channel上的所有读写事件都只会在一个EventLoop上被处理。
Netty EventLoop
上文中分析了Reactor线程模型以及Netty的线程模型,在Netty中,EventLoop是一个极为重要的组件,它翻译过来称为事件循环,一个EventLoop将被分配给一个Channel,来负责这个Channel的整个生命周期之内的所有事件,下面来分析一下EventLoop的结构和实现细节。首先展示了EventLoop的类图:
从EventLoop的类图中可以发现,其实EventLoop继承了Java的ScheduledExecutorService,也就是调度线程池,所以,EventLoop应当有ScheduledExecutorService提供的所有功能。那为什么需要继承ScheduledExecutorService呢,也就是为什么需要延时调度功能,那是因为,在Netty中,有可能用户线程和Netty的I/O线程同时操作网络资源,而为了减少并发锁竞争,Netty将用户线程的任务包装成Netty的task,然后向Netty的I/O任务一样去执行它们。有些时候我们需要延时执行任务,或者周期性执行任务,那么就需要调度功能。这是Netty在设计上的考虑,为我们极大的简化的编程方法。
EventLoop是一个接口,它在继承了ScheduledExecutorService等多个类的同时,仅仅提供了一个方法parent,这个方法返回它属于哪个EventLoopGroup。本文只分析非阻塞模式,而阻塞模式留到未来某个合适的时候再做分析总结。在上文中展示的服务端启动的代码中我们发现我们使用的EventLoop是一个子类NioEventLoopGroup,下面就来分析一下NioEventLoopGroup这个类。首先展示一下NioEventLoopGroup的类图:
可以发现,NioEventLoopGroup的实现非常的复杂,但是只要我们清楚了Netty的线程模型,我们就可以有入口去分析它的代码。首先,我们知道每个EventLoop只要一个Thread来处理事件,那我们就来找到那个Thread在什么地方。可以在SingleThreadEventExecutor类中找到thread,它的初始化在doStartThread这个方法中,而这个方法被startThread方法调用,而startThread 这个方法被execute方法调用,也就是提交任务的入口,这个方法是Executor接口的唯一方法。也就是说,所有我们通过EventLoop的execute方法提交的任务都将被这个Thread线程来执行。我们还知道一个事实,EventLoop是一个循环执行来消耗Channel事件的类,那么它必然会有一个类似循环的方法来作为任务,来提交给这个Thread来执行,而这可以在doStartThread方法中被发现,因为这个方法非常重要,所以下面展示了它的实现细节,但是去掉了一些代码来减少代码量:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
terminationFuture.setSuccess(null);
}
}
}
}
});
}
上面所提到的事件循环就是通过SingleThreadEventExecutor.this.run()这句话来触发的。这个run方法的具体实现在NioEventLoop中,下面展示了它的实现代码:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
首先,我们来分析一下NioEventLoop的相关细节,在一个无限循环里面,只有在遇到shutdown的情况下才会停止循环。然后在循环里会询问是否有事件,如果没有,则继续循环,如果有事件,那么就开始处理时间。上文中我们提到,在事件循环中我们不仅要处理IO事件,还要处理非I/O事件。Netty中可以设置用于I/O操作和非I/O操作的时间占比,默认各位50%,也就是说,如果某次I/O操作的时间花了100ms,那么这次循环中非I/O得任务也可以花费100ms。Netty中的I/O时间处理通过processSelectedKeys方法来进行,而非I/O操作通过runAllTasks反复来进行,首先来看runAllTasks方法,虽然设定了一个可以运行的时间参数,但是实际上Netty并不保证能精确的确保非I/O任务只运行设定的毫秒,下面来看下runAllTasks的代码:
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
// 将任务运行起来
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
可以看到,这个方法是在每运行了64个任务之后再进行比较的,如果超出了设定的运行时间则退出,否则再运行64个任务再比较。所以,Netty强烈要求不要在I/O线程中运行阻塞任务,因为阻塞任务将会阻塞住Netty的事件循环,从而造成事件堆积的现象。现在回头看处理I/O任务的processSelectedKeys方法,跟踪代码之后发现最后实际处理I/O事件的一个方法为processSelectedKey,下面展示了它的代码:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
这个方法运行的流程为:
- 从Channel上获取一个unsafe对象,这个对象 是用来进行NIO操作的一系列系统级API,关于Netty的Channel的深层次分析将在另外的篇章中进行
- 从Channel上获取了eventLoop,而这个eventLoop是什么时候分配给Channel的细节在后文中进行分析
- 根据事件调用底层API来处理事件
下面,我们分析一下是什么时候将一个EventLoop分配给一个Channel的,并且这个EventLoop的那个唯一的Thread是什么时候被赋值的。在这个问题上,服务端的流程和客户端的流程可能不太一样,对于服务端来说,首先需要bind一个端口,然后在进行Accept进来的连接,而客户端需要进行connect到服务端。先来分析一下服务端。
还是看上面提供的服务端的示例代码,其中启动的代码为下面这句代码:
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
也就是我们网络编程中的bind操作,这个操作会发生什么呢?追踪代码如下:
-> AbstractBootstrap.bind(port)
-> AbstractBootstrap.bind(address)
-> AbstractBootstrap.doBind(final SocketAddress localAddress)
-> AbstractBootstrap.initAndRegister
-> AbstractBootstrap.doBind0
-> SingleThreadEventExecutor.execute
-> SingleThreadEventExecutor.startThread()
-> SingleThreadEventExecutor.doStartThread
EventLoop在AbstractBootstrap.initAndRegister中获得了一个新的Channel,然后在AbstractBootstrap.doBind0 方法里面调用接下来的方法来初始化EventLoop的Thread的工作,并且将EventLoop的时间循环打开了,可以开始接收客户端的连接请求了。下面来分析一下客户端的流程。
一个客户端的启动代码示例:
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
其中启动的关键代码为:
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
下面是connect的调用流程:
-> Bootstrap.doResolveAndConnect
-> AbstractBootstrap.initAndRegister
-> Bootstrap.doResolveAndConnect0
-> Bootstrap.doConnect
-> SingleThreadEventExecutor.execute
-> SingleThreadEventExecutor.startThread()
-> SingleThreadEventExecutor.doStartThread
后半部分和服务端的启动过程是一致的,而区别在于服务端是通过bind操作来启动的,而客户端是通过connect操作来启动的。执行到此,客户端和服务端的EventLoop都已经启动起来,服务端可以接受客户端的连接并且处理Channel上的读写事件,而客户端可以去连接远程服务端来请求数据。
EventLoopGroup
到目前为止,我们已经知道了Reactor多个线程模型,并且知道了一个EventLoop会负责一个Channel的生命周期内的所有事件,并且知道了服务端和客户端是如何启动这个EventLoop得,但是还有一个问题没有解决,那就是一个EventLoop是如何被分配给一个Channel的。下文就来分析这个分配的原理和过程。而对于阻塞I/O模型的分配和非阻塞I/O模型的分配是不一样的,在上文中也提到这个内容,所以本文只分析对于非阻塞I/O模型的分配。
EventLoopGroup是用来管理EventLoop的对象,一个EventLoopGroup里面有多个EventLoop,下面展示了EventLoopGroup的类图:
我们从实际的代码出发来分析EventLoopGroup。上文中已经展示了客户端和服务端的启动代码,其中有类似的代码如下:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
上文中我们分析了EventLoop被启动的过程,我们肯定,EventLoop是在分配之后启动的,因为对于服务端而言,bind是一个最开始的网络操作,对于客户端来说,connect也是最开始的网络操作,在这之前是没有关于网络I/O的操作的,所以,EventLoop的分配和启动是在这两个过程或者之后的流程中进行的,但是EventLoop的分配肯定是在启动之前的,但是EventLoop的分配和启动在bind和connect中进行,那么我们可以肯定,EventLoop的分配也是在这两个方法中进行的。为了证明这个假设,回头再看一下服务端的EventLoop的启动过程,其中有一个方法值得我们注意:AbstractBootstrap.initAndRegister,我们进行了init部分的分析,而register部分我们还没有分析,下面就对服务端来进行register部分的分析,下面展示了register的调用链路:
-> Bootstrap.doResolveAndConnect
-> AbstractBootstrap.initAndRegister
-> EventLoopGroup.register
-> MultithreadEventLoopGroup.register
-> SingleThreadEventLoop.register
-> Channel.register
-> AbstractUnsafe.register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
最后展示了AbstractUnsafe.register这个方法,在这里初始化了一个EventLoop,需要记住的一点是,EventLoopGroup中的是EventLoop,不然在追踪代码的时候会迷失。现在来正式看一下NioEventLoopGroup这个类,它的它继承了MultithreadEventExecutorGroup这个类,而我们在初始化EventLoopGroup的时候传递进去的参数,也就是我们希望这个EventLoopGroup拥有的EventLoop数量,会在MultithreadEventExecutorGroup这个类中初始化,并且是在构造函数中初始化的,如果在new EventLoopGroup的时候没有任何参数,那么默认的EventLoop的数量是机器CPU数量的两倍。现在我们来看一下MultithreadEventExecutorGroup这个类的一个重要的构造函数,这个构造函数初始化了EventLoopGroup的EventLoop。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
一个较为重要的方法为newChild,这是初始化一个EventLoop的方法,下面是它的具体实现,假设我们使用NioEventLoop:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
我们现在知道了EventLoopGroup管理着很多的EventLoop,上文中我们仅仅分析了分配的流程,但是分配的策略还没有分析,现在来分析一下EventLoopGroup是如何分配EventLoop给Channel的,我们仅分析非阻塞I/O下的分配策略,阻塞模式下的分配策略可以参考非阻塞下的分配策略。
在MultithreadEventLoopGroup.register方法中,调用了next()方法,我们来看一下这个流程:
-> MultithreadEventExecutorGroup.next()
public EventExecutor next() {
return chooser.next();
}
chooser是什么东西?
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
它是怎么初始化的呢?
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
这是它初始化最后调用的方法,这个方法在DefaultEventExecutorChooserFactory中被实现,这个参数是MultithreadEventExecutorGroup类中的children,也就是EventLoopGroup中的所有EventLoop,那这个newChooser得分配方法就是如果EventLoop的数量是2的n次方,那么就使用PowerOfTwoEventExecutorChooser来分配,否则使用GenericEventExecutorChooser来分配。这两个策略类的分配方法实现分别如下:
1、PowerOfTwoEventExecutorChooser
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
2、GenericEventExecutorChooser
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
所以,到此为止,我们可以解决为什么一个EventLoop会被分配给多个Channel的疑惑。本文到此也就结束了。篇幅较长,内容涉及到Reactor的三种线程模型,然后分析了Netty的线程模型,然后分析了Netty的EventLoop,以及EventLoopGroup,以及分析了EventLoop是怎么被分配给一个Channel的,和一个EventLoop是如何启动起来来处理事件的。最后分析了EventLoopGroup分配EventLoop的策略,对于本文涉及的内容的更为深入的分析总结,将在未来的某个适宜的时刻进行。