自顶向下深入分析Netty(四)--EventLoop-1

netty线程模型

我们再次回顾这幅图,通过先前的讲解,现在是不是亲切很多了。图中绿色的acceptor应该是你最熟悉的部分,之前我们在ServerBootstrap中进行了详细分析。我们知道了mainReactor是一个线程池,处理Accept事件负责接受客户端的连接;subReactor也是一个线程池,处理Read(读取客户端通道上的数据)、Write(将数据写入到客户端通道上)等事件。在这一节中,我们将深入分析这两个线程池的实现,不断完善其中的细节。我们首先从类图开始。

4.1 类图

EventLoop类图
EventLoop类图

看到这幅类图,如果你的第一印象是气势恢宏,那么恭喜你,你已经成功了一半。但不难预料的是,大多数人和我的感受是一样的:这么多类,一定很累。好在这只是第一印象,我们仔细观察,便会发现其中明显的脉络,两条线索(这里使用自下而上):NioEventLoop以及NioEventLoopGroup即线程和线程池。忽略其中大量的接口,剩余这样的两条线:

NioEventLoop --> SingleThreadEventLoop --> SingleThreadEventExecutor -->
AbstractScheduledEventExecutor --> AbstractScheduledEventExecutor --> 
AbstractEventExecutor --> AbstractExecutorService

NioEventLoopGroup --> MultithreadEventLoopGroup --> 
MultithreadEventExecutorGroup --> AbstractEventExecutorGroup

下面我们正式开始分析,依旧使用自顶向下的方法,从类图顶部向下、从线程池到线程分析。

4.2 EventExecutorGroup

EventExecutorGroup在类图中处于承上启下的位置,其上是Java原生的接口和类,其下是Netty新建的接口和类,由于它处于如此重要的位置,我们详细分析其中的方法。

4.2.1 Executor

首先看其继承自Executor的方法:

    // Executes the given command at some time in the future
    void execute(Runnable command);

只有一个简单的execute()方法,但这个方法奠定了java并发的基础,提供了异步执行任务的。

4.2.2 ExecutorService

ExecutorService的关键方法如下(其中的invoke***方法并非关键,不再列出):

    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

这些方法我们能从命名中便能知道方法的作用。我们主要看submit()方法,该方法是execute()方法的扩展,相较于execute不关心执行结果,submit返回一个异步执行结果Future。这无疑是很大的进步,但这里的Future不提供回调操作,显得很鸡肋,所以Netty将Java原生的java.util.concurrent.Future扩展为io.netty.util.concurrent.Future,我们将在之后进行介绍。

4.2.3 ScheduledExecutorService

从名字可以看出,该接口提供了一系列调度方法:

    ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
    <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
                                                long period,TimeUnit unit);
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                long delay,TimeUnit unit);

schedule()方法调度任务使任务在延迟一段时间后执行。scheduleAtFixedRate延迟一段时间后以固定频率执行任务,scheduleWithFixedDelay延迟一段时间后以固定延时执行任务。是不是有点头晕?那就对了,这里有一个例子专门治头晕。专家建议程序员应该每小时工作50分钟,休息10分钟,类似这样:

    13:00 - 13:10 休息
    13:10 - 14:00 写代码
    14:00 - 14:10 休息
    14:10 - 15:00 写代码

实现这样的调度我们可以使用(假设现在时间为13:00):

    executor.scheduleAtFixedRate(new RestRunnable(), 0 , 60, TimeUnit.MINUTES);
    executor.scheduleWithFixedDelay(new RestRunnable(), 0 , 50, TimeUnit.MINUTES);

4.2.4 EventExecutorGroup

    boolean isShuttingDown();
    Future<?> shutdownGracefully();
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    Future<?> terminationFuture();
    EventExecutor next();

EventExecutorGroup扩展的方法有5个,前四个可以从命名中推断出功能。shutdownGracefully()我们已经在Bootstrap一节中使用过,优雅关闭线程池;terminationFuture()返回线程池终止时的异步结果。重点关注next()方法,该方法的功能是从线程池中选择一个线程。EventExecutorGroup还覆盖了一些方法,我们不再列出,如果你感兴趣可以去源码里面查看,需要注意的是,覆盖的方法大部分是将Java原生的java.util.concurrent.Future返回值覆盖为io.netty.util.concurrent.Future。

4.3 线程池

4.3.1 AbstractEventExecutorGroup

AbstractEventExecutorGroup实现了EventExecutorGroup接口的大部分方法,实现都长的和下面的差不多:

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

从这段代码可以看出这个线程池和程序员有一个相同点:懒。当线程池执行一个任务或命令时,步骤是这样的:(1).找一个线程。(2).交给线程执行。

4.3.2 MultithreadEventExecutorGroup

MultithreadEventExecutorGroup实现了线程的创建和线程的选择,其中的字段为:

    // 线程池,数组形式可知为固定线程池
    private final EventExecutor[] children;
    // 线程索引,用于线程选择
    private final AtomicInteger childIndex = new AtomicInteger();
    // 终止的线程个数
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    // 线程池终止时的异步结果
    private final Promise<?> terminationFuture = 
                          new DefaultPromise(GlobalEventExecutor.INSTANCE);
    // 线程选择器
    private final EventExecutorChooser chooser;

MultithreadEventExecutorGroup的构造方法很长,我们将选出其中的关键部分分析,故不列出整体代码。如果你是处女座,这里有一个链接MultithreadEventExecutorGroup
我们先看构造方法签名:

    protected MultithreadEventExecutorGroup(int nThreads, 
                                        ThreadFactory threadFactory, Object... args)

其中的nThreads表示线程池的固定线程数。
MultithreadEventExecutorGroup初始化的步骤是:
(1).设置线程工厂
(2).设置线程选择器
(3).实例化线程
(4).设置线程终止异步结果
首先我们看设备线程工厂的代码:

    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }
    
    protected ThreadFactory newDefaultThreadFactory(),() {
        return new DefaultThreadFactory(getClass());
    }

如果构造参数threadFactory为空则使用默认线程池,创建默认线程池使用newDefaultThreadFactory(),这是一个protected方法,可以在子类中覆盖实现。
接着我们看设置线程选择器的代码:

    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

如果线程数是2的幂次方使用2的幂次方选择器,否则使用通用选择器。下次如果有面试官问你怎么判断一个整数是2的幂次方,请甩给他这一行代码:

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

Netty实现了两个线程选择器,虽然代码不一致,功能都是一样的:每次选择索引为上一次所选线程索引+1的线程。如果你没看明白代码的含义,没关系,再看一遍。

    private interface EventExecutorChooser {
        EventExecutor next();
    }
    
    private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        @Override
        public EventExecutor next() {
            return children[childIndex.getAndIncrement() & children.length - 1];
        }
    }
    
    private final class GenericEventExecutorChooser implements EventExecutorChooser {
        @Override
        public EventExecutor next() {
            return children[Math.abs(childIndex.getAndIncrement() % children.length)];
        }
    }

最佳实践:线程池数量使用2的幂次方,这样线程池选择线程时使用位操作,能使性能最高。
下面我们接着分析实例化线程的步骤:

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 使用模板方法newChild实例化一个线程
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // 如果不成功,所有已经实例化的线程优雅关闭
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                // 确保已经实例化的线程终止
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

实现的过程一句话描述就是:使用newChild()依次实例化线程,如果出错,关闭所有已经实例化的线程。也许你对finally中的代码有疑问,这是因为不清楚shutdownGracefully()的含义。你需要提前明白这样的事实:shutdownGracefully()只是通知线程池该关闭,但什么时候关闭由线程池决定,所以需要使用e.isTerminated()来判断线程池是否真正关闭。
实例化线程池正常完成后,Netty使用下面的代码设置异步终止结果:

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            // 线程池中的线程每终止一个增加记录数,直到全部终止设置线程池异步终止结果为成功
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

分析完MultithreadEventExecutorGroup的构造方法,我们继续分析普通方法。它的普通方法基本与下面的isTerminated()类似:

    @Override
    public boolean isTerminated() {
        for (EventExecutor l: children) {
            if (!l.isTerminated()) {
                return false;
            }
        }
        return true;
    }

总结起来就是:线程池的状态由其中的各个线程决定。明白了这点,我们使用类比的方法可以推知其他方法的实现,故不再具体分析。

4.3.3 MultithreadEventLoopGroup

MultithreadEventLoopGroup实现了EventLoopGroup接口的方法,EventLoopGroup接口作为Netty并发的关键接口,我们看其中扩展的方法:

    // 将通道channel注册到EventLoopGroup中的一个线程上
    ChannelFuture register(Channel channel);
    // 返回的ChannelFuture为传入的ChannelPromise
    ChannelFuture register(Channel channel, ChannelPromise promise);
    // 覆盖父类接口的方法,返回EventLoop
    @Override EventLoop next();

这些方法在MultithreadEventLoopGroup的具体实现很简单。register()方法选择一个线程,该线程负责具体的register()实现。next()方法使用父类实现,即使用上一节所述的选择器选择一个线程。代码如下:

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

分析完这些代码,我们关注一下线程数的默认设置。

    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", 
                Runtime.getRuntime().availableProcessors() * 2));

默认情况,线程数最小为1,如果配置了系统参数io.netty.eventLoopThreads,设置为该系统参数值,否则设置为核心数的2倍。

4.3.4 NioEventLoopGroup

NioEventLoopGroup的主要代码实现是模板方法newChild(),用来创建线程池中的单个线程,代码如下:

    @Override
    protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) 
                   throws Exception {
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), 
            (RejectedExecutionHandler) args[2]);
    }

关于代码中的参数含义,我们放在NioEventLoop中分析。此外NioEventLoopGroup还提供了setIoRatio()和rebuildSelectors()两个方法,一个用来设置I/O任务和非I/O任务的执行时间比,一个用来重建线程中的selector来规避JDK的epoll 100% CPU Bug。其实现也是依次设置各线程的状态,故不再列出。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容