Netty源码深度解析-EventLoop(1)EventLoop的构造

导读

原创文章,转载请注明出处。

本文源码地址:netty-source-code-analysis

本文所使用的netty版本4.1.6.Final:带注释的netty源码

EventLoop在netty中发挥着驱动引擎的作用,本文我们以NioEventLoopGroupNioEventLoop为例着重分析一下EventLoopGroupEventLoop的创建、一些重要的数据结构和netty的一些优化。

1 NioEventLoopGroup

咱们以NioEventLoopGroup为例进行分析。NioEventLoopGroup有很多构造方法,咱们不再一一贴出,只贴出2个关键的构造方法。

NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider)该构造方法给出了SelectStrategyFactory的默认值为DefaultSelectStrategyFactory.INSTANCEEventLoop在每次循环时需要调用该类的calculateStrategy方法来决定循环的策略,具体咱们后边讲。

我们通过new NioEventLoopGroup()或者new NioEventLoopGroup(32)创建EventLoopGroup时最终都会调用到这个构造方法。接着又调用了另一个构造方法,咱们跟下去。

public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

这里调用了父类 MultithreadEventLoopGroup的构造方法,咱们继续跟下去。

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

MultithreadEventLoopGroup的构造方法如下,这里给出了nThreads的默认值,为MultithreadEventLoopGroup的静态属性DEFAULT_EVENT_LOOP_THREADS。接着调用父类MultithreadEventExecutorGroup的构造方法。

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

DEFAULT_EVENT_LOOP_THREADS的赋值在MultithreadEventLoopGroup的静态代码块中,我们看到该值默认为cpu核数×2。

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    

咱们接着跟到MultithreadEventExecutorGroup的构造方法,这里继续调用本类的构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args),并且为executor赋默认值。

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}

executor的默认值为ThreadPerTaskExecutor,顾名思义就是为每一个任务创建一个线程,我们看一下它的实现,代码如下,execute方法中为每一个任务创建了一个线程。

public final class ThreadPerTaskExecutor implements Executor {

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

我们继续跟到MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args),这里接着调用另一个构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)并且为chooserFacotry赋默认值,chooserFactory咱们在“服务端的启动流程”和“客户端的启动流程”中均有涉及,作用是在为Channel绑定EventLoop时轮询地从EventLoopGroup里选择EventLoop,这里不再赘述。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

接着看下一个构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)这里就是创建EventLoopGroup的核心逻辑了

首先创建了一个EventExecutor数组,并赋值给children属性,数组长度和nThreads相等,很容易想到,这里应该就是保存EventLoop的数组了。

紧接着循环调用newChild方法为数组的每一个元素赋值。

最后调用chooserFactory.newChooser(children)chooser赋值。

 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
              
            } finally {
            }
        }
        chooser = chooserFactory.newChooser(children);
}

我们跟着看一下newChild方法,该方法为抽象的,这里newChild方法的实现在NioEventLoopGroup中。好了,到这里咱们看到了NioEventLoop的构造方法,继续跟进去。

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

2 NioEventLoop

来看NioEventLoop的构造方法,这里NioEventLoop保存了3个属性。

  • provider: selector的工厂类,从provider可以得到selector
  • selector: 调用provider.openSelector()得到的selector,这里netty做了优化,咱们后边讲。
  • selectStrategy:这个的calculateStrategy方法返回值,决定了EventLoop的下一步动作,咱们也是后边讲。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    selector = openSelector();
    selectStrategy = strategy;
}

继续跟到父类SingleThreadEventLoop的构造方法,这里初始化了一个属性tailTasks,具体有什么用,咱们一会儿说,先记住这里有一个任务队列叫tailTasks

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    tailTasks = newTaskQueue(maxPendingTasks);
}

继续跟到父类SingleThreadEventExecutor的构造方法,这里有几个属性赋值。

  • addTaskWakesUp:这个是用来唤醒阻塞在taskQueue上的EventLoop线程的,在NioEventLoopEventLoop不会阻塞在taskQueue上,这里用处不是很大,可以先不研究它。
  • maxPendingTasks:后面紧接着就用到了,任务队列的最大长度。
  • executor:真正生成线程的类,默认是ThreadPerTaskExecutor
  • taskQueue任务队列,还记得上边已经有一个tailTask队列了吗,这里是另外一个队列
  • rejectedExecutionHandler:拒绝策略。
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

继续跟到父类AbstractScheduledEventExecutor的构造方法,这里什么也没干,又继续调用父类的构造方法,但是我们注意到该类中也有一个队列scheduledTaskQueue,顾名思义是定时任务队列,只是该队列没有在构造方法中初始化,等到有定时任务加入时才初始化。

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue;

    protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
        super(parent);
    }
}

继续跟到父类AbstractEventExecutor的构造方法,这里为parent赋值,即该EventLoop所属的EventLoopGroup

protected AbstractEventExecutor(EventExecutorGroup parent) {
    this.parent = parent;
}

3 netty的一些优化

3.1 对selector的优化

我们回到NioEventLoop类中的openSelector方法,这个方法在干什么呢,它在通过反射替换sun.nio.ch.SelectorImpl类的两个属性selectedKeyspublicSelectedKeys,将这两个属性都替换成了SelectedSelectionKeySet类的实例。为什么要这么换呢,咱们接着往下看。

 private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (ClassNotFoundException e) {
                } catch (SecurityException e) {
                }
            }
        });

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    selectedKeysField.setAccessible(true);
                    publicSelectedKeysField.setAccessible(true);

                    selectedKeysField.set(selector, selectedKeySet);
                    publicSelectedKeysField.set(selector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
 
                } catch (IllegalAccessException e) {

                } catch (RuntimeException e) {
                   
                }
            }
        });
        if (maybeException instanceof Exception) {

        } else {
            //将selectedKeySet保存到selectedKeys属性中
            selectedKeys = selectedKeySet;
        }

        return selector;
    }

sun.nio.ch.SelectorImpl类中的selectedKeyspublicSelectedKeys的作用是保存有兴趣事件发生的Selectionkey,其中publicSelectedKeysselectedKeys的包装类,Util.ungrowableSet(this.selectedKeys),看方法名ungrowableSet表示包装之后这个set就不能添加元素了,但是可以删除元素。也就是说jdk内部使用selectedKeys(注意这是个protected字段)进行添加和删除操作,而暴露给用户的是publicSelectedKeys只能进行删除操作(看selectedKeys方法)。

默认实现用的是HashSet

public abstract class SelectorImpl extends AbstractSelector {
    protected Set<SelectionKey> selectedKeys = new HashSet();
    protected HashSet<SelectionKey> keys = new HashSet();
    private Set<SelectionKey> publicKeys;
    private Set<SelectionKey> publicSelectedKeys;

    protected SelectorImpl(SelectorProvider var1) {
        super(var1);
        if (Util.atBugLevel("1.4")) {
            this.publicKeys = this.keys;
            this.publicSelectedKeys = this.selectedKeys;
        } else {
            this.publicKeys = Collections.unmodifiableSet(this.keys);
            this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
        }

    }
    
    public Set<SelectionKey> selectedKeys() {
        if (!this.isOpen() && !Util.atBugLevel("1.4")) {
            throw new ClosedSelectorException();
        } else {
            return this.publicSelectedKeys;
        }
    }
}

再来看看SelectedSelectionKeySet,这里用的数据结构是数组,很显然netty这里的优化是因为数组的元素添加和遍历操作比HashSet更快。有同学对这里有两个数组的存在表示疑问,大家不用操心这个问题了,因为我也没看懂为什么会有两个数组,还要交替使用。netty在后来的版本中做了优化,只用一个数组就实现了。所以这里只要知道netty用数组进行优化就可以了。

那么又一个问题来了,为什么jdk里边不用数组或者List,而用HashSet呢,那是因为selectorselect方法每次调用都会把已经准备好的SelectionKey放入selectedKeys中,如果用户在第一次调用select方法之后没有处理相应Channel的事件的,也没有删除selectedKeys中的元素,那么再次调用select之后,相同的SelectionKey会再次加入selectedKeys中,如果selectedKeys使用数组或者List实现将起不到去重的效果。

另一方面,如果使用List或者数组,删除成本也比较高。

而netty的实现中保证不会在连续调用两次select方法之间不删除selectedKeys中的元素,而且netty直接将selectedKeys暴露出来,在删除的时候可以直接将数组中对应索引的元素设置为null。

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    private SelectionKey[] keysA;
    private int keysASize;
    private SelectionKey[] keysB;
    private int keysBSize;
    private boolean isA = true;
}

3.2 对任务队列的优化

SingleThreadEventExecutor的构造方法中taskQueue属性通过调用newTaskQueue方法产生,newTaskQueue方法在NioEventLoop中被覆盖了,我们看一下。

这里被优化成了MpscQueue,全称是MultiProducerSingleConsumerQueue,即多生产者单消费者队列,感兴趣的同学自己去看一下,既然单独做一个这样的队列出来,在当前场景下自然是比BlockingQueue性能更好了。

为什么可以做这样的优化呢,很显然,这里的队列消费者只有一个那就是EventLoop,而生产者会有多个。并且,这里有一行注释This event loop never calls takeTask(),这就说明这个MultiProducerSingleConsumerQueue并没有阻塞的take方法,而正好NioEventLoop也不需要调用阻塞的take方法,正好适合。

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return PlatformDependent.newMpscQueue(maxPendingTasks);
}

同样的,tailTask这个队列也被优化成了MpscQueue。我们看一下scheduledTaskQueue是什么队列,答案在AbstractScheduledEventExecutor#scheduledTaskQueue方法中,我们看到scheduledTaskQueue仅仅是一个普通的优先级队列,甚至都不是一个线程安全的阻塞队列。

Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
    }
    return scheduledTaskQueue;
}

为什么呢,为什么tailTaskstaskQueueMpscQueue,而scheduledTaskQueue是非线程安全的队列呢?可能是因为没有的带优先级功能的MpsQueue吧。

scheduledTaskQueue是非线程安全的队列,会不会有多线程安全问题呢,答案是不会。我们看一下AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>)方法,这里在添加定时任务时,如果是非EventLoop线程调用,则会发起一个异步调用,最终往scheduledTaskQueue添加定时任务的还是EventLoop线程。所以呢,这里又有另一个优化,那就是可以延迟初始化scheduledTaskQueue

4 总结

画重点来了,本文的重点就这两个。

  • 每个EventLoop中有3个队列,分别是tailTaskstaskQueuescheduledTaskQueue。而且tailTaskstaskQueue队列在NioEventLoop中的被优化为MultiProducerSingleConsumerQueue
  • netty对selector中的selectedKeys做了优化,从HashSet替换为SelectedSelectionKeySetSelectedSelectionKeySet是用数组实现的Set,添加元素和遍历效率更高。

关于作者

王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容