Netty源码解析—— EventLoop(二)之 EventLoopGroup

Netty源码解析—— EventLoop(二)之 EventLoopGroup

1.类结构图

NioEventLoopGroup.png

2. EventExecutorGroup

EventExecutorGroup 实现 ScheduledExecutorService 、Iterable接口,这两个接口都是jdk原生接口,具体看EventExecutorGroup接口中的方法,代码如下:

   // ========== 自定义接口 ===================================
   
    //是否正在关闭
    boolean isShuttingDown();

    //优雅关闭线程池
    Future<?> shutdownGracefully();
  
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

    //返回线程池终止时的异步结果
    Future<?> terminationFuture();

    //选择一个 EventExecutor 对象
    EventExecutor next();


  // ========== 实现自 Iterable 接口 ==========
    @Override
    Iterator<EventExecutor> iterator();


// ========== 实现自 ExecutorService 接口 ==========

    @Override
    @Deprecated
    void shutdown();

   
    @Override
    @Deprecated
    List<Runnable> shutdownNow();


    @Override
    Future<?> submit(Runnable task);

    @Override
    <T> Future<T> submit(Runnable task, T result);

    @Override
    <T> Future<T> submit(Callable<T> task);

    // ========== 实现自 ScheduledExecutorService 接口 ==========


    @Override
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    @Override
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    @Override
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    @Override
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
  • 重点关注next()方法,该方法的功能是从线程池中选择一个线程
  • 比较特殊的是,接口方法返回类型为 Future 不是 Java 原生的 java.util.concurrent.Future ,而是 Netty 自己实现的 Future 接口,如下代码:
public interface Future<V> extends java.util.concurrent.Future<V> 

public interface ScheduledFuture<V> extends Future<V>, java.util.concurrent.ScheduledFuture<V> 

3. AbstractEventExecutorGroup

io.netty.util.concurrent.AbstractEventExecutorGroup ,实现 EventExecutorGroup 接口,EventExecutor ( 事件执行器 )的分组抽象类。

3.1 submit

#submit(...) 方法,提交一个普通任务到 EventExecutor 中, 提交的 EventExecutor ,通过 #next() 方法选择

   @Override
    public Future<?> submit(Runnable task) {
        return next().submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return next().submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return next().submit(task);
    }

3.2 schedule

#schedule(...) 方法,提交一个定时任务到 EventExecutor 中,提交的 EventExecutor ,通过 #next() 方法选择。代码如下:

@Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return next().schedule(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return next().schedule(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return next().scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

3.3 execute

#execute(...) 方法,在 EventExecutor 中执行一个普通任务,不需要返回结果,代码如下:

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

3.4 invokeAll

#invokeAll(...) 方法,在 EventExecutor 中执行多个普通任务, 多个任务使用同一个 EventExecuto。代码如下:

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        return next().invokeAll(tasks);
    }

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return next().invokeAll(tasks, timeout, unit);
    }

3.5 invokeAny

#invokeAll(...) 方法,在 EventExecutor 中执行多个普通任务,有一个执行完成即可,多个任务使用同一个 EventExecutor 。代码如下:

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return next().invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return next().invokeAny(tasks, timeout, unit);
    }

3.6 shutdown

#shutdown(...) 方法,关闭 EventExecutorGroup 。代码如下:

    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

    
    @Override
    @Deprecated
    public abstract void shutdown();

  
    @Override
    @Deprecated
    public List<Runnable> shutdownNow() {
        shutdown();
        return Collections.emptyList();
    }

  • 具体的 #shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)#shutdown() 方法,由子类实现。

4. MultithreadEventExecutorGroup

4.1 构造方法

/**
     * EventExecutor 数组
     */
    private final EventExecutor[] children;
    /**
     * 不可变( 只读 )的 EventExecutor 数组
     */
    private final Set<EventExecutor> readonlyChildren;
    /**
     * 已终止的 EventExecutor 数量
     */
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    /**
     * 用于终止 EventExecutor 的异步 Future
     */
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    /**
     * EventExecutor 选择器
     */
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

   
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }

   
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        // 创建执行器
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        // 创建 EventExecutor 数组
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            // 是否创建成功
            boolean success = false;
            try {
                // 创建 EventExecutor 对象,newChild抽象方法,具体有子类实现
                children[i] = newChild(executor, args);
                // 标记创建成功
                success = true;
            } catch (Exception e) {
                // 创建失败,抛出 IllegalStateException 异常
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 创建失败,关闭所有已创建的 EventExecutor
                if (!success) {
                    // 优雅的关闭所有已创建的 EventExecutor,只负责关闭线程,并不知道关闭的结果
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
                    // 确保所有已创建的 EventExecutor 已关闭
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            //isTerminated() 若关闭后所有任务都已完成,则返回true。注意除非首先调用shutdown或shutdownNow,否则isTerminated永不为true。
                            // 返回:若关闭后所有任务都已完成,则返回true。
                            while (!e.isTerminated()) {
                                //等所有已提交的任务(包括正在跑的和队列中等待的)执行完
                                //或者等超时时间到
                                //或者线程被中断,抛出InterruptedException
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        // 创建 EventExecutor 选择器
        chooser = chooserFactory.newChooser(children);

        // 创建监听器,用于 EventExecutor 终止时的监听
        //回调的具体逻辑是,当所有 EventExecutor 都终止完成时,
        // 通过调用 Future#setSuccess(V result) 方法,通知监听器们。至于为什么设置的值是 null ,因为监听器们不关注具体的结果。
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                 // 线程池中的线程每终止一个增加记录数,直到全部终止设置线程池异步终止结果为成功
                if (terminatedChildren.incrementAndGet() == children.length) {// 全部关闭
                    terminationFuture.setSuccess(null);// 设置结果,并通知监听器们。
                }
            }
        };

        // 设置监听器到每个 EventExecutor 上
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        // 创建不可变( 只读 )的 EventExecutor 数组
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        //设置不可变的EventExecutor集合
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

4.2 ThreadPerTaskExecutor

  • 创建执行器的代码如下:
// 创建执行器
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

具体看下 ThreadPerTaskExecutor这个类,代码如下:


/**
 * 实现 Executor 接口,每个任务一个线程的执行器实现类
 */
public final class ThreadPerTaskExecutor implements Executor {
    /**
     * 线程工厂对象
     * Netty 实现自定义的 ThreadFactory 类,为 io.netty.util.concurrent.DefaultThreadFactory
     */
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }
    /**
     * 执行任务
     *
     * @param command 任务
     *
     *  通过 ThreadFactory#newThread(Runnable) 方法,创建一个 Thread ,然后调用 Thread#start() 方法,启动线程执行任务
     */
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

  • io.netty.util.concurrent.ThreadPerTaskExecutor ,实现 Executor 接口,每个任务一个线程的执行器实现类

  • threadFactory 属性,线程工程实例,通过构造方法来初始化,Netty 实现自定义的 ThreadFactory 类,为 io.netty.util.concurrent.DefaultThreadFactory 具体的创建看如下方法,创建默认的线程工厂类

  • /**
        * 创建线程工厂对象,并且使用类名作为 poolType
        * @return
        */
       protected ThreadFactory newDefaultThreadFactory() {
           return new DefaultThreadFactory(getClass());
       }
    
  • #execute(Runnable command) 方法,通过 ThreadFactory#newThread(Runnable) 方法,创建一个 Thread ,然后调用 Thread#start() 方法,启动线程执行任务

4.3 DefaultThreadFactory

4.4 EventExecutorChooserFactory

io.netty.util.concurrent.EventExecutorChooserFactory ,EventExecutorChooser 工厂接口。代码如下:

/**
 * Factory that creates new {@link EventExecutorChooser}s.
 *
 * EventExecutorChooser 工厂接口
 */
@UnstableApi
public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     *
     * 创建一个 EventExecutorChooser 对象
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     *
     * EventExecutor 选择器接口
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         *
         * 选择下一个 EventExecutor 对象
         */
        EventExecutor next();
    }
}
  • #newChooser(EventExecutor[] executors) 方法,创建一个 EventExecutorChooser 对象;
  • EventExecutorChooser 接口,EventExecutor 选择器接口。
    • #next() 方法选择下一个 EventExecutor对象;

4.4.1 DefaultEventExecutorChooserFactory

io.netty.util.concurrent.DefaultEventExecutorChooserFactory ,实现 EventExecutorChooserFactory 接口,默认 EventExecutorChooser 工厂实现类。代码如下


/**
 * Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
 *
 * 实现 EventExecutorChooserFactory 接口,默认 EventExecutorChooser 工厂实现类
 */
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    /**
     * 单例
     */
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {// 是否为 2 的幂次方
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    /**
     * 是否为 2 的幂次方
     * @param val
     * @return
     */
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }  
}

  • DefaultEventExecutorChooserFactory是个单例;
  • Netty实现了两个线程选择器,虽然代码不一致,功能都是一样的:每次选择索引为上一次所选线程索引+1的线程
  • #newChooser(EventExecutor[] executors)创建具体的选择器,根据#isPowerOfTwo(executors.length)方法来判断,创建哪种选择器,EventExecutor 数组的大小是否为 2 的幂次方,如果是,创建PowerOfTwoEventExecutorChooser选择器,不是则创建GenericEventExecutorChooser选择器;
  • #isPowerOfTwo(int val) 方法,为什么 (val & -val) == val 可以判断数字是否为 2 的幂次方呢?

​ 我们以 8 来举个例子:

      - 8 的二进制为 `1000` 
      - -8 的二进制使用补码表示。所以,先求反生成反码为 `0111` ,然后加一生成补码为 `1000` 
      - 8 和 -8 与操作后,还是 8 。与操作是都为1则为1,其他都为0,所以结果还是1000&1000还是1000;
      - 实际上,以 2 为幂次方的数字,都是最高位为 1 ,剩余位为 0 ,所以对应的负数,求完补码还是自己

4.4.2 PowerOfTwoEventExecutorChooser

PowerOfTwoEventExecutorChooser 实现 EventExecutorChooser 接口,基于 EventExecutor 数组的大小为 2 的幂次方的 EventExecutor 选择器实现类。这是一个优化的实现,线程池数量使用2的幂次方,这样线程池选择线程时使用位操作,能使性能最高,PowerOfTwoEventExecutorChooser 是 DefaultEventExecutorChooserFactory 的静态内部类,代码如下:

 /**
     *  实现 EventExecutorChooser 接口,基于 EventExecutor 数组的大小为 2 的幂次方的 EventExecutor 选择器实现类
     */
    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        /**
         * 自增序列
         */
        private final AtomicInteger idx = new AtomicInteger();
        /**
         * EventExecutor 数组
         */
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        /**
         * 因为 - ( 二元操作符 ) 的计算优先级高于 & ( 一元操作符 ) 。
         *
         * 因为 EventExecutor 数组的大小是以 2 为幂次方的数字,那么减一后,除了最高位是 0 ,剩余位都为 1          ( 例如 8 减一后等于 7 ,而 7 的二进制为 0111 。),
         * 那么无论 idx 无论如何递增,再进行 & 并操作,都不会超过 EventExecutor 数组的大小。并且,还能保            证顺序递增。
         * @return
         */
        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

4.4.3 GenericEventExecutorChooser

GenericEventExecutorChooser 实现 EventExecutorChooser 接口,通用的 EventExecutor 选择器实现类。代码如下:

GenericEventExecutorChooser 内嵌在 DefaultEventExecutorChooserFactory 类中。

/**
     * GenericEventExecutorChooser 内嵌在 DefaultEventExecutorChooserFactory 类中。
     *  实现 EventExecutorChooser 接口,通用的 EventExecutor 选择器实现类
     */
    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        /**
         * 使用 idx 自增,并使用 EventExecutor 数组的大小来取余
         * @return
         */
        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

4.5 next

#next() 方法,选择下一个 EventExecutor 对象。代码如下:

 /**
     * 选择下一个 EventExecutor 对象
     * @return
     */
    @Override
    public EventExecutor next() {
        return chooser.next();
    }

4.6 iterator

 /**
     * 获得 EventExecutor 数组的迭代器
     * 为了避免调用方,获得迭代器后,对 EventExecutor 数组进行修改,
     * 所以返回是不可变的 EventExecutor 数组 readonlyChildren 的迭代器
     * @return
     */
    @Override
    public Iterator<EventExecutor> iterator() {
        return readonlyChildren.iterator();
    }

4.7 executorCount

 /**
     * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
     * 1:1 to the threads it use.
     *
     * 获得 EventExecutor 数组的大小
     */
    public final int executorCount() {
        return children.length;
    }

4.8 newChild

   /**
     * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
     * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
     *
     * 抽象方法,子类实现该方法,创建其对应的 EventExecutor 实现类的对象
     *
     */
    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

4.9 shutdownGracefully

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (EventExecutor l: children) {
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        return terminationFuture();
    }

  • 优雅的关闭EventExecutor线程组,返回terminationFuture,在构造方法中由于已经设置了监听,如下代码,通过success属性来判断是否全部都关闭;

  • final FutureListener<Object> terminationListener = new FutureListener<Object>() {
               @Override
               public void operationComplete(Future<Object> future) throws Exception {
                   // 线程池中的线程每终止一个增加记录数,直到全部终止设置线程池异步终止结果为成功
                   if (terminatedChildren.incrementAndGet() == children.length) {// 全部关闭
                       terminationFuture.setSuccess(null);// 设置结果,并通知监听器们。
                   }
               }
           };
    

4.10 terminationFuture

    /**
     * 返回用于终止 EventExecutor 的异步 Future
     * @return
     */
    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }

4.11 shutdown

   /**
     * 废弃的方法,EventExecutor线程组关闭
     */
    @Override
    @Deprecated
    public void shutdown() {
        for (EventExecutor l: children) {
            l.shutdown();
        }
    }

4.12 isShuttingDown

    /**
     * 判断所有的EventExecutor是否在优雅的关闭,或者已经关闭,
     * 任何一个EventExecutor没有关闭则返回false
     * @return
     */
    @Override
    public boolean isShuttingDown() {
        for (EventExecutor l: children) {
            if (!l.isShuttingDown()) {
                return false;
            }
        }
        return true;
    }

4.13 isShutdown

    /**
     * 判断所有的EventExecutor是否都关闭
     * @return
     */
    @Override
    public boolean isShutdown() {
        for (EventExecutor l: children) {
            if (!l.isShutdown()) {
                return false;
            }
        }
        return true;
    }

4.14 isTerminated

    /**
     * EventExecutor线程组关闭后,所有任务是否都已完成
     * @return
     */
    @Override
    public boolean isTerminated() {
        for (EventExecutor l: children) {
            if (!l.isTerminated()) {
                return false;
            }
        }
        return true;
    }

4.15 awaitTermination

/**
     * 等待所有的EventExecutor任务都执行完或者等待时间超时,返回任务是否都已经执行完
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        loop: for (EventExecutor l: children) {
            for (;;) {
                //超时则跳出loop循环
                long timeLeft = deadline - System.nanoTime();
                if (timeLeft <= 0) {
                    break loop;
                }
                //等所有已提交的任务(包括正在跑的和队列中等待的)执行完
                //或者等超时时间到
                //或者线程被中断,抛出InterruptedException
                //跳出for(;;)循环
                if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
                    break;
                }
            }
        }
        return isTerminated();
    }

5. EventLoopGroup

io.netty.channel.EventExecutorGroup ,继承 EventExecutorGroup 接口,EventLoop 的分组接口。代码如下:

 // ========== 实现自 EventExecutorGroup 接口 ==========
    /**
     * Return the next {@link EventLoop} to use
     * 覆盖父类接口的方法,选择下一个 EventLoop 对象
     */
    @Override
    EventLoop next();

    // ========== 自定义接口 ==========

    /**
     * 注册 Channel 到 EventLoopGroup 中的一个线程上。实际上,EventLoopGroup 会分配一个 EventLoop 给该 Channel 注册
     */
    ChannelFuture register(Channel channel);

  
    ChannelFuture register(ChannelPromise promise);

   
    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
  • #next() 方法,覆盖父类接口的方法,选择下一个 EventLoop 对象
  • #register(...) 方法,注册 Channel 到 EventLoopGroup 中的一个线程上。实际上,EventLoopGroup 会分配一个 EventLoop 给该 Channel 注册

6.MultithreadEventLoopGroup

io.netty.channel.MultithreadEventLoopGroup ,实现 EventLoopGroup 接口,继承 MultithreadEventExecutorGroup 抽象类,基于多线程的 EventLoop 的分组抽象类。

6.1 构造方法

/**
     * 默认 EventLoop 线程数
     * EventLoopGroup 默认拥有的 EventLoop 数量。因为一个 EventLoop 对应一个线程,所以为 CPU 数量 * 2 。
     * 为什么会 * 2 呢?因为目前 CPU 基本都是超线程,一个 CPU 可对应 2 个线程。
     * 在构造方法未传入 nThreads 方法参数时,使用 DEFAULT_EVENT_LOOP_THREADS 。
     */
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    /**
     * 初始化线程数
     */
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
     * EventExecutorChooserFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                     Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
    }
  • 主要初始化了线程数,然后调用父类的构造方法
  • 默认情况,线程数最小为1,如果配置了系统参数io.netty.eventLoopThreads,设置为该系统参数值,否则设置为核心数的2倍。

6.2 newDefaultThreadFactory

#newDefaultThreadFactory() 方法,创建线程工厂对象,覆盖父类方法,增加了线程优先级为 Thread.MAX_PRIORITY ,代码如下:

    /**
     * 创建线程工厂对象
     *
     * 覆盖父类方法,增加了线程优先级为 Thread.MAX_PRIORITY 。
     * @return
     */
    @Override
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
    }

6.3 next()

#next()方法,选择下一个 EventLoop 对象,覆盖父类方法,将返回值转换成 EventLoop 类,代码如下:

    /**
     * 选择下一个 EventLoop 对象
     *
     * 覆盖父类方法,将返回值转换成 EventLoop 类
     * @return
     */
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

6.4 newChild

#newChild(...) 抽象方法,创建 EventExecutor 对象,覆盖父类方法,返回值改为 EventLoop 类。

    /**
     * 抽象方法,创建 EventExecutor 对象
     *
     * 覆盖父类方法,返回值改为 EventLoop 类。
     * @param executor
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

6.5 register(

#register(...)方法,注册 Channel 到 EventLoopGroup 中,通过#next() 方法来选择一个EventLoop来注册,也就是通过EventExecutorChooser选择器从线程组中选择一个;

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

    @Deprecated
    @Override
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }
}

7. NioEventLoopGroup

io.netty.channel.nio.NioEventLoopGroup ,继承 MultithreadEventLoopGroup 抽象类,NioEventLoop 的分组实现类。

7.1 构造方法

    public NioEventLoopGroup() {
        this(0);
    }

    
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

  
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }

    public NioEventLoopGroup(int nThreads, Executor executor) {
       
        this(nThreads, executor, SelectorProvider.provider());
    }

    
    public NioEventLoopGroup(
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
       
        this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
        final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
       
        super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory,
                             final RejectedExecutionHandler rejectedExecutionHandler) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
    }

构造方法比较多,主要是明确了父构造方法的 Object ... args方法参数

  • 第一个参数,selectorProviderjava.nio.channels.spi.SelectorProvider ,用于创建 Java NIO Selector 对象。
  • 第二个参数,selectStrategyFactoryio.netty.channel.SelectStrategyFactory ,选择策略工厂。详细解析,见后续文章。
  • 第三个参数,rejectedExecutionHandlerio.netty.channel.SelectStrategyFactory ,拒绝执行处理器。详细解析,见后续文章。

7.2 newChild

#newChild(Executor executor, Object... args) 方法,创建 NioEventLoop 对象

 
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
  • 模板方法newChild(),用来创建线程池中的单个线程,现在我们知道MultithreadEventExecutorGroupEventExecutor[] children 保存的就是NioEventLoop

7.3 setIoRatio

#setIoRatio(int ioRatio) 方法,设置所有 EventLoop 的 IO 任务占用执行时间的比例

public void setIoRatio(int ioRatio) {
        for (EventExecutor e: this) {
            ((NioEventLoop) e).setIoRatio(ioRatio);
        }
    }

7.4 rebuildSelectors

#rebuildSelectors() 方法,重建所有 EventLoop 的 Selector 对象

   /**
     * Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
     * around the  infamous epoll 100% CPU bug.
     *
     * 重建所有 EventLoop 的 Selector 对象
     *
     * 因为 JDK 有 epoll 100% CPU Bug 。实际上,NioEventLoop 当触发该 Bug 时,
     * 也会自动调用 NioEventLoop#rebuildSelector() 方法,进行重建 Selector 对象,以修复该问题。
     */
    public void rebuildSelectors() {
        for (EventExecutor e: this) {
            ((NioEventLoop) e).rebuildSelector();
        }
    }

源码解析好文

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352