Netty源码(二)线程模型EventLoop

前言

Netty源码(一)Netty架构解析分析了netty的基本原理和工作流程,其中EventLoop是netty中最核心的组件,本文将会分析EventLoop的主要设计思想,至于具体的代码细节下一篇会具体分析。本文将主要分析NIOEventLoop基于4.1.49.Final版本。

核心思想

  1. 单线程
    之前介绍过EventLoop在它的生命周期内只和一个 Thread 绑定,一个EventLoop管理一个或者多个Channel。为什么是单线程?首先多线程编程是很难很麻烦的事情,而且为了保证线程安全加锁或者CAS都会影响执行效率,某些场景下线程数量越多导致的效率还会更低。采用单线程模型第一不用考虑线程安全,第二没有线程上下文切换带来的损耗。
  2. 执行流程
    与EventLoop所绑定的Channel的事件都由EventLoop所拥有的Thread处理,实现的核心流程如下:
        //当前调用线程是否EventLoop所属的Thread
        if (executor.inEventLoop()) {
            //do something
         } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                     //do something
                }
            });

如果当前调用线程正是支撑 EventLoop 的线程,那么所提交的任务会直接执行。否则,EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。EventLoop下次处理它的事件时,它会执行队列中的那些任务。这样的任务执行流程保证,Channel上的所有任务都是由它所注册的EventLoop所拥有的Thread执行,不会出现并发问题。

构造参数

NioEventLoop构造参数:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
  • parent:当前EventLoop所属的NioEventLoopGroup;
  • executor:为EventLoop创建线程,并且执行管理该线程生命周期的任务,默认为ThreadPerTaskExecutor;
  • selectorProvider:NIO中selector的帮助类;
  • selectStrategy:对selector.select()提供选择策略;
  • queueFactory:创建存放任务队列的工厂。

从构造参数可以大概知道NioEventLoop的职责,前面提过一个NioEventLoopGroup管理一个或者多个NioEventLoop,parent指向NioEventLoop所属的NioEventLoopGroup。
NioEventLoop的Thread由executor去创建管理,这个thread只做三件事情:

  1. select: 返回EventLoop所持有的selector上已经准备就绪的Channel;
  2. processSelectedKeys: 处理准备就绪的IO操作;
  3. ranTasks: 执行队列里的任务;

由selectorProvider提供IO操作,queueFactory创建队列存放待执行的任务。

Executor

EventLoop类图

可见EventLoop继承自Executor,Executor的核心思想是将任务调用,和任务自身的执行逻辑给分离开,使用Executor去执行任务,调用方只需要关注自身任务的逻辑,把任务执行交给Executor去处理,将职责分离开。
JDK中自带的线程池采用池化的思想,基本思路是从池中取一个空闲的线程去执行任务,执行完之后将线程返回空闲列表,让其可以重复使用。因为线程是很珍贵的资源,采用池化的技术可以减少线程创建、销毁的损耗,但不能减少线程上下文切换所带来的消耗,随着线程数量的增加这种消耗会更多。所以EventLoop采用单线程的模型去实现Executor避免多线程的上下文切换,和多线程的并发编程问题。

Feature

Netty中所有操作都是异步进行的,实现异步编程很关键的对象就是Feature,因为异步操作都是立即返回的不会等待任务完成,所以需要有一种机制去表示异步执行的结果 ,这种机制就是Feature
JDK Feature

public interface Future<V> {
    //取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    //任务是否取消
    boolean isCancelled();
    //任务是否执行完成
    boolean isDone();
    //阻塞获取执行结果
    V get() throws InterruptedException, ExecutionException;
    //指定时间内获取执行结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可以看出jdk的Feature接口提供了对异步执行结果的几个操作,取消、获取结果、判断是否完成等。但是这有个缺点就是调用方其实不知道任务什么时候结束,没有一种机制去通知,只有自己不断的去检测或者阻塞等待执行完成,这都不是很好的方法。所以netty自己实现了一套Feature,相比jdk的Feature对重要的就是提供了一种通知机制。

Netty Feature

public interface Future<V> extends java.util.concurrent.Future<V> {
    //省略其他方法
    ......
    //添加监听器
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    //添加多个监听器
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

     //移除监听器
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

   //移除多个监听器
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
}

Netty的Feature与jdk自带的Feature,最大的不同就是Listener。使用是一种观察者模式,将执行结果通知出去。

GenericFutureListener

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

GenericFutureListener只有一个operationComplete方法,通过实现GenericFutureListener接口的operationComplete调用方就能知道异步执行的结果。

MpscUnboundedArrayQueue

前面提到,NioEventLoop中有一个用于存放任务的队列,如果没有指定特定的queueFactory,默认为MpscUnboundedArrayQueue
MpscUnboundedArrayQueue是JCTools所提供的的一种队列,JCTools提供了几种目前jdk没有实现的队列MpscUnboundedArrayQueue就是其中一种。从名字可知(MPSC - Multi Producer Single Consumer )多生产者单消费者,是一种使用于多生产者单消费者场景的队列。而EventLoop正符合这种多生产者单消费者的场景,所以采用MpscUnboundedArrayQueue队列来存放任务。
下面说一下MpscUnboundedArrayQueue的几个好处,具体细节就不深入研究了
1.缓存行填充
为什么要有缓存行填充?首先要知道“伪共享”的概念,伪共享和 CPU 内部的 Cache 有关,Cache 内部是按照缓存行(Cache Line)管理的,缓存行的大小通常是 64 个字节。CPU加载数据是按照一个缓存行为单位加载的。

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

如上图如果出队索引和入队索引用32位的int类型,CPU会把takeIndex、putIndex加载到一个缓存行。当一个入队操作的时候会修改putIndex,导致整个缓存行失效,需要重新加载到缓存中,但是入队操作并不会修改takeIndex,由于putIndex与takeIndex共享同一个缓存行,所以也会导致takeIndex失效,这种情况就是伪共享。MpscUnboundedArrayQueue采用让变量独占缓存行的形式解决伪共享的问题。

abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue
{
    long p01, p02, p03, p04, p05, p06, p07;
    long p10, p11, p12, p13, p14, p15, p16, p17;
}
  1. 无锁
    首先offer方法,这里只分析关键部分
while (true)
        {
          //省略代码
          ......
          //cas设置生产者索引
            if (casProducerIndex(pIndex, pIndex + 2))
            {
                break;
            }
        }
        // INDEX visible before ELEMENT
        final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        //放置元素
        soRefElement(buffer, offset, e); // release element e
        return true;
    }

offer方法采用cas的方式去放入元素,而poll由于是单线程消费,无需加锁和cas操作。

EventLoopGroup

EventLoopGroup是对EventLoop进行管理,那么EventLoopGroup是如何对EventLoop进行管理的呢?先看一个简单的execute方法

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

是调用next返回一个对象后执行的execute方法,跟进去看一下next方法

    /**
     * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
     */
    EventExecutor next();

返回一个EventExecutorGroup所管理的EventExecutor对象,对于EventLoopGroup来说next返回的就是所管理的EventLoop对象,next方法就是EventLoopGroup管理的EventLoop关键方法。
最后找一下next的实现,在MultithreadEventExecutorGroup类中

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

chooser变量

private final EventExecutorChooserFactory.EventExecutorChooser chooser;

最后再看一下EventExecutorChooserFactory的实现,只有一个实现类EventExecutorChooserFactory

/**
 * Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
 */
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

很简单的实现,按照管理executors数量的奇偶数返回不同的策略,核心逻辑就是循环依次返回executors中的executor。

总结

本文主要分析了Netty的线程模型EventLoop,最重要的要关注的点就是EventLoop是单线程模型,所有提交给EventLoop都需要判断调用线程是否为EventLoop所持有的thread,如果是直接执行,否则添加到EventLoop的任务队列里面。这一部分个人感觉是netty中设计很精妙的一点,如果要使用多线程就用EventLoopGroup去管理EventLoop,EventLoopGroup中的EventLoop又互相隔离,互不影响也不会出现线程安全的问题。
本文主要分析了EventLoop的继承关系和一些设计思想,并没有具体去分析一些代码细节,下篇文章会具体分析EventLoop的thread所负责的三件事情:

  1. select: 返回EventLoop所持有的selector上已经准备就绪的Channel;
  2. processSelectedKeys: 处理准备就绪的IO操作;
  3. ranTasks: 执行队列里的任务。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,277评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,689评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,624评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,356评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,402评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,292评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,135评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,992评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,429评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,636评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,785评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,492评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,092评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,723评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,858评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,891评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,713评论 2 354