Netty源码(三)NioEventLoop三部曲

前言

本文将会具体分析NioEventLoop中的thread,它的启动时机,以及所履行的职责。还会分析一些netty的实现细节,比如解决NIO的bug和一些优化等。

thread启动

之前说到NioEventLoop是由一个thread处理I/O事件和提交的任务。先看一下这个thread启动的流程。

execute 简化流程

private void execute(Runnable task, boolean immediate) {
       //是当前线程调用,直接加入队列
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            //启动线程
            startThread();
         // ......
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

可以看出启动thread是一个延迟加载的过程,在执行第一个任务的时候才会启动thread。跟进去看startThread()

    private void startThread() {
      //判断线程状态是否已启动
        if (state == ST_NOT_STARTED) {
           //CAS设置线程状态为已启动
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                  //真正去启动线程
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

doStartThread

private void doStartThread() {
        assert thread == null;
        //调用传入参数的executor的execute方法,
        //executor会新建一个线程去执行任务
        executor.execute(new Runnable() {
            @Override
            public void run() {
                //将执行该任务的线程赋值给thread 
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    //执行任务
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                // ......
            }
    }

前文分析了executor为ThreadPerTaskExecutor,执行execute方法时候为新建一个线程去执行任务,NioEventLoop的thread就是在此时赋值。
thread的启动流程简化为,首先thread启动是一个懒加载的过程,在第一次执行任务才会启动。在启动的过程中,会有一个CAS的状态判断当前线程是否已经被启动,如果thread没有启动,则通过传入的executor对象去创建thread对象,并执行SingleThreadEventExecutor.this.run()这个方法。

下面分析SingleThreadEventExecutor.this.run()这个方法,

    /**
     * Run the tasks in the {@link #taskQueue}
     */
    protected abstract void run();

可以看见是一个抽象方法,然后找到文本分析的NioEventLoop对于run的实现,这里做一个将代码做一个简化,只有主要流程

    protected void run() {
        int selectCnt = 0;
        for (; ; ) {
            //1、检测IO事件
            select();
            try {
              //2、处理准备就绪的IO事件
                processSelectedKeys();
            } finally {
                // 3、执行队列里的任务
                final long ioTime = System.nanoTime() - ioStartTime;
                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

        }
    }

NioEventLoop的职责只有三个,1、检测IO事件 ;2、处理准备就绪的IO事件;3、执行队列里的任务,用一个死循环去不断执行这三件事情。如之前画的图所示:


run

接下来就着重分析这三个步骤。

select

select步骤的核心是调用通过NIO中的selector的select()方法,返回selector上所监听到IO事件。

                    case SelectStrategy.SELECT:
                        // 获取当前任务队列的延迟时间
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            //当前任务队列为空,监听IO事件
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }

select方法

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            return selector.select();
        }
        // Timeout will only be 0 if deadline is within 5 microsecs
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

流程整体比较简单,如果时间参数deadlineNanos为NONE,就调用selector.select()方法,这个方法会一直阻塞直到有IO事件返回。否则再判断deadlineNanos是否小于等于0,如果是调用selectNow()会立即返回当前selector上准备就绪的IO事件,否则调用selector.select(timeoutMillis)方法,会在指定时间内返回,不管是否有IO事件发生。然后跟select()方法,找到实现类io.netty.channel.nio.SelectedSelectionKeySetSelector,

    public int select() throws IOException {
        selectionKeys.reset();
        return delegate.select();
    }

一共有两步操作,第一步是将之前的selectionKeys清空,检测到就绪的IO事件都会放入selectionKeys中,这里表示新的一轮IO循环开始,所以要将之前的清空(selectionKeys后续会在详细介绍)。第二步是调用NIO中的Selector对象的select(),将最后底层的IO实现委托给它。

processSelectedKeys

processSelectedKeys这一步将会处理监测到的IO事件,比如连接、读写的IO操作。

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

这里有个细节,处理优化过后的selectedKeys还是处理原生的selectedKeys。所谓优化的selectedKeys就是将原生的selectedKeys的HashSet替换成数组实现,提高空间利用率和遍历的效率,待会儿会详细将到是怎么替换的selectedKeys。

然后跟进去看processSelectedKeysOptimized()的具体实现:

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }

整体流程就是在遍历selectedKeys,将绑定在SelectionKey上的Channel取下来,然后做对应的IO操作,最后再判断是否需要重置selectedKeys。下面我会逐步分析里面的细节

第一: selectedKeys.keys[i] = null;

将SelectionKey取出之后把数组这个位置的地方置为null。为什么这么做?https://github.com/netty/netty/issues/2363描述的很清楚,简单来说就是我们并不会去清空selectedKeys数组,这就会导致在Channel关闭之后,依然会保持SelectionKey的强引用。

selectedKeys.jpg

如上图所示,假如数组原有长度为2,一次高峰期的IO事件导致数组扩容到8,之后新的IO事件的数量又达不到之前数组的位置,为导致上图坐标[6]、[7]位置会长时间持有已经关闭的Channel的引用,所以这里将其置为null,有助于GC。

第二: processSelectedKey

            
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } 

首先是将SelectionKey绑定的属性取下来,判断是否是AbstractNioChannel的类型。这里可以追踪一下netty是什么时候将AbstractNioChannel设置进去的。在AbstractNioChannel的doRegister方法

  //最后一个参数就是att
  selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

其Channel注册到底层jdk的组件中,然后将AbstractNioChannel作为参数传递进去,后续轮询出IO事件之后,再将AbstractNioChannel取出做后续操作。
具体处理IO事件
processSelectedKey(SelectionKey k, AbstractNioChannel ch)
这里贴一点核心流程,主要是判断当前Channel的操作类型,是连接还是读、写

           int readyOps = k.readyOps();
            //连接事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
           //写事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            //读事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }

这里面的内部流程就不具体分析了,大致分为两个部分bossGroup监听的连接事件,将接受到的Channel转交给workGroup,然后workGroup处理读写事件,然后将事件通过ChannelPipeline将事件传播出去。具体细节可以看AbstractNioMessageChannel和AbstractNioByteChannel的read()方法,后续可能会具体分析这里的代码。

第三: needsToSelectAgain

最后一个步骤,重新设置selectedKeys

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);
                selectAgain();
                i = -1;
            }

什么时候需要重新select?找到needsToSelectAgain被设置为true的地方,只有唯一的一处cancel

    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }

然后看cancel被调用的地方doDeregister

    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }

由上面的两部分代码分析可以知道,channel的关闭是通过移除在selector上的注册实现的,同时会把cancelledKeys加一 。当达到了阈值CLEANUP_INTERVAL(默认256)后将cancelledKeys重置为0、needsToSelectAgain 为true。
当needsToSelectAgain 为true之后,有两个步骤:
1.selectedKeys清空 -> selectedKeys.reset(i + 1);

    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }
  1. 再次填充selectedKeys ->selectAgain
    private void selectAgain() {
        needsToSelectAgain = false;
        try {
            selector.selectNow();
        } catch (Throwable t) {
            logger.warn("Failed to update SelectionKeys.", t);
        }
    }

至于为什么需要重新去填充selectedKeys,可能是需要保持selectedKeys里面的Channel都随时保持的是活跃的。

processSelectedKeys到这就分析完了,总共分为三步

  1. 遍历selectedKeys
  2. 处理IO事件
  3. 是否需要重置selectedKeys

ranTasks

现在分析thread的最后一步工作ranTasks,执行队列里的任务。
1. 任务类型
NioEventLoop里的任务类型分为两部分,一个是由taskQueue(MpscUnboundedArrayQueue)存放普通的任务,还有一个scheduledTaskQueue存放定时任务的队列。之前分析过EventLoop继承自ScheduledExecutorService,所以也需要提供执行定时任务的功能,而这里的定时任务是通过PriorityQueue来实现的。(定时任务的实现方式有很多,优先队列只是其中一种)ranTasks执行的任务其实就是两部分的内容,一个是普通队列中的任务和定时队列中的任务。
2. ioRatio
在分析执行细节之前,在提一个很重要的参数ioRatio,代表设置事件循环中I/O所需时间的百分比,意思就是在一次循环中,处理IO事件的时间与处理队列任务所占时间做一个百分比的分配,范围是1到100,当设置为100时,这个参数就失效了,默认参数为50。下面代码就是对ioRatio的使用

                //等于100的时候,参数失效,不再平衡IO事件所占时间的比例
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    //开始执行IO事件的时间
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // 获得IO执行总共耗时
                        final long ioTime = System.nanoTime() - ioStartTime;
                        //按照ioRatio计算出将花费多少时间执行ranTasks 
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }

3. runAllTasks

    protected boolean runAllTasks(long timeoutNanos) {
        //将scheduledTaskQueue队列中的任务转移到taskQueue中
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        //任务为空结束
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }
        //计算本次执行任务最迟的时间
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            //执行任务
            safeExecute(task);

            runTasks ++;

            //每执行64个任务之后判断时间是否超出,若超出结束循环
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            //没有任务结束循环
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

主要流程为

  1. scheduledTaskQueue队列中的任务转移到taskQueue中;
  2. 安全的执行任务(其实就是将任务try catch,以免任务执行发生异常,影响其他任务执行);
    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }
  1. 每执行64个任务之后判断执行时间是否超出deadline,这里采用64个任务为一个批次,没有每次任务执行去判断,也是对性能的一个优化;
  2. 执行afterRunningAllTasks方法,其实就是执行tailTasks队列中的任务,然后记录一下最后的执行时间this.lastExecutionTime = lastExecutionTime;

一些细节

selectedKeySet

前面提到过netty将NIO中Selector的selectedKeys替换,这里分析一下为什么需要替换和么去替换的selectedKeys。

  1. 为什么替换

NIO原生的selectedKeys使用的是HashSet,而NioEventLoop将其替换成了SelectedSelectionKeySet

//SelectorImpl
protected Set<SelectionKey> selectedKeys = new HashSet();
//NioEventLoop
private SelectedSelectionKeySet selectedKeys;

SelectedSelectionKeySet构造函数

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

SelectedSelectionKeySet使用的是数组存储元素,而HashSet是基于HashMap去存储数据,采用数组使得空间利用率和遍历的效率有所提高。

2.怎么替换

要在运行时替换掉类的属性,很明显是通过反射来做到的。

  • 获取sun.nio.ch.SelectorImpl Class对象
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });
  • 创建selectedKeySet
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
  • 设置属性
    //获取属性
    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    ......
    //将selectedKeySet设置到属性中
    selectedKeysField.set(unwrappedSelector, selectedKeySet);
    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);

NIO空轮询bug

NIO有一个很出名的bug就是epoll空轮询的bug,这会导致CPU占有率到100%,java也并没有修复这个bug,netty采用了一个很巧妙的方法来绕过这个bug。
主要思想就是,通过检测发生空轮询的次数,当超过一定的阈值之后,netty将会重新创建一个selector,并将之前selector上的channel转移到新的selector上。通过重新创建selector的方式来解决NIO空轮询的bug。

unexpectedSelectorWakeup

        //空轮询的次数超过阈值,默认为512
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            //重新构建selector
            rebuildSelector();
            return true;
        }

跟进去找到具体的实现方法rebuildSelector0

        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;
        try {
            //创建新的selector
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }
        // Register all channels to the new Selector.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
        //将旧的selector上的channel全部注册到新的selector上
        }
        //赋值
        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;
        try {
            // 关闭旧的selector
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

总结

本文分析NioEventLoop中所对应的唯一的thread,启动是一个懒加载的过程,当第一次任务执行的时候才会初始化。后续thread开始循环处理三件事件

  1. 检测IO事件 ;
  2. 处理准备就绪的IO事件;
  3. 执行队列里的任务

本文也对具体的代码进行了分析,还有一些netty对NIO的优化和bug处理,当然netty的精妙之处远不止本文分析的这些,更多的还需要自己去探索和学习。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351