Netty线程模型及EventLoop详解

作者: 一字马胡
转载标志 【2017-11-03】

更新日志

日期 更新内容 备注
2017-11-03 添加转载标志 持续更新

线程模型与并发

什么是线程模型呢?线程模型指定了线程管理的模型。在进行并发编程的过程中,我们需要小心的处理多个线程之间的同步关系,而一个好的线程模型可以大大减少管理多个线程的成本。在阅读本文之前,你可以选择性的阅读下面列出的文章,来快速了解和回顾java中的并发编程内容:

Reactor线程模型

Reactor是一种经典的线程模型,Reactor线程模型分为单线程模型、多线程模型以及主从多线程模型。下面分别分析一下各个Reactor线程模型的优缺点。首先是Reactor单线程模型,下面的图片展示了这个线程模型的结构:

Reactor单线程模型

Reactor单线程模型仅使用一个线程来处理所有的事情,包括客户端的连接和到服务器的连接,以及所有连接产生的读写事件,这种线程模型需要使用异步非阻塞I/O,使得每一个操作都不会发生阻塞,Handler为具体的处理事件的处理器,而Acceptor为连接的接收者,作为服务端接收来自客户端的链接请求。这样的线程模型理论上可以仅仅使用一个线程就完成所有的事件处理,显得线程的利用率非常高,而且因为只有一个线程在工作,所有不会产生在多线程环境下会发生的各种多线程之间的并发问题,架构简单明了,线程模型的简单性决定了线程管理工作的简单性。但是这样的线程模型存在很多不足,比如:

  • 仅利用一个线程来处理事件,对于目前普遍多核心的机器来说太过浪费资源
  • 一个线程同时处理N个连接,管理起来较为复杂,而且性能也无法得到保证,这是以线程管理的简洁换取来的事件管理的复杂性,而且是在性能无 法得到保证的前提下换取的,在大流量的应用场景下根本没有实用性
  • 根据第二条,当处理的这个线程负载过重之后,处理速度会变慢,会有大量的事件堆积,甚至超时,而超时的情况下,客户端往往会重新发送请求,这样的情况下,这个单线程的模型就会成为整个系统的瓶颈
  • 单线程模型的一个致命缺钱就是可靠性问题,因为仅有一个线程在工作,如果这个线程出错了无法正常执行任务了,那么整个系统就会停止响应,也就是系统会因为这个单线程模型而变得不可用,这在绝大部分场景(所有)下是不允许出现的

介于上面的种种缺陷,Reactor演变出了第二种模型,也就是Reactor多线程模型,下面展示了这种模型:

Reactor多线程模型

可以发现,多线程模型下,接收链接和处理请求作为两部分分离了,而Acceptor使用单独的线程来接收请求,做好准备后就交给事件处理的handler来处理,而handler使用了一个线程池来实现,这个线程池可以使用Executor框架实现的线程池来实现,所以,一个连接会交给一个handler线程来复杂其上面的所有事件,需要注意,一个连接只会由一个线程来处理,而多个连接可能会由一个handler线程来处理,关键在于一个连接上的所有事件都只会由一个线程来处理,这样的好处就是消除了不必要的并发同步的麻烦。Reactor多线程模型似乎已经可以很好的工作在我们的项目中了,但是还有一个问题没有解决,那就是,多线程模型下任然只有一个线程来处理客户端的连接请求,那如果这个线程挂了,那整个系统任然会变为不可用,而且,因为仅仅由一个线程来负责客户端的连接请求,如果连接之后要做一些验证之类复杂耗时操作再提交给handler线程来处理的话,就会出现性能问题。

Reactor多线程模型对Reactor单线程模型做了一些改进,但是在某些场景下任然有所缺陷,所以就有了第三种Reactor模型,Reactor主从多线程模型,下面展示了这种模型的架构:

Reactor主从多线程模型

Reactor多线程模型解决了Reactor单线程模型和Reactor多线程模型中存在的问题,解决了handler的性能问题,以及Acceptor的安全以及性能问题,Netty就使用了这种线程模型来处理事件。

Netty线程模型

在了解了线程模型以及Reactor线程模型之后,我们来看一下Netty的线程模型是怎么样的。首先,Netty使用EventLoop来处理连接上的读写事件,而一个连接上的所有请求都保证在一个EventLoop中被处理,一个EventLoop中只有一个Thread,所以也就实现了一个连接上的所有事件只会在一个线程中被执行。一个EventLoopGroup包含多个EventLoop,可以把一个EventLoop当做是Reactor线程模型中的一个线程,而一个EventLoopGroup类似于一个ExecutorService,当然,这只是为了更好的理解Netty的线程模型,它们之间是没有等价关系的,后面的分析中会详细讲到。下面的图片展示了Netty的线程模型:

Netty线程模型

首先看一下Netty服务端启动的代码:


        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(your_handler_name, your_handler_instance);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

Netty的服务端使用了两个EventLoopGroup,而第一个EventLoopGroup通常只有一个EventLoop,通常叫做bossGroup,负责客户端的连接请求,然后打开Channel,交给后面的EventLoopGroup中的一个EventLoop来负责这个Channel上的所有读写事件,一个Channel只会被一个EventLoop处理,而一个EventLoop可能会被分配给多个Channel来负责上面的事件,当然,Netty不仅支持NI/O,还支持OI/O,所以两者的EventLoop分配方式有所区别,下面分别展示了NI/O和OI/O的分配方式:

Netty NIO分配EventLoop模型
Netty OIO分配EventLoop模型

在NI/O非阻塞模式下,Netty将负责为每个Channel分配一个EventLoop,一旦一个EventLoop呗分配给了一个Channel,那么在它的整个生命周期中都使用这个EventLoop,但是多个Channel将可能共享一个EventLoop,所以和Thread相关的ThreadLocal的使用就要特别注意,因为有多个Channel在使用该Thread来处理读写时间。在阻塞IO模式下,考虑到一个Channel将会阻塞,所以不太可能将一个EventLoop共用于多个Channel之间,所以,每一个Channel都将被分配一个EventLoop,并且反过来也成立,也就是一个EventLoop将只会被绑定到一个Channel上来处理这个Channel上的读写事件。无论是非阻塞模式还是阻塞模式,一个Channel都将会保证一个Channel上的所有读写事件都只会在一个EventLoop上被处理。

Netty EventLoop

上文中分析了Reactor线程模型以及Netty的线程模型,在Netty中,EventLoop是一个极为重要的组件,它翻译过来称为事件循环,一个EventLoop将被分配给一个Channel,来负责这个Channel的整个生命周期之内的所有事件,下面来分析一下EventLoop的结构和实现细节。首先展示了EventLoop的类图:

EventLoop类图

从EventLoop的类图中可以发现,其实EventLoop继承了Java的ScheduledExecutorService,也就是调度线程池,所以,EventLoop应当有ScheduledExecutorService提供的所有功能。那为什么需要继承ScheduledExecutorService呢,也就是为什么需要延时调度功能,那是因为,在Netty中,有可能用户线程和Netty的I/O线程同时操作网络资源,而为了减少并发锁竞争,Netty将用户线程的任务包装成Netty的task,然后向Netty的I/O任务一样去执行它们。有些时候我们需要延时执行任务,或者周期性执行任务,那么就需要调度功能。这是Netty在设计上的考虑,为我们极大的简化的编程方法。

EventLoop是一个接口,它在继承了ScheduledExecutorService等多个类的同时,仅仅提供了一个方法parent,这个方法返回它属于哪个EventLoopGroup。本文只分析非阻塞模式,而阻塞模式留到未来某个合适的时候再做分析总结。在上文中展示的服务端启动的代码中我们发现我们使用的EventLoop是一个子类NioEventLoopGroup,下面就来分析一下NioEventLoopGroup这个类。首先展示一下NioEventLoopGroup的类图:

NioEventLoopGroup类图

可以发现,NioEventLoopGroup的实现非常的复杂,但是只要我们清楚了Netty的线程模型,我们就可以有入口去分析它的代码。首先,我们知道每个EventLoop只要一个Thread来处理事件,那我们就来找到那个Thread在什么地方。可以在SingleThreadEventExecutor类中找到thread,它的初始化在doStartThread这个方法中,而这个方法被startThread方法调用,而startThread 这个方法被execute方法调用,也就是提交任务的入口,这个方法是Executor接口的唯一方法。也就是说,所有我们通过EventLoop的execute方法提交的任务都将被这个Thread线程来执行。我们还知道一个事实,EventLoop是一个循环执行来消耗Channel事件的类,那么它必然会有一个类似循环的方法来作为任务,来提交给这个Thread来执行,而这可以在doStartThread方法中被发现,因为这个方法非常重要,所以下面展示了它的实现细节,但是去掉了一些代码来减少代码量:


    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

上面所提到的事件循环就是通过SingleThreadEventExecutor.this.run()这句话来触发的。这个run方法的具体实现在NioEventLoop中,下面展示了它的实现代码:


    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

首先,我们来分析一下NioEventLoop的相关细节,在一个无限循环里面,只有在遇到shutdown的情况下才会停止循环。然后在循环里会询问是否有事件,如果没有,则继续循环,如果有事件,那么就开始处理时间。上文中我们提到,在事件循环中我们不仅要处理IO事件,还要处理非I/O事件。Netty中可以设置用于I/O操作和非I/O操作的时间占比,默认各位50%,也就是说,如果某次I/O操作的时间花了100ms,那么这次循环中非I/O得任务也可以花费100ms。Netty中的I/O时间处理通过processSelectedKeys方法来进行,而非I/O操作通过runAllTasks反复来进行,首先来看runAllTasks方法,虽然设定了一个可以运行的时间参数,但是实际上Netty并不保证能精确的确保非I/O任务只运行设定的毫秒,下面来看下runAllTasks的代码:


    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

    // 将任务运行起来
    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }

可以看到,这个方法是在每运行了64个任务之后再进行比较的,如果超出了设定的运行时间则退出,否则再运行64个任务再比较。所以,Netty强烈要求不要在I/O线程中运行阻塞任务,因为阻塞任务将会阻塞住Netty的事件循环,从而造成事件堆积的现象。现在回头看处理I/O任务的processSelectedKeys方法,跟踪代码之后发现最后实际处理I/O事件的一个方法为processSelectedKey,下面展示了它的代码:


    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }


这个方法运行的流程为:

  1. 从Channel上获取一个unsafe对象,这个对象 是用来进行NIO操作的一系列系统级API,关于Netty的Channel的深层次分析将在另外的篇章中进行
  2. 从Channel上获取了eventLoop,而这个eventLoop是什么时候分配给Channel的细节在后文中进行分析
  3. 根据事件调用底层API来处理事件

下面,我们分析一下是什么时候将一个EventLoop分配给一个Channel的,并且这个EventLoop的那个唯一的Thread是什么时候被赋值的。在这个问题上,服务端的流程和客户端的流程可能不太一样,对于服务端来说,首先需要bind一个端口,然后在进行Accept进来的连接,而客户端需要进行connect到服务端。先来分析一下服务端。

还是看上面提供的服务端的示例代码,其中启动的代码为下面这句代码:


 // Start the server.
 ChannelFuture f = b.bind(PORT).sync();

也就是我们网络编程中的bind操作,这个操作会发生什么呢?追踪代码如下:

 
 -> AbstractBootstrap.bind(port)
 -> AbstractBootstrap.bind(address) 
 -> AbstractBootstrap.doBind(final SocketAddress localAddress) 
 -> AbstractBootstrap.initAndRegister 
 -> AbstractBootstrap.doBind0 
 -> SingleThreadEventExecutor.execute 
 -> SingleThreadEventExecutor.startThread()
 -> SingleThreadEventExecutor.doStartThread

EventLoop在AbstractBootstrap.initAndRegister中获得了一个新的Channel,然后在AbstractBootstrap.doBind0 方法里面调用接下来的方法来初始化EventLoop的Thread的工作,并且将EventLoop的时间循环打开了,可以开始接收客户端的连接请求了。下面来分析一下客户端的流程。

一个客户端的启动代码示例:


        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }

其中启动的关键代码为:


 // Start the client.
 ChannelFuture f = b.connect(HOST, PORT).sync();

下面是connect的调用流程:

 -> Bootstrap.doResolveAndConnect
 -> AbstractBootstrap.initAndRegister 
 -> Bootstrap.doResolveAndConnect0
 -> Bootstrap.doConnect
 -> SingleThreadEventExecutor.execute 
 -> SingleThreadEventExecutor.startThread()
 -> SingleThreadEventExecutor.doStartThread

后半部分和服务端的启动过程是一致的,而区别在于服务端是通过bind操作来启动的,而客户端是通过connect操作来启动的。执行到此,客户端和服务端的EventLoop都已经启动起来,服务端可以接受客户端的连接并且处理Channel上的读写事件,而客户端可以去连接远程服务端来请求数据。

EventLoopGroup

到目前为止,我们已经知道了Reactor多个线程模型,并且知道了一个EventLoop会负责一个Channel的生命周期内的所有事件,并且知道了服务端和客户端是如何启动这个EventLoop得,但是还有一个问题没有解决,那就是一个EventLoop是如何被分配给一个Channel的。下文就来分析这个分配的原理和过程。而对于阻塞I/O模型的分配和非阻塞I/O模型的分配是不一样的,在上文中也提到这个内容,所以本文只分析对于非阻塞I/O模型的分配。

EventLoopGroup是用来管理EventLoop的对象,一个EventLoopGroup里面有多个EventLoop,下面展示了EventLoopGroup的类图:

EventLoopGroup类图

我们从实际的代码出发来分析EventLoopGroup。上文中已经展示了客户端和服务端的启动代码,其中有类似的代码如下:


        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

上文中我们分析了EventLoop被启动的过程,我们肯定,EventLoop是在分配之后启动的,因为对于服务端而言,bind是一个最开始的网络操作,对于客户端来说,connect也是最开始的网络操作,在这之前是没有关于网络I/O的操作的,所以,EventLoop的分配和启动是在这两个过程或者之后的流程中进行的,但是EventLoop的分配肯定是在启动之前的,但是EventLoop的分配和启动在bind和connect中进行,那么我们可以肯定,EventLoop的分配也是在这两个方法中进行的。为了证明这个假设,回头再看一下服务端的EventLoop的启动过程,其中有一个方法值得我们注意:AbstractBootstrap.initAndRegister,我们进行了init部分的分析,而register部分我们还没有分析,下面就对服务端来进行register部分的分析,下面展示了register的调用链路:


 -> Bootstrap.doResolveAndConnect
 -> AbstractBootstrap.initAndRegister 
 -> EventLoopGroup.register
 -> MultithreadEventLoopGroup.register
 -> SingleThreadEventLoop.register
 -> Channel.register
 -> AbstractUnsafe.register
 
         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

最后展示了AbstractUnsafe.register这个方法,在这里初始化了一个EventLoop,需要记住的一点是,EventLoopGroup中的是EventLoop,不然在追踪代码的时候会迷失。现在来正式看一下NioEventLoopGroup这个类,它的它继承了MultithreadEventExecutorGroup这个类,而我们在初始化EventLoopGroup的时候传递进去的参数,也就是我们希望这个EventLoopGroup拥有的EventLoop数量,会在MultithreadEventExecutorGroup这个类中初始化,并且是在构造函数中初始化的,如果在new EventLoopGroup的时候没有任何参数,那么默认的EventLoop的数量是机器CPU数量的两倍。现在我们来看一下MultithreadEventExecutorGroup这个类的一个重要的构造函数,这个构造函数初始化了EventLoopGroup的EventLoop。


 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

一个较为重要的方法为newChild,这是初始化一个EventLoop的方法,下面是它的具体实现,假设我们使用NioEventLoop:


    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

我们现在知道了EventLoopGroup管理着很多的EventLoop,上文中我们仅仅分析了分配的流程,但是分配的策略还没有分析,现在来分析一下EventLoopGroup是如何分配EventLoop给Channel的,我们仅分析非阻塞I/O下的分配策略,阻塞模式下的分配策略可以参考非阻塞下的分配策略。

在MultithreadEventLoopGroup.register方法中,调用了next()方法,我们来看一下这个流程:


  -> MultithreadEventExecutorGroup.next()
  
    public EventExecutor next() {
        return chooser.next();
    }

chooser是什么东西?


 private final EventExecutorChooserFactory.EventExecutorChooser chooser;

它是怎么初始化的呢?


    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

这是它初始化最后调用的方法,这个方法在DefaultEventExecutorChooserFactory中被实现,这个参数是MultithreadEventExecutorGroup类中的children,也就是EventLoopGroup中的所有EventLoop,那这个newChooser得分配方法就是如果EventLoop的数量是2的n次方,那么就使用PowerOfTwoEventExecutorChooser来分配,否则使用GenericEventExecutorChooser来分配。这两个策略类的分配方法实现分别如下:

     
     1、PowerOfTwoEventExecutorChooser
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }

    2、GenericEventExecutorChooser
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }

所以,到此为止,我们可以解决为什么一个EventLoop会被分配给多个Channel的疑惑。本文到此也就结束了。篇幅较长,内容涉及到Reactor的三种线程模型,然后分析了Netty的线程模型,然后分析了Netty的EventLoop,以及EventLoopGroup,以及分析了EventLoop是怎么被分配给一个Channel的,和一个EventLoop是如何启动起来来处理事件的。最后分析了EventLoopGroup分配EventLoop的策略,对于本文涉及的内容的更为深入的分析总结,将在未来的某个适宜的时刻进行。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容