Netty核心组件之NioEventLoop(一)

开篇

NioEventLoop是Netty框架的Reactor线程;
NioEventLoop负责处理注册在其上面的所有Channel的IO事件,通常情况下一个NioEventLoop会下挂多个Channel;
NioEventLoop同时会负责通过execute方法提交的任务,以及通过schedule方法提交的定时任务;

在接下来几篇文章,我会通过Netty的源码深入讲解NioEventLoop的实现机制。
特别说明:基于4.1.52版本的源码

类继承关系以及重要的成员变量

先来看下NioEventLoop的类关系图和重要的属性,对其有一个整体的感知,便于后面详细分析。

  • 类继承关系
    NioEventLoop继承体系.png

    可以看到NioEventLoop的继承关系非常复杂,最上层是JDK的Executor接口,说明它归根到底是一个执行器,是用来执行任务的。另外,它实现了EventLoop接口、EventExecutorGroup接口和ScheduledExecutorService接口,继承了SingleThreadEventExecutor类,这些接口和类为这个执行器添加了十分繁复的功能特性,要搞清楚NioEventLoop的具体实现机制就要不停的在这些父类和接口中来回跳转。
  • 重要的成员变量
private Selector selector;
private SelectedSelectionKeySet selectedKeys;
private volatile Thread thread;
private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue;
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
private final Queue<Runnable> tailTasks;
  1. selector:作为NIO框架的Reactor线程,NioEventLoop需要处理网络IO事件,因此它需要有一个多路复用器,即Java NIO的Selector对象;
  2. selectedKeys:每次select操作选出来的有事件就绪的SelectionKey集合,在NioEventLoop的run方法中会处理这些事件;
  3. thread:即每个NioEventLoop绑定的线程,它们是一对一的关系,一旦绑定,在整个生命周期内都不会改变;
  4. parent:即当前的NioEventLoop所属的EventExecutorGroup;
  5. taskQueue:NioEventLoop中三大队列之一,用于保存需要被执行的任务。
  6. scheduledTaskQueue:NioEventLoop中三大队列之一,是一个优先级队列(内部其实是一个按照任务的下次执行时间排序的小顶堆),用于保存定时任务,当检测到定时任务需要被执行时,会将任务从scheduledTaskQueue中取出,放入taskQueue
  7. tailTasks:NioEventLoop中三大队列之一,用于存储当前或下一次事件循环结束后需要执行的任务;

构造函数

首先来看NioEventLoop的构造函数

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        // 设置parent、executor、addTaskWakesUp(添加任务时是否唤醒select)、创建taskQueue和tailTask队 
        // 列
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        // selector初始化
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

在构造函数中,会创建任务队列和tailTask队列

    private static Queue<Runnable> newTaskQueue(
            EventLoopTaskQueueFactory queueFactory) {
        if (queueFactory == null) {
            return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
        }
        return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    }

    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }

默认情况下,会创建MPSC,即多生产者单消费者的队列,这里最终会用到JCTools库,这里不过多介绍,感兴趣的可以自己去了解。

构造函数中还会初始化selector和根据配置对selectedKeys进行优化

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        // 如果优化选项没有开启,则直接返回
        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }

如果设置了优化开关(默认优化选项是开启的),则通过反射的方式从Selector中获取selectedKeys和publicSelectedKeys,将这两个成员设置为可写,通过反射,使用Netty构造的selectedKeySet将原生JDK的selectedKeys替换掉。
我们知道使用Java原生NIO接口时,需要先调Selector的select方法,再调selectedKeys方法才可以获得有IO事件准备好的SelectionKey集合。这里优化过后,只通过一步select调用,就可以从selectedKeySet获得需要的SelectionKey集合。
另外,原生Java的SelectionKey集合是一个HashSet,这里优化过后的SelectedSelectionKeySet底层是一个数组,效率更高。

run方法解析

EventLoop的职责可以用下面这张图形象的表示

EventLoop线程

EventLoop的run方法在一个for死循环中,周而复始的做着三件事:

1、从已注册的Channel监听IO事件;
2、处理IO事件;
3、从任务队列取任务执行。

    protected void run() {
        int selectCnt = 0;
        for (;;) {
              int strategy;
              try {
                    // 计算本次循环的执行策略
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.BUSY_WAIT:
                    case SelectStrategy.SELECT:
                      // 调用Java NIO的多路复用器,检查注册在NioEventLoop上的Channel的IO状态
                      strategy = select(curDeadlineNanos);
                    }
                } catch (IOException e) {
                }
                // 处理IO事件
                processSelectedKeys();
                // 处理任务队列中的任务
                ranTasks = runAllTasks();
                ...
    }

下面详细解析:

  • 先来看calculateStrategy
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }

    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

    protected boolean hasTasks() {
        assert inEventLoop();
        return !taskQueue.isEmpty() || !tailTasks.isEmpty();
    }

每次循环,都会检测任务队列和IO事件,如果任务队列中没有任务,则直接返回SelectStrategy.SELECT;如果任务队列中有任务,则会调用非阻塞的selectNow检测有IO事件准备好的Channel数。

  • 阻塞的select
    当任务队列中没有任务时,直接进入select分支
                    case SelectStrategy.SELECT:
                        // 找到下一个将要执行的定时任务的截止时间
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                // 阻塞调用select
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through

nextScheduledTaskDeadlineNanos方法返回下一个将要被执行的定时任务的截止时间

    protected final long nextScheduledTaskDeadlineNanos() {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
    }

    final ScheduledFutureTask<?> peekScheduledTask() {
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
    }

NioEventLoop的定时任务队列是一个优先级队列,队列中存储的是ScheduledFutureTask对象

    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                    SCHEDULED_FUTURE_TASK_COMPARATOR,
                    11);
        }
        return scheduledTaskQueue;
    }

通过ScheduledFutureTask的compareTo方法可以看出,优先级队列中的元素是以任务的截止时间来排序的,队首元素的截止时间最小,当截止时间相同时,以任务ID排序,ID小的排在前面。

    public int compareTo(Delayed o) {
        if (this == o) {
            return 0;
        }

        ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
        long d = deadlineNanos() - that.deadlineNanos();
        if (d < 0) {
            return -1;
        } else if (d > 0) {
            return 1;
        } else if (id < that.id) {
            return -1;
        } else {
            assert id != that.id;
            return 1;
        }
    }

当定时任务ScheduledFutureTask执行后,会根据periodNanos的取值决定是否要将任务重新放回队列。从netty的注释可以清晰看到:

periodNanos为0时,表示的是只执行一次的任务,执行完后丢弃就好,不再放回队列;
periodNanos大于0时表示的是以固定的频率执行任务,下一次任务执行的开始时间是以上一次任务的开始时间为基准而得来;
periodNanos小于0时表示的是以固定的延迟时间执行任务,下一次任务的开始时间是以上一次任务的结束时间为基准而得来。

    /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
    private final long periodNanos;

看下ScheduledFutureTask的run方法

    public void run() {
        assert executor().inEventLoop();
        try {
            if (delayNanos() > 0L) {
                // 执行时间还未到,则要么移除任务,要么重新加入队列
                if (isCancelled()) {
                    scheduledExecutor().scheduledTaskQueue().removeTyped(this);
                } else {
                    scheduledExecutor().scheduleFromEventLoop(this);
                }
                return;
            }
            // 只执行一次的任务,执行完后丢弃就好,不再放回队列
            if (periodNanos == 0) {
                if (setUncancellableInternal()) {
                    V result = runTask();
                    setSuccessInternal(result);
                }
            } else {
                if (!isCancelled()) {
                    runTask();
                    if (!executor().isShutdown()) {
                        // 根据periodNanos ,计算截止时间
                        if (periodNanos > 0) {
                            deadlineNanos += periodNanos;
                        } else {
                            deadlineNanos = nanoTime() - periodNanos;
                        }
                        if (!isCancelled()) {
                            // 重新加入队列
                            scheduledExecutor().scheduledTaskQueue().add(this);
                        }
                    }
                }
            }
        } catch (Throwable cause) {
            setFailureInternal(cause);
        }
    }

当任务的执行时间还未到,则判断任务是否已经取消,如果已取消则移除任务,否则重新加入队列。对于只执行一次的任务,执行完了不会再放回队列。其他的任务,则根据periodNanos 的类型,重新计算截止时间,重新放回队列,等待下次调度。
定时任务的优先级队列到此介绍完毕,接着看NioEventLoop的run方法

                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // 再次判断任务队列中是否有任务
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        }
    private int select(long deadlineNanos) throws IOException {
        // 如果没有定时任务,直接调Java NIO的select,进入阻塞
        if (deadlineNanos == NONE) {
            return selector.select();
        }
        // 如果截止时间小于0.5ms,则timeoutMillis 为0,直接调非阻塞的selectNow()方法
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

在调用select之前,再次调用hasTasks()判断从上次调用该方法到目前为止是否有任务加入,多做了一层防护,因为调用select时,可能会阻塞,这时,如果任务队列中有任务就会长时间得不到执行,所以须小心谨慎。
如果任务队列中还是没有任务,则会调用select方法。在这个方法中会根据入参deadlineNanos来选择调用NIO的哪个select方法:

如果deadlineNanos为NONE,即没有定时任务时,直接调用NIO的无参select方法,进入永久阻塞,除非检测到Channel的IO事件或者被wakeup;
如果存在定时任务,且定时任务的截止时间小于0.5ms,则timeoutMillis 为0,直接调非阻塞的selectNow方法,也就是说马上有定时任务需要执行了,不要再进入阻塞了;
其他情况,调用select(timeout),进入有超时时间的阻塞。

到这里,可能有人要问了:在上面的方法中,如果调用了Java NIO的无参的select方法,就会进入阻塞,除非检测到Channel的IO事件,那么在检测到IO事件之前,加入到任务队列中的任务怎么得到执行呢?

好,你想,在检测到IO事件之前,可以退出阻塞的方法是什么?对,调用wakeup方法。那么我们来搜一下NioEventLoop中有调用Selector的wakeup方法的地方吗:

    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            selector.wakeup();
        }
    }

还真搜到了,再看一下这个方法被调用的地方

wakeup被调用.PNG

看到SingleThreadEventExecutor的execute方法了吗,就是说在调execute方法,向EventLoop提交任务时,会将EventLoop线程从Java NIO的select阻塞中唤醒。

    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
           ...
        }

        if (!addTaskWakesUp && immediate) {
            // 唤醒EventLoop线程,执行任务队列中的任务
            wakeup(inEventLoop);
        }
    }

到这里,NioEventLoop的run方法的职责之一:检测Channel的IO事件就讲解完毕。

至于IO事件的处理以及任务队列中任务的处理会在后面的文章中解析,敬请期待。

总结

在本文中,对Netty的NioEventLoop进行了深入的解读,并且详细讲解了它的三大职责之一:检测Channel的IO事件的机制。
NioEventLoop是Netty最核心的概念,内部运行机制很复杂,在接下来的两篇文章中会继续分析。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342