Netty源码解析—— EventLoop(二)之 EventLoopGroup
1.类结构图
2. EventExecutorGroup
EventExecutorGroup
实现 ScheduledExecutorService 、Iterable接口,这两个接口都是jdk原生接口,具体看EventExecutorGroup接口中的方法,代码如下:
// ========== 自定义接口 ===================================
//是否正在关闭
boolean isShuttingDown();
//优雅关闭线程池
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
//返回线程池终止时的异步结果
Future<?> terminationFuture();
//选择一个 EventExecutor 对象
EventExecutor next();
// ========== 实现自 Iterable 接口 ==========
@Override
Iterator<EventExecutor> iterator();
// ========== 实现自 ExecutorService 接口 ==========
@Override
@Deprecated
void shutdown();
@Override
@Deprecated
List<Runnable> shutdownNow();
@Override
Future<?> submit(Runnable task);
@Override
<T> Future<T> submit(Runnable task, T result);
@Override
<T> Future<T> submit(Callable<T> task);
// ========== 实现自 ScheduledExecutorService 接口 ==========
@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
- 重点关注next()方法,该方法的功能是从线程池中选择一个线程
- 比较特殊的是,接口方法返回类型为 Future 不是 Java 原生的
java.util.concurrent.Future
,而是 Netty 自己实现的 Future 接口,如下代码:
public interface Future<V> extends java.util.concurrent.Future<V>
public interface ScheduledFuture<V> extends Future<V>, java.util.concurrent.ScheduledFuture<V>
3. AbstractEventExecutorGroup
io.netty.util.concurrent.AbstractEventExecutorGroup
,实现 EventExecutorGroup 接口,EventExecutor ( 事件执行器 )的分组抽象类。
3.1 submit
#submit(...)
方法,提交一个普通任务到 EventExecutor 中, 提交的 EventExecutor ,通过 #next()
方法选择
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return next().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return next().submit(task);
}
3.2 schedule
#schedule(...)
方法,提交一个定时任务到 EventExecutor 中,提交的 EventExecutor ,通过 #next()
方法选择。代码如下:
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return next().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return next().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return next().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
3.3 execute
#execute(...)
方法,在 EventExecutor 中执行一个普通任务,不需要返回结果,代码如下:
@Override
public void execute(Runnable command) {
next().execute(command);
}
3.4 invokeAll
#invokeAll(...)
方法,在 EventExecutor 中执行多个普通任务, 多个任务使用同一个 EventExecuto。代码如下:
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return next().invokeAll(tasks);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return next().invokeAll(tasks, timeout, unit);
}
3.5 invokeAny
#invokeAll(...)
方法,在 EventExecutor 中执行多个普通任务,有一个执行完成即可,多个任务使用同一个 EventExecutor 。代码如下:
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return next().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return next().invokeAny(tasks, timeout, unit);
}
3.6 shutdown
#shutdown(...)
方法,关闭 EventExecutorGroup 。代码如下:
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
@Override
@Deprecated
public abstract void shutdown();
@Override
@Deprecated
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
- 具体的
#shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
和#shutdown()
方法,由子类实现。
4. MultithreadEventExecutorGroup
4.1 构造方法
/**
* EventExecutor 数组
*/
private final EventExecutor[] children;
/**
* 不可变( 只读 )的 EventExecutor 数组
*/
private final Set<EventExecutor> readonlyChildren;
/**
* 已终止的 EventExecutor 数量
*/
private final AtomicInteger terminatedChildren = new AtomicInteger();
/**
* 用于终止 EventExecutor 的异步 Future
*/
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
/**
* EventExecutor 选择器
*/
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 创建执行器
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 创建 EventExecutor 数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
// 是否创建成功
boolean success = false;
try {
// 创建 EventExecutor 对象,newChild抽象方法,具体有子类实现
children[i] = newChild(executor, args);
// 标记创建成功
success = true;
} catch (Exception e) {
// 创建失败,抛出 IllegalStateException 异常
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 创建失败,关闭所有已创建的 EventExecutor
if (!success) {
// 优雅的关闭所有已创建的 EventExecutor,只负责关闭线程,并不知道关闭的结果
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 确保所有已创建的 EventExecutor 已关闭
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
//isTerminated() 若关闭后所有任务都已完成,则返回true。注意除非首先调用shutdown或shutdownNow,否则isTerminated永不为true。
// 返回:若关闭后所有任务都已完成,则返回true。
while (!e.isTerminated()) {
//等所有已提交的任务(包括正在跑的和队列中等待的)执行完
//或者等超时时间到
//或者线程被中断,抛出InterruptedException
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 创建 EventExecutor 选择器
chooser = chooserFactory.newChooser(children);
// 创建监听器,用于 EventExecutor 终止时的监听
//回调的具体逻辑是,当所有 EventExecutor 都终止完成时,
// 通过调用 Future#setSuccess(V result) 方法,通知监听器们。至于为什么设置的值是 null ,因为监听器们不关注具体的结果。
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// 线程池中的线程每终止一个增加记录数,直到全部终止设置线程池异步终止结果为成功
if (terminatedChildren.incrementAndGet() == children.length) {// 全部关闭
terminationFuture.setSuccess(null);// 设置结果,并通知监听器们。
}
}
};
// 设置监听器到每个 EventExecutor 上
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// 创建不可变( 只读 )的 EventExecutor 数组
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
//设置不可变的EventExecutor集合
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
4.2 ThreadPerTaskExecutor
- 创建执行器的代码如下:
// 创建执行器
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
具体看下 ThreadPerTaskExecutor这个类,代码如下:
/**
* 实现 Executor 接口,每个任务一个线程的执行器实现类
*/
public final class ThreadPerTaskExecutor implements Executor {
/**
* 线程工厂对象
* Netty 实现自定义的 ThreadFactory 类,为 io.netty.util.concurrent.DefaultThreadFactory
*/
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
/**
* 执行任务
*
* @param command 任务
*
* 通过 ThreadFactory#newThread(Runnable) 方法,创建一个 Thread ,然后调用 Thread#start() 方法,启动线程执行任务
*/
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
io.netty.util.concurrent.ThreadPerTaskExecutor
,实现 Executor 接口,每个任务一个线程的执行器实现类threadFactory 属性,线程工程实例,通过构造方法来初始化,Netty 实现自定义的 ThreadFactory 类,为
io.netty.util.concurrent.DefaultThreadFactory
具体的创建看如下方法,创建默认的线程工厂类/** * 创建线程工厂对象,并且使用类名作为 poolType * @return */ protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass()); }
#execute(Runnable command)
方法,通过ThreadFactory#newThread(Runnable)
方法,创建一个 Thread ,然后调用Thread#start()
方法,启动线程执行任务
4.3 DefaultThreadFactory
4.4 EventExecutorChooserFactory
io.netty.util.concurrent.EventExecutorChooserFactory
,EventExecutorChooser 工厂接口。代码如下:
/**
* Factory that creates new {@link EventExecutorChooser}s.
*
* EventExecutorChooser 工厂接口
*/
@UnstableApi
public interface EventExecutorChooserFactory {
/**
* Returns a new {@link EventExecutorChooser}.
*
* 创建一个 EventExecutorChooser 对象
*/
EventExecutorChooser newChooser(EventExecutor[] executors);
/**
* Chooses the next {@link EventExecutor} to use.
*
* EventExecutor 选择器接口
*/
@UnstableApi
interface EventExecutorChooser {
/**
* Returns the new {@link EventExecutor} to use.
*
* 选择下一个 EventExecutor 对象
*/
EventExecutor next();
}
}
-
#newChooser(EventExecutor[] executors)
方法,创建一个 EventExecutorChooser 对象; - EventExecutorChooser 接口,EventExecutor 选择器接口。
-
#next()
方法选择下一个 EventExecutor对象;
-
4.4.1 DefaultEventExecutorChooserFactory
io.netty.util.concurrent.DefaultEventExecutorChooserFactory
,实现 EventExecutorChooserFactory 接口,默认 EventExecutorChooser 工厂实现类。代码如下
/**
* Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
*
* 实现 EventExecutorChooserFactory 接口,默认 EventExecutorChooser 工厂实现类
*/
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
/**
* 单例
*/
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {// 是否为 2 的幂次方
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
/**
* 是否为 2 的幂次方
* @param val
* @return
*/
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
}
- DefaultEventExecutorChooserFactory是个单例;
- Netty实现了两个线程选择器,虽然代码不一致,功能都是一样的:每次选择索引为上一次所选线程索引+1的线程
-
#newChooser(EventExecutor[] executors)
创建具体的选择器,根据#isPowerOfTwo(executors.length)
方法来判断,创建哪种选择器,EventExecutor 数组的大小是否为 2 的幂次方,如果是,创建PowerOfTwoEventExecutorChooser选择器,不是则创建GenericEventExecutorChooser选择器; -
#isPowerOfTwo(int val)
方法,为什么(val & -val) == val
可以判断数字是否为 2 的幂次方呢?
我们以 8 来举个例子:
- 8 的二进制为 `1000`
- -8 的二进制使用补码表示。所以,先求反生成反码为 `0111` ,然后加一生成补码为 `1000`
- 8 和 -8 与操作后,还是 8 。与操作是都为1则为1,其他都为0,所以结果还是1000&1000还是1000;
- 实际上,以 2 为幂次方的数字,都是最高位为 1 ,剩余位为 0 ,所以对应的负数,求完补码还是自己
4.4.2 PowerOfTwoEventExecutorChooser
PowerOfTwoEventExecutorChooser 实现 EventExecutorChooser 接口,基于 EventExecutor 数组的大小为 2 的幂次方的 EventExecutor 选择器实现类。这是一个优化的实现,线程池数量使用2的幂次方,这样线程池选择线程时使用位操作,能使性能最高,PowerOfTwoEventExecutorChooser 是 DefaultEventExecutorChooserFactory 的静态内部类,代码如下:
/**
* 实现 EventExecutorChooser 接口,基于 EventExecutor 数组的大小为 2 的幂次方的 EventExecutor 选择器实现类
*/
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
/**
* 自增序列
*/
private final AtomicInteger idx = new AtomicInteger();
/**
* EventExecutor 数组
*/
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
/**
* 因为 - ( 二元操作符 ) 的计算优先级高于 & ( 一元操作符 ) 。
*
* 因为 EventExecutor 数组的大小是以 2 为幂次方的数字,那么减一后,除了最高位是 0 ,剩余位都为 1 ( 例如 8 减一后等于 7 ,而 7 的二进制为 0111 。),
* 那么无论 idx 无论如何递增,再进行 & 并操作,都不会超过 EventExecutor 数组的大小。并且,还能保 证顺序递增。
* @return
*/
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
4.4.3 GenericEventExecutorChooser
GenericEventExecutorChooser 实现 EventExecutorChooser 接口,通用的 EventExecutor 选择器实现类。代码如下:
GenericEventExecutorChooser 内嵌在 DefaultEventExecutorChooserFactory 类中。
/**
* GenericEventExecutorChooser 内嵌在 DefaultEventExecutorChooserFactory 类中。
* 实现 EventExecutorChooser 接口,通用的 EventExecutor 选择器实现类
*/
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
/**
* 使用 idx 自增,并使用 EventExecutor 数组的大小来取余
* @return
*/
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
4.5 next
#next()
方法,选择下一个 EventExecutor 对象。代码如下:
/**
* 选择下一个 EventExecutor 对象
* @return
*/
@Override
public EventExecutor next() {
return chooser.next();
}
4.6 iterator
/**
* 获得 EventExecutor 数组的迭代器
* 为了避免调用方,获得迭代器后,对 EventExecutor 数组进行修改,
* 所以返回是不可变的 EventExecutor 数组 readonlyChildren 的迭代器
* @return
*/
@Override
public Iterator<EventExecutor> iterator() {
return readonlyChildren.iterator();
}
4.7 executorCount
/**
* Return the number of {@link EventExecutor} this implementation uses. This number is the maps
* 1:1 to the threads it use.
*
* 获得 EventExecutor 数组的大小
*/
public final int executorCount() {
return children.length;
}
4.8 newChild
/**
* Create a new EventExecutor which will later then accessible via the {@link #next()} method. This method will be
* called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
*
* 抽象方法,子类实现该方法,创建其对应的 EventExecutor 实现类的对象
*
*/
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
4.9 shutdownGracefully
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}
优雅的关闭EventExecutor线程组,返回terminationFuture,在构造方法中由于已经设置了监听,如下代码,通过success属性来判断是否全部都关闭;
final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { // 线程池中的线程每终止一个增加记录数,直到全部终止设置线程池异步终止结果为成功 if (terminatedChildren.incrementAndGet() == children.length) {// 全部关闭 terminationFuture.setSuccess(null);// 设置结果,并通知监听器们。 } } };
4.10 terminationFuture
/**
* 返回用于终止 EventExecutor 的异步 Future
* @return
*/
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
4.11 shutdown
/**
* 废弃的方法,EventExecutor线程组关闭
*/
@Override
@Deprecated
public void shutdown() {
for (EventExecutor l: children) {
l.shutdown();
}
}
4.12 isShuttingDown
/**
* 判断所有的EventExecutor是否在优雅的关闭,或者已经关闭,
* 任何一个EventExecutor没有关闭则返回false
* @return
*/
@Override
public boolean isShuttingDown() {
for (EventExecutor l: children) {
if (!l.isShuttingDown()) {
return false;
}
}
return true;
}
4.13 isShutdown
/**
* 判断所有的EventExecutor是否都关闭
* @return
*/
@Override
public boolean isShutdown() {
for (EventExecutor l: children) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
4.14 isTerminated
/**
* EventExecutor线程组关闭后,所有任务是否都已完成
* @return
*/
@Override
public boolean isTerminated() {
for (EventExecutor l: children) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
4.15 awaitTermination
/**
* 等待所有的EventExecutor任务都执行完或者等待时间超时,返回任务是否都已经执行完
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
loop: for (EventExecutor l: children) {
for (;;) {
//超时则跳出loop循环
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
break loop;
}
//等所有已提交的任务(包括正在跑的和队列中等待的)执行完
//或者等超时时间到
//或者线程被中断,抛出InterruptedException
//跳出for(;;)循环
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
5. EventLoopGroup
io.netty.channel.EventExecutorGroup
,继承 EventExecutorGroup 接口,EventLoop 的分组接口。代码如下:
// ========== 实现自 EventExecutorGroup 接口 ==========
/**
* Return the next {@link EventLoop} to use
* 覆盖父类接口的方法,选择下一个 EventLoop 对象
*/
@Override
EventLoop next();
// ========== 自定义接口 ==========
/**
* 注册 Channel 到 EventLoopGroup 中的一个线程上。实际上,EventLoopGroup 会分配一个 EventLoop 给该 Channel 注册
*/
ChannelFuture register(Channel channel);
ChannelFuture register(ChannelPromise promise);
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
-
#next()
方法,覆盖父类接口的方法,选择下一个 EventLoop 对象 -
#register(...)
方法,注册 Channel 到 EventLoopGroup 中的一个线程上。实际上,EventLoopGroup 会分配一个 EventLoop 给该 Channel 注册
6.MultithreadEventLoopGroup
io.netty.channel.MultithreadEventLoopGroup
,实现 EventLoopGroup 接口,继承 MultithreadEventExecutorGroup 抽象类,基于多线程的 EventLoop 的分组抽象类。
6.1 构造方法
/**
* 默认 EventLoop 线程数
* EventLoopGroup 默认拥有的 EventLoop 数量。因为一个 EventLoop 对应一个线程,所以为 CPU 数量 * 2 。
* 为什么会 * 2 呢?因为目前 CPU 基本都是超线程,一个 CPU 可对应 2 个线程。
* 在构造方法未传入 nThreads 方法参数时,使用 DEFAULT_EVENT_LOOP_THREADS 。
*/
private static final int DEFAULT_EVENT_LOOP_THREADS;
/**
* 初始化线程数
*/
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
* EventExecutorChooserFactory, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
- 主要初始化了线程数,然后调用父类的构造方法
- 默认情况,线程数最小为1,如果配置了系统参数
io.netty.eventLoopThreads
,设置为该系统参数值,否则设置为核心数的2倍。
6.2 newDefaultThreadFactory
#newDefaultThreadFactory()
方法,创建线程工厂对象,覆盖父类方法,增加了线程优先级为 Thread.MAX_PRIORITY
,代码如下:
/**
* 创建线程工厂对象
*
* 覆盖父类方法,增加了线程优先级为 Thread.MAX_PRIORITY 。
* @return
*/
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
6.3 next()
#next()
方法,选择下一个 EventLoop 对象,覆盖父类方法,将返回值转换成 EventLoop 类,代码如下:
/**
* 选择下一个 EventLoop 对象
*
* 覆盖父类方法,将返回值转换成 EventLoop 类
* @return
*/
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
6.4 newChild
#newChild(...)
抽象方法,创建 EventExecutor 对象,覆盖父类方法,返回值改为 EventLoop 类。
/**
* 抽象方法,创建 EventExecutor 对象
*
* 覆盖父类方法,返回值改为 EventLoop 类。
* @param executor
* @param args
* @return
* @throws Exception
*/
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
6.5 register(
#register(...)
方法,注册 Channel 到 EventLoopGroup 中,通过#next()
方法来选择一个EventLoop来注册,也就是通过EventExecutorChooser选择器从线程组中选择一个;
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
}
7. NioEventLoopGroup
io.netty.channel.nio.NioEventLoopGroup
,继承 MultithreadEventLoopGroup 抽象类,NioEventLoop 的分组实现类。
7.1 构造方法
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
构造方法比较多,主要是明确了父构造方法的 Object ... args
方法参数
- 第一个参数,
selectorProvider
,java.nio.channels.spi.SelectorProvider
,用于创建 Java NIO Selector 对象。 - 第二个参数,
selectStrategyFactory
,io.netty.channel.SelectStrategyFactory
,选择策略工厂。详细解析,见后续文章。 - 第三个参数,
rejectedExecutionHandler
,io.netty.channel.SelectStrategyFactory
,拒绝执行处理器。详细解析,见后续文章。
7.2 newChild
#newChild(Executor executor, Object... args)
方法,创建 NioEventLoop 对象
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
- 模板方法newChild(),用来创建线程池中的单个线程,现在我们知道
MultithreadEventExecutorGroup
中EventExecutor[] children
保存的就是NioEventLoop
7.3 setIoRatio
#setIoRatio(int ioRatio)
方法,设置所有 EventLoop 的 IO 任务占用执行时间的比例
public void setIoRatio(int ioRatio) {
for (EventExecutor e: this) {
((NioEventLoop) e).setIoRatio(ioRatio);
}
}
7.4 rebuildSelectors
#rebuildSelectors()
方法,重建所有 EventLoop 的 Selector 对象
/**
* Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
*
* 重建所有 EventLoop 的 Selector 对象
*
* 因为 JDK 有 epoll 100% CPU Bug 。实际上,NioEventLoop 当触发该 Bug 时,
* 也会自动调用 NioEventLoop#rebuildSelector() 方法,进行重建 Selector 对象,以修复该问题。
*/
public void rebuildSelectors() {
for (EventExecutor e: this) {
((NioEventLoop) e).rebuildSelector();
}
}