我们再次回顾这幅图,通过先前的讲解,现在是不是亲切很多了。图中绿色的acceptor应该是你最熟悉的部分,之前我们在ServerBootstrap中进行了详细分析。我们知道了mainReactor是一个线程池,处理Accept事件负责接受客户端的连接;subReactor也是一个线程池,处理Read(读取客户端通道上的数据)、Write(将数据写入到客户端通道上)等事件。在这一节中,我们将深入分析这两个线程池的实现,不断完善其中的细节。我们首先从类图开始。
4.1 类图
看到这幅类图,如果你的第一印象是气势恢宏,那么恭喜你,你已经成功了一半。但不难预料的是,大多数人和我的感受是一样的:这么多类,一定很累。好在这只是第一印象,我们仔细观察,便会发现其中明显的脉络,两条线索(这里使用自下而上):NioEventLoop以及NioEventLoopGroup即线程和线程池。忽略其中大量的接口,剩余这样的两条线:
NioEventLoop --> SingleThreadEventLoop --> SingleThreadEventExecutor -->
AbstractScheduledEventExecutor --> AbstractScheduledEventExecutor -->
AbstractEventExecutor --> AbstractExecutorService
NioEventLoopGroup --> MultithreadEventLoopGroup -->
MultithreadEventExecutorGroup --> AbstractEventExecutorGroup
下面我们正式开始分析,依旧使用自顶向下的方法,从类图顶部向下、从线程池到线程分析。
4.2 EventExecutorGroup
EventExecutorGroup在类图中处于承上启下的位置,其上是Java原生的接口和类,其下是Netty新建的接口和类,由于它处于如此重要的位置,我们详细分析其中的方法。
4.2.1 Executor
首先看其继承自Executor的方法:
// Executes the given command at some time in the future
void execute(Runnable command);
只有一个简单的execute()方法,但这个方法奠定了java并发的基础,提供了异步执行任务的。
4.2.2 ExecutorService
ExecutorService的关键方法如下(其中的invoke***方法并非关键,不再列出):
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
这些方法我们能从命名中便能知道方法的作用。我们主要看submit()方法,该方法是execute()方法的扩展,相较于execute不关心执行结果,submit返回一个异步执行结果Future。这无疑是很大的进步,但这里的Future不提供回调操作,显得很鸡肋,所以Netty将Java原生的java.util.concurrent.Future扩展为io.netty.util.concurrent.Future,我们将在之后进行介绍。
4.2.3 ScheduledExecutorService
从名字可以看出,该接口提供了一系列调度方法:
ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
<V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
long period,TimeUnit unit);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
long delay,TimeUnit unit);
schedule()方法调度任务使任务在延迟一段时间后执行。scheduleAtFixedRate延迟一段时间后以固定频率执行任务,scheduleWithFixedDelay延迟一段时间后以固定延时执行任务。是不是有点头晕?那就对了,这里有一个例子专门治头晕。专家建议程序员应该每小时工作50分钟,休息10分钟,类似这样:
13:00 - 13:10 休息
13:10 - 14:00 写代码
14:00 - 14:10 休息
14:10 - 15:00 写代码
实现这样的调度我们可以使用(假设现在时间为13:00):
executor.scheduleAtFixedRate(new RestRunnable(), 0 , 60, TimeUnit.MINUTES);
executor.scheduleWithFixedDelay(new RestRunnable(), 0 , 50, TimeUnit.MINUTES);
4.2.4 EventExecutorGroup
boolean isShuttingDown();
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
Future<?> terminationFuture();
EventExecutor next();
EventExecutorGroup扩展的方法有5个,前四个可以从命名中推断出功能。shutdownGracefully()我们已经在Bootstrap一节中使用过,优雅关闭线程池;terminationFuture()返回线程池终止时的异步结果。重点关注next()方法,该方法的功能是从线程池中选择一个线程。EventExecutorGroup还覆盖了一些方法,我们不再列出,如果你感兴趣可以去源码里面查看,需要注意的是,覆盖的方法大部分是将Java原生的java.util.concurrent.Future返回值覆盖为io.netty.util.concurrent.Future。
4.3 线程池
4.3.1 AbstractEventExecutorGroup
AbstractEventExecutorGroup实现了EventExecutorGroup接口的大部分方法,实现都长的和下面的差不多:
@Override
public void execute(Runnable command) {
next().execute(command);
}
从这段代码可以看出这个线程池和程序员有一个相同点:懒。当线程池执行一个任务或命令时,步骤是这样的:(1).找一个线程。(2).交给线程执行。
4.3.2 MultithreadEventExecutorGroup
MultithreadEventExecutorGroup实现了线程的创建和线程的选择,其中的字段为:
// 线程池,数组形式可知为固定线程池
private final EventExecutor[] children;
// 线程索引,用于线程选择
private final AtomicInteger childIndex = new AtomicInteger();
// 终止的线程个数
private final AtomicInteger terminatedChildren = new AtomicInteger();
// 线程池终止时的异步结果
private final Promise<?> terminationFuture =
new DefaultPromise(GlobalEventExecutor.INSTANCE);
// 线程选择器
private final EventExecutorChooser chooser;
MultithreadEventExecutorGroup的构造方法很长,我们将选出其中的关键部分分析,故不列出整体代码。如果你是处女座,这里有一个链接MultithreadEventExecutorGroup。
我们先看构造方法签名:
protected MultithreadEventExecutorGroup(int nThreads,
ThreadFactory threadFactory, Object... args)
其中的nThreads表示线程池的固定线程数。
MultithreadEventExecutorGroup初始化的步骤是:
(1).设置线程工厂
(2).设置线程选择器
(3).实例化线程
(4).设置线程终止异步结果
首先我们看设备线程工厂的代码:
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
protected ThreadFactory newDefaultThreadFactory(),() {
return new DefaultThreadFactory(getClass());
}
如果构造参数threadFactory为空则使用默认线程池,创建默认线程池使用newDefaultThreadFactory(),这是一个protected方法,可以在子类中覆盖实现。
接着我们看设置线程选择器的代码:
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
如果线程数是2的幂次方使用2的幂次方选择器,否则使用通用选择器。下次如果有面试官问你怎么判断一个整数是2的幂次方,请甩给他这一行代码:
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
Netty实现了两个线程选择器,虽然代码不一致,功能都是一样的:每次选择索引为上一次所选线程索引+1的线程。如果你没看明白代码的含义,没关系,再看一遍。
private interface EventExecutorChooser {
EventExecutor next();
}
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[childIndex.getAndIncrement() & children.length - 1];
}
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}
最佳实践:线程池数量使用2的幂次方,这样线程池选择线程时使用位操作,能使性能最高。
下面我们接着分析实例化线程的步骤:
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 使用模板方法newChild实例化一个线程
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
// 如果不成功,所有已经实例化的线程优雅关闭
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 确保已经实例化的线程终止
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
实现的过程一句话描述就是:使用newChild()依次实例化线程,如果出错,关闭所有已经实例化的线程。也许你对finally中的代码有疑问,这是因为不清楚shutdownGracefully()的含义。你需要提前明白这样的事实:shutdownGracefully()只是通知线程池该关闭,但什么时候关闭由线程池决定,所以需要使用e.isTerminated()来判断线程池是否真正关闭。
实例化线程池正常完成后,Netty使用下面的代码设置异步终止结果:
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// 线程池中的线程每终止一个增加记录数,直到全部终止设置线程池异步终止结果为成功
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
分析完MultithreadEventExecutorGroup的构造方法,我们继续分析普通方法。它的普通方法基本与下面的isTerminated()类似:
@Override
public boolean isTerminated() {
for (EventExecutor l: children) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
总结起来就是:线程池的状态由其中的各个线程决定。明白了这点,我们使用类比的方法可以推知其他方法的实现,故不再具体分析。
4.3.3 MultithreadEventLoopGroup
MultithreadEventLoopGroup实现了EventLoopGroup接口的方法,EventLoopGroup接口作为Netty并发的关键接口,我们看其中扩展的方法:
// 将通道channel注册到EventLoopGroup中的一个线程上
ChannelFuture register(Channel channel);
// 返回的ChannelFuture为传入的ChannelPromise
ChannelFuture register(Channel channel, ChannelPromise promise);
// 覆盖父类接口的方法,返回EventLoop
@Override EventLoop next();
这些方法在MultithreadEventLoopGroup的具体实现很简单。register()方法选择一个线程,该线程负责具体的register()实现。next()方法使用父类实现,即使用上一节所述的选择器选择一个线程。代码如下:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
分析完这些代码,我们关注一下线程数的默认设置。
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads",
Runtime.getRuntime().availableProcessors() * 2));
默认情况,线程数最小为1,如果配置了系统参数io.netty.eventLoopThreads
,设置为该系统参数值,否则设置为核心数的2倍。
4.3.4 NioEventLoopGroup
NioEventLoopGroup的主要代码实现是模板方法newChild(),用来创建线程池中的单个线程,代码如下:
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args)
throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(),
(RejectedExecutionHandler) args[2]);
}
关于代码中的参数含义,我们放在NioEventLoop中分析。此外NioEventLoopGroup还提供了setIoRatio()和rebuildSelectors()两个方法,一个用来设置I/O任务和非I/O任务的执行时间比,一个用来重建线程中的selector来规避JDK的epoll 100% CPU Bug。其实现也是依次设置各线程的状态,故不再列出。