【Netty源码系列】服务端启动流程(一)创建线程池组

【相关源码都是出自4.1.55.Final-SNAPSHOT版本】

在学习源码之前,先看下官方的example是怎样做的(以下代码删减了部分不必要代码和添加部分中文注释)

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    public static void main(String[] args) throws Exception {
        // 创建线程池组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            // 创建 ServerBootstrap 对象
            ServerBootstrap b = new ServerBootstrap();
           
            b.group(bossGroup, workerGroup) // 绑定线程池组
             .channel(NioServerSocketChannel.class) // 服务端channel类型
             .option(ChannelOption.SO_BACKLOG, 100) // TCP配置
             .handler(new LoggingHandler(LogLevel.INFO)) // 服务端Handler
             .childHandler(new ChannelInitializer<SocketChannel>() { // 客户端Handler
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });
            // 绑定端口并启动
            ChannelFuture f = b.bind(PORT).sync();
            // 等待直到服务端channel关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅关闭线程池
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

简单来说,启动Netty服务端需要以下几步:

  • 创建线程池组
  • 创建并初始化ServerBootstrap对象
  • 绑定端口并启动
  • 优雅关闭线程组

接下来的服务端启动源码剖析也是按照这四步逐一展开...

创建线程池组

首先第一步需要创建线程组,一个线程池是专门用来负责接收客户端连接,另外一个线程池是专门负责处理客户端的请求,分别对应以下bossGroup和workerGroup

   EventLoopGroup bossGroup = new NioEventLoopGroup(1);
   EventLoopGroup workerGroup = new NioEventLoopGroup();

实际上两种线程池的创建过程都是一样的,无参构造器创建出来的线程数默认是当前启动服务端的服务器CPU核数 * 2,有参构造器就是按照指定数值,创建对应的线程。

new NioEventLoopGroup()位置debug进去,一直debug到MultithreadEventLoopGroup类可以发现,如果没有指定线程数,则会默认使用DEFAULT_EVENT_LOOP_THREADS参数的值创建线程。

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
}

然后继续debug进去,直到最后是MultithreadEventExecutorGroup类的相关构造器

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // 删减参数判断代码

        // 如果 executot 为空,则使用netty默认的线程工厂——ThreadPerTaskExecutor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        // 初始化线程池
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 创建线程池数组
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        // 优雅关闭线程池中所有的线程
                        children[j].shutdownGracefully();
                    }
                    // 省略异常处理逻辑
                    }
                }
            }
        }

        // 初始化线程选择器
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        // 为每个线程添加关闭监听器
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        // 所有的单例线程添加到HashSet中
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

总体来说,该方法主要执行以下几个步骤:

  1. 初始化线程工厂(如果参数executor为空则默认使用newDefaultThreadFactory线程工厂)
  2. 初始化指定数量的线程池
  3. 调用newChild方法创建线程池内的线程
  4. 初始化线程选择器
  5. 为每个线程添加关闭监听器

其中newChild是创建线程的核心方法,debug进去看一下是如何创建线程的

由于线程池是由NioEventLoopGroup创建的,调用newChild方法实际上是调用NioEventLoopGroup类重写的方法

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        // 创建nioEventLoop
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }

至于创建nioEventLoop的细节,留到之后再讨论。先简单理顺一下前面创建线程池组的思路。

  1. 确定线程池的大小(如果没有指定线程池的大小,默认使用DEFAULT_EVENT_LOOP_THREADS大小)
  2. 通过MultithreadEventExecutorGroup类初始化线程池,其中调用newChild方法创建线程池内的每一个线程对象

以上是创建线程池组的大致流程,其中我省略了很多步骤和方法,这个就留给读者一步一步的debug,因为阅读源码还是需要靠自己动手,纯看别人的博客或者视频是很难学下去的...

线程池组创建完成后,接下来就需要创建ServerBootstrap对象。欲知后事如何,请看下篇:【Netty源码系列】服务端启动流程(二)创建并初始化ServerBootstrap对象

如果觉得文章不错的话,麻烦点个赞哈,你的鼓励就是我的动力!对于文章有哪里不清楚或者有误的地方,欢迎在评论区留言~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容