Netty源码_NioEventLoop详解

这一章我们将讲解 netty 中真正使用的事件轮询器 NioEventLoop,通过这一章,你将了解:

  • netty 是如何通过一个事件轮询器管理多个嵌套字 Socket 的通道 channel
  • 又是如何即处理通道的 channelIO 事件,以及添加事件轮询器上任务的。

一. 选择器 Selector

netty 的事件轮询器也是通过的 java nio 的选择器 Selector 来管理多个嵌套字 Socket 的通道 channel
那么选择器 Selector 是如何与通道 channel,并管理它们的呢。

1.1 注册选择器

SelectableChannel 类用 register 方法将通道注册到选择器 Selector 中。

    /**
     *  将此通道注册到给定的选择器,并返回一个选择键SelectionKey。
     */
    public abstract SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException;
  1. sel : 选择器,多个通道 channel 可以注册同一个选择器,因此一个选择器就可以管理多个通道了。
  2. ops: 该通道 channel 关注的 IO 事件类型,分为读事件OP_READ,写事件OP_WRITE,连接事件OP_CONNECT和 接受事件OP_ACCEPT。可以关注一个或者多个事件。
  3. att: 可以绑定在返回值 选择键 SelectionKey 上的值,可以从选择键中获取它。
  4. 返回的选择键 SelectionKey,它其实代表的是 IO 事件类型。

如果当前通道已经注册到给定的选择器 sel 上了,调用这个方法,更改了这个通道关注的 IO 事件类型 ops,和绑定的对象 att
AbstractSelectableChannel 实现中我们看到:

    public final SelectionKey register(Selector sel, int ops,
                                       Object att)
        throws ClosedChannelException
    {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if ((ops & ~validOps()) != 0)
                throw new IllegalArgumentException();
            if (blocking)
                throw new IllegalBlockingModeException();
            // 已经注册到给定的 选择器 sel
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }

1.2 选择键 SelectionKey

public abstract class SelectionKey {

    /**
     * 构造该类的实例
     */
    protected SelectionKey() { }


    // -- Channel and selector operations --

    /**
     * 返回为其创建键的通道 Channel。
     * 即使在该选择键被取消之后,此方法仍将继续返回通道 Channel。
     *
     * @return  This key's channel
     */
    public abstract SelectableChannel channel();

    /**
     * 返回为其创建键的选择器 Selector。
     * 这个方法将继续返回选择器,即使在选择键被取消之后。
     *
     * @return  This key's selector
     */
    public abstract Selector selector();

    /**
     * 返回这个键是否有效。
     * 键在创建时是有效的,
     * 直到它被取消、它的通道关闭或它的选择器关闭后无效。
     */
    public abstract boolean isValid();

    /**
     * 请求取消该选择键的通道与其选择器的注册。
     * 方法返回后该选择键将是无效的,并将它添加到其选择器的 cancelled-key的集合。
     * 在下一次选择操作期间,该键将从所有选择器的键集中删除。
     * 如果此选择键已被取消,则调用此方法没有任何效果。一旦取消,选择键将永远无效。
     *
     * 这个方法可以在任何时候被调用。
     * 选择器的 cancelled-key 集合是进行过同步处理的,
     * 因此如果同时调用涉及相同选择器的取消或选择操作,可能会短暂阻塞。
     */
    public abstract void cancel();


    // -- Operation-set accessors --

    /**
     * 返回此选择键关注的 IO 事件类型集合。
     * 可以保证返回的集合只包含对这个键的通道有效的操作位。
     * 这个方法可以在任何时候被调用。
     * 它是否阻塞以及阻塞多长时间取决于具体实现。
     *
     * 如果这个键被取消,则抛出 CancelledKeyException 异常。
     */
    public abstract int interestOps();

    /**
     * 重新设置此选择键关注的 IO 事件类型集合 ops。
     * 这个方法可以在任何时候被调用。
     * 它是否阻塞以及阻塞多长时间取决于实现。
     *
     * 如果设置的 ops,不是键的通道所支持的操作,抛出 IllegalArgumentException 异常
     * 如果这个键被取消,则抛出 CancelledKeyException 异常。
     */
    public abstract SelectionKey interestOps(int ops);

    /**
     * 返回此键的就绪操作 ready-operation 集合。
     * 可以保证返回的集合只包含对这个键的通道有效的操作位。
     *
     * 如果这个键被取消,则抛出 CancelledKeyException 异常。
     */
    public abstract int readyOps();


    // -- Operation bits and bit-testing convenience methods --

    /**
     * 用于读操作的操作位。
     */
    public static final int OP_READ = 1 << 0;

    /**
     * 写操作的操作位。
     */
    public static final int OP_WRITE = 1 << 2;

    /**
     * socket-connect 操作的操作位。
     */
    public static final int OP_CONNECT = 1 << 3;

    /**
     * socket-accept 操作的操作位。
     */
    public static final int OP_ACCEPT = 1 << 4;

    /**
     * 测试此键的通道是否已准备好读取。
     */
    public final boolean isReadable() {
        return (readyOps() & OP_READ) != 0;
    }

    /**
     * 测试此键的通道是否已准备好写入。
     */
    public final boolean isWritable() {
        return (readyOps() & OP_WRITE) != 0;
    }

    /**
     * 测试此键的通道是否已完成或未能完成其套接字连接操作。
     */
    public final boolean isConnectable() {
        return (readyOps() & OP_CONNECT) != 0;
    }

    /**
     * 测试此键的通道是否已准备好接受新的套接字连接。
     */
    public final boolean isAcceptable() {
        return (readyOps() & OP_ACCEPT) != 0;
    }


    // -- Attachments --

    private volatile Object attachment = null;

    private static final AtomicReferenceFieldUpdater<SelectionKey,Object>
        attachmentUpdater = AtomicReferenceFieldUpdater.newUpdater(
            SelectionKey.class, Object.class, "attachment"
        );

    /**
     * 将给定对象附加到此键上。
     */
    public final Object attach(Object ob) {
        return attachmentUpdater.getAndSet(this, ob);
    }

    /**
     * 返回此键上当前的附加对象
     */
    public final Object attachment() {
        return attachment;
    }

}

选择键 SelectionKey 的功能并不复杂,主要是和通道channelIO 事件有关,分为四种:

  1. OP_READ = 1 << 0 读事件。
  2. OP_WRITE = 1 << 2 写事件。
  3. OP_CONNECT = 1 << 3 连接事件。
  4. OP_ACCEPT = 1 << 4 接收事件。

1.3 选择器 Selector

public abstract class Selector implements Closeable {

    /**
     * 初始化该类的新实例。
     */
    protected Selector() { }

    /**
     * 静态方法创建一个选择器 Selector
     */
    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

    /**
     * 返回这个选择器是否开启。
     */
    public abstract boolean isOpen();

    /**
     * 返回创建次选择器的 provider
     */
    public abstract SelectorProvider provider();

    /**
     * 返回这个选择器的选择键集合。
     *
     * 不能直接修改这个返回集合。只有当选择键被取消且其通道被注销后,这个选择键才会从集合中删除。
     * 任何修改这个集合的尝试都会导致抛出 UnsupportedOperationException 异常。
     *
     * 这个选择键集合不是线程安全的。
     */
    public abstract Set<SelectionKey> keys();

    /**
     * 返回选择器的选定键集合,即准备好IO 事件的选择键集合。
     *
     * 可以从选定键集合中删除选择键,但是不能添加键到选定键集合中,
     * 任何向选定键集合添加对象的尝试都会导致抛出 UnsupportedOperationException 异常。
     *
     * 这个选定键集合也不是线程安全的。
     */
    public abstract Set<SelectionKey> selectedKeys();

    /**
     * 立即返回当前选择器中已准备好 IO 事件通道channel 的数量。
     * 如果此时选择器中没有准备好IO 事件的通道 channel,
     * 那么该方法直接返回 0,不会阻塞当前线程。
     *
     * 当返回值大于 0 的时候,调用 selectedKeys() 方法,
     * 获取已准备好 IO 事件的选择键集合。
     */
    public abstract int selectNow() throws IOException;

    /**
     * 返回当前选择器中已准备好 IO 事件通道 channel 的数量。
     * 如果此时选择器中没有准备好IO 事件的通道channel,
     * 那么该方法将阻塞当前线程,直到
     * 1. 有准备好IO 事件的通道channel
     * 2. 使用 Selector.wakeup 唤醒
     * 3. 阻塞线程被中断
     * 4. 超时时间 timeout 到了
     *
     * 当返回值大于 0 的时候,调用 selectedKeys() 方法,
     * 获取已准备好 IO 事件的选择键集合。
     */
    public abstract int select(long timeout)
        throws IOException;

    /**
     * 返回当前选择器中已准备好 IO 事件通道 channel 的数量。
     * 如果此时选择器中没有准备好IO 事件的通道channel,
     * 那么该方法将阻塞当前线程,直到
     * 1. 有准备好IO 事件的通道channel
     * 2. 使用 Selector.wakeup 唤醒
     * 3. 阻塞线程被中断
     *
     * 当返回值大于 0 的时候,调用 selectedKeys() 方法,
     * 获取已准备好 IO 事件的选择键集合。
     */
    public abstract int select() throws IOException;

    /**
     * 唤醒被 select() 和 select(long timeout) 阻塞的等待线程
     */
    public abstract Selector wakeup();

    /**
     * 关闭当前选择器
     */
    public abstract void close() throws IOException;
}

选择器的主要功能就是获取已经准备好的通道 channel

1.3.1 获取准备好的通道数量

一共有三个方法 int selectNow(),int select(long timeout)int select()

int selectNow() 是立即返回当前准备好的通道数量,如果没有,那就返回0,不阻塞当前线程。
int select(long timeout)int select() 都会阻塞当前线程,直到有准备好的通道,或者阻塞线程被中断,或者其他线程调用选择器的 wakeup 方法唤醒。 int select(long timeout) 方法还多了一个超时返回。

1.3.2 获取准备好的通道集合

Set<SelectionKey> selectedKeys() 方法返回准备好的选择键集合,通过选择键就可以得到对应的通道 channel

1.3.3 唤醒阻塞

Selector wakeup() 可以唤醒被 select()select(long timeout) 阻塞的等待线程。

二. NioEventLoop 中的选择器

2.1 开启选择器

NioEventLoop 的成员变量中有两个选择器实例 unwrappedSelectorselector

那是因为 netty 可能会优化选择器的选择键 SelectionKey, 所以就有了两个选择器。

  1. unwrappedSelector 通过 provider.openSelector() 方法获取的原始选择器。
  2. selector:如果不优化选择键 SelectionKeyselector就是 unwrappedSelector对象;如果优化选择键,那么 selector 就是 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet) 对象,包装了 unwrappedSelector 对象,并优化了选择键。

这些都是在 openSelector() 方法中实现。

2.2 将通道注册到选择器

  1. register(...) 方法
     /**
      * 向这个事件循环器的 Selector 注册一个任意的 SelectableChannel (不一定是由Netty创建的)。
      * 一旦注册了指定的 SelectableChannel,当 SelectableChannel 准备好时,该事件循环器将执行指定的任务。
      */
     public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
         ObjectUtil.checkNotNull(ch, "ch");
         if (interestOps == 0) {
             throw new IllegalArgumentException("interestOps must be non-zero.");
         }
         if ((interestOps & ~ch.validOps()) != 0) {
             throw new IllegalArgumentException(
                     "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
         }
         ObjectUtil.checkNotNull(task, "task");
    
         if (isShutdown()) {
             throw new IllegalStateException("event loop shut down");
         }
    
         if (inEventLoop()) {
             // 在事件轮询器线程中,直接调用 register0 方法注册
             register0(ch, interestOps, task);
         } else {
             try {
                 // 调用 submit  方法,确保在轮询器线程中注册
                 submit(new Runnable() {
                     @Override
                     public void run() {
                         register0(ch, interestOps, task);
                     }
                 }).sync();
             } catch (InterruptedException ignore) {
                 // 即使被中断了,我们也会调度它,所以只需将线程标记为中断。
                 Thread.currentThread().interrupt();
             }
         }
     }
    

    确保在轮询器线程中注册

  2. register0(...) 方法
     private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
         try {
             // 将通道 ch 注册到选择器 unwrappedSelector 中
             ch.register(unwrappedSelector, interestOps, task);
         } catch (Exception e) {
             throw new EventLoopException("failed to register a channel", e);
         }
     }
    

    调用 SelectableChannelregister 方法,将通道注册到选择器unwrappedSelector 中去。

2.3 获取准备好的通道数量

  1. 立即获取,不阻塞
     /**
      * 返回注册在该 Selector 上的已经准备好进行I/O操作的通道 channel 的数量。
      * 如果还没有准备好的通道 channel ,那么直接返回 0,不会阻塞当前线程。
      */
     int selectNow() throws IOException {
         return selector.selectNow();
     }
    
  2. 阻塞获取
     private int select(long deadlineNanos) throws IOException {
         // 如果截止时间 deadlineNanos 是NONE(无限大)
         // 那么就使用 selector.select() 方法,不设置超时,
         // 一直阻塞等待,直到有注册在该 selector 上通道 channel 已经准备好进行I/O操作,
         // 才停止阻塞,返回准备好I/O操作 channel 的数量。
         if (deadlineNanos == NONE) {
             return selector.select();
         }
         // 计算调用 selector.select(timeoutMillis) 的超时阻塞等待时间。
         // 如果截止时间在5微秒内,超时时间将为0
         long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
         return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
     }
    
  3. 再次获取
     /**
      * 重新获取 IO 事件,即再次调用 selector.selectNow(), 不阻塞线程
      */
     private void selectAgain() {
         needsToSelectAgain = false;
         try {
             selector.selectNow();
         } catch (Throwable t) {
             logger.warn("Failed to update SelectionKeys.", t);
         }
     }
    

三. 事件轮询

3.1 run 方法

事件轮询器如何实现事件轮询的,就是主要看它的 run 方法实现:

    @Override
    protected void run() {
        int selectCnt = 0;
        // 必须使用死循环不断进行事件轮询,获取任务和通道的 IO 事件
        for (;;) {
            try {
                int strategy;
                try {
                    /**
                     * 返回处理策略,就分为两种:
                     * 有任务 hasTasks() == true,就不能等待IO事件了,先直接调用 selectNow() 方法,
                     * 获取当前准备好IO 的通道channel 的数量(0 表示一个都没有),处理 IO 事件 和任务。
                     *
                     * 没有任务 hasTasks() == false,返回 SelectStrategy.SELECT (是负数),
                     * 没有要及时处理的任务,先阻塞等待 IO 事件
                     */
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        // 返回下一个计划任务准备运行的截止时间纳秒值
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            // 返回 -1,说明没有下一个计划任务,
                            // 将 curDeadlineNanos 设置为 NONE,
                            // 调用 selector.select 方法时,就没有超时,
                            // 要无限等待了,除非被唤醒或者有准备好的 IO 事件。
                            curDeadlineNanos = NONE;
                        }
                        // 设置 超时等待时间
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                // 当前没有任务,那么就通过 selector 查看有没有 IO 事件
                                // 并设置超时时间,超时时间到了那么就要执行计划任务了
                                // 如果 curDeadlineNanos 是 NONE,就没有超时,无限等待。
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // 这个更新只是为了帮助阻止不必要的选择器唤醒,
                            // 所以使用lazySet是可以的(没有竞争条件)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // 如果我们在这里接收到IOException,那是因为Selector搞错了。
                    // 让我们重新构建选择器并重试。
                    // https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }
                /**
                 * 代码走到这里,
                 * 要么有 IO 事件,即 strategy >0
                 * 要么就是有任务要运行。
                 * 如果两个都不是,那么就有可能是 JDK 的 epoll 的空轮询 BUG
                 */

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    // 如果 ioRatio
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // 确保运行了所有待执行任务,包括当前时间已经过期的计划任务
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    // strategy > 0 说明有 IO 事件,
                    // 那么需要调用 processSelectedKeys() 方法,执行 IO 时间
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // 计算 IO 操作花费的时间
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // 按照比例计算可以运行任务的超时时间 ioTime * (100 - ioRatio) / ioRatio,
                        // 超时时间到了,即使还有任务没有运行,也直接返回了,等下一个周期在运行这些任务
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    // strategy == 0 说明没有 IO 事件,不用处理 IO 了
                    // 调用 runAllTasks(0) 方法,超时时间为0,这将运行最小数量的任务
                    ranTasks = runAllTasks(0);
                }

                if (ranTasks || strategy > 0) {
                    // 要么有任务运行,要么有 IO 事件处理
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) {
                    // 即没有任务运行,也没有IO 事件处理,就有可能是 JDK 的 epoll 的空轮询 BUG
                    // 调用 unexpectedSelectorWakeup(selectCnt) 方法处理。
                    // 可能会重新建立 Select

                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Error e) {
                throw e;
            } catch (Throwable t) {
                handleLoopException(t);
            } finally {
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        // 如果事件轮询器开始 shutdown,就要关闭 IO 资源
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Error e) {
                    throw e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }

我们知道事件轮询器要处理两种事件:通道的 IO 事件 和 任务(包括计划任务和待执行任务),那么就要合理分配时间:

  1. 选择策略 selectStrategy,根据有无待执行任务(hasTasks() ) 进行划分:

    • 有待执行任务,那么不用等待 IO 事件,先直接调用 selectNow() 方法,获取当前准备好IO事件的通道channel ,然后处理 IO 事件和待执行任务。
    • 没有待执行任务,那么就需要阻塞等待准备好IO事件的通道;先获取下一个计划任务的截止时间 curDeadlineNanos,调用 select(curDeadlineNanos) 方法。
  2. 开始处理 IO 事件和待执行任务

    这里面有一个非常重要的属性 ioRatio,它表示事件循环中,处理IO 事件所占的时间比例。这个值越大,处理非IO 事件(待执行任务)的时间就越少;但是如果值变成 100 ,就会禁用该特性,事件循环将不会尝试平衡IO 事件和非I/O事件的时间。

    • ioRatio == 100,如果有IO事件(strategy > 0),通过processSelectedKeys() 处理IO事件,最后调用 runAllTasks() 运行全部的待执行任务。
    • 然后判断IO事件(strategy > 0),通过processSelectedKeys() 处理IO事件,计算处理IO事件的花费的时间 ioTime,根据这个时间,计算出执行非IO事件(即待执行任务)最多花费的时间ioTime * (100 - ioRatio) / ioRatio,这样就可以控制运行任务的时间了。
    • 没有 IO 事件(strategy == 0),调用 runAllTasks(0) 方法,运行最小数量的任务。
    • 最后需要考虑,如果这次被唤醒,即没有任务运行,也没有IO事件处理,那么就有可能是 JDKepoll 的空轮询 BUG;需要重新注册选择器。

3.2 处理 IO 事件

  1. processSelectedKeys
     /**
      * 处理 IO 事件
      */
     private void processSelectedKeys() {
         if (selectedKeys != null) {
             // 如果是优化的 Select, 调用 processSelectedKeysOptimized 方法
             processSelectedKeysOptimized();
         } else {
             // 如果没有优化,
             // 直接调用 selector.selectedKeys() 获取IO事件的channel
             processSelectedKeysPlain(selector.selectedKeys());
         }
     }
    
  2. processSelectedKeysOptimized
     private void processSelectedKeysOptimized() {
         for (int i = 0; i < selectedKeys.size; ++i) {
             final SelectionKey k = selectedKeys.keys[i];
             // 将数组中的条目空出来,以便在通道关闭后对其进行GC
             // See https://github.com/netty/netty/issues/2363
             selectedKeys.keys[i] = null;
    
             final Object a = k.attachment();
    
             if (a instanceof AbstractNioChannel) {
                 processSelectedKey(k, (AbstractNioChannel) a);
             } else {
                 @SuppressWarnings("unchecked")
                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                 processSelectedKey(k, task);
             }
    
             if (needsToSelectAgain) {
                 // null out entries in the array to allow to have it GC'ed once the Channel close
                 // See https://github.com/netty/netty/issues/2363
                 selectedKeys.reset(i + 1);
    
                 selectAgain();
                 i = -1;
             }
         }
     }
    
  3. processSelectedKeysPlain
     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
         // 检查集合是否为空,如果为空,则直接返回。
         // See https://github.com/netty/netty/issues/597
         if (selectedKeys.isEmpty()) {
             return;
         }
    
         // 得到 通道channel IO事件 SelectionKey 的迭代器
         Iterator<SelectionKey> i = selectedKeys.iterator();
         // 循环遍历, 这里使用 for 死循环,
         // 因为可能会再次调用 selector.selectNow() 获取IO 事件
         // 需要继续处理这些 IO 事件
         for (;;) {
             final SelectionKey k = i.next();
             final Object a = k.attachment();
             // 必须调用 remove() 方法,
             // 将这个 SelectionKey 从迭代器中移除
             i.remove();
    
             if (a instanceof AbstractNioChannel) {
                 // 如果 AbstractNioChannel 事件
                 processSelectedKey(k, (AbstractNioChannel) a);
             } else {
                 @SuppressWarnings("unchecked")
                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                 // 如果 NioTask 事件
                 processSelectedKey(k, task);
             }
    
             // 没有 IO 事件了,跳出循环
             if (!i.hasNext()) {
                 break;
             }
    
             if (needsToSelectAgain) {
                 // 如果needsToSelectAgain == true, 需要重新获取IO事件,
                 selectAgain();
                 // 再次获取 IO 事件的 selectedKeys
                 selectedKeys = selector.selectedKeys();
    
                 if (selectedKeys.isEmpty()) {
                     // 没有新的 IO 事件,就直接返回。
                     break;
                 } else {
                     // 重新获取新的迭代器, 以避免ConcurrentModificationException
                     i = selectedKeys.iterator();
                 }
             }
         }
     }
    
  4. processSelectedKey
     /**
      * 处理通道 AbstractNioChannel 的IO事件
      */
     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
         // 如果 SelectionKey 无效
         if (!k.isValid()) {
             final EventLoop eventLoop;
             try {
                 eventLoop = ch.eventLoop();
             } catch (Throwable ignored) {
                 // 如果通道实现因为没有事件循环器而抛出异常,则忽略此异常,
                 // 因为我们只是试图确定ch是否注册到该事件循环器,从而有权关闭ch。
                 return;
             }
             // 只有当ch仍然注册到这个EventLoop时才关闭ch。
             // ch可能已经从事件循环中注销,因此SelectionKey可以作为注销过程的一部分被取消,
             // 但通道仍然健康,不应该关闭。
             // See https://github.com/netty/netty/issues/5125
             if (eventLoop == this) {
                 // 关闭这个通道 channel
                 unsafe.close(unsafe.voidPromise());
             }
             return;
         }
    
         try {
             // 获取 IO 事件类型
             int readyOps = k.readyOps();
             // 首先判断是不是连接的IO事件 OP_CONNECT
             // 在尝试触发read(…)或write(…)之前,
             // 我们首先需要调用finishConnect(),
             // 否则NIO JDK通道实现可能抛出 NotYetConnectedException 异常。
             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                 // See https://github.com/netty/netty/issues/924
                 int ops = k.interestOps();
                 // 删除OP_CONNECT,否则Selector.select(..)将始终返回而不阻塞
                 ops &= ~SelectionKey.OP_CONNECT;
                 k.interestOps(ops);
    
                 unsafe.finishConnect();
             }
    
             // 首先处理写事件 OP_WRITE,因为我们可以写一些队列缓冲区,从而释放内存。
             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                 // 调用forceFlush,即使没有东西可写,它也会清除OP_WRITE
                 ch.unsafe().forceFlush();
             }
    
             // 最后处理读事件
             // 还要检查 readOps 是否为0,以解决可能导致旋转循环的JDK错误
             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                 unsafe.read();
             }
         } catch (CancelledKeyException ignored) {
             unsafe.close(unsafe.voidPromise());
         }
     }
    
     /**
      * 处理 NioTask 任务,
      */
     private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
         int state = 0;
         try {
             task.channelReady(k.channel(), k);
             state = 1;
         } catch (Exception e) {
             k.cancel();
             invokeChannelUnregistered(task, k, e);
             state = 2;
         } finally {
             switch (state) {
             case 0:
                 k.cancel();
                 invokeChannelUnregistered(task, k, null);
                 break;
             case 1:
                 if (!k.isValid()) { // Cancelled by channelReady()
                     invokeChannelUnregistered(task, k, null);
                 }
                 break;
             default:
                  break;
             }
         }
     }
    

四. 总结

NioEventLoop 基本逻辑已经说清楚了,我们知道它是如何平衡处理 IO 事件和 待执行的任务的。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容