【netty学习笔记一】ServerBootstrap启动过程

这篇主要想分析下ServerBootstrap的启动过程,包括:

  1. netty怎么创建selector,什么时候注册OP_ACCEPT事件?
  2. netty怎么初始化各类handler?

首先我们看下经典的netty server启动代码:

DefaultThreadFactory bossThreadFactory = new DefaultThreadFactory("boss");
        DefaultThreadFactory workerThreadFactory = new DefaultThreadFactory("worker");
        EventLoopGroup bossGroup = new NioEventLoopGroup(1, bossThreadFactory);
        EventLoopGroup workerGroup = new NioEventLoopGroup(16, workerThreadFactory);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new TimeServerHandler());
            
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            // TODO: handle exception
        }finally{
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

可以看到ServerBootstrap的一些初始化参数,包括对应reactor线程模型的两个EventLoopGroup:bossGroup和workerGroup(这里为了区分处理连接的accept线程和处理io的worker线程)、channel类型、channel的参数、注入的handler。
启动过程主要是bind方法,最终会进入AbstractBootstrap.doBind方法

private ChannelFuture doBind(final SocketAddress localAddress) {
        //初始化和注册channel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

继续看initAndRegister方法

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //基于反射+工场创建NioServerSocketChannel类,利用之前传的NioServerSocketChannel.class参数
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            //异常处理略。。。
        }
        //注册channel到bossGroup中的某个线程
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

initAndRegister方法首先创建NioServerSocketChannel类,然后init(channel),最后注册channel到NioEventLoopGroup中,看下init方法主要做什么:

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        //以上为设置channel的属性
        //创建ChannelPipeline
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        //初始化ChannelInitializer(一次性handler),实现一些初始化操作
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
         
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

init方法首先给channel设置了一些属性,然后对channel绑定的ChannelPipeline加入了一个特殊的handler:ChannelInitializer,ChannelInitializer类的作用后面再讨论。
继续看ChannelFuture regFuture = config().group().register(channel)操作,首先拿到NioEventLoopGroup,register进入MultithreadEventLoopGroup.register方法:

public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

next方法是从EventLoopGroup中选择一个线程来注册channel,选择方式:

executors[idx.getAndIncrement() & executors.length - 1]

这里要注意一下,因为NioServerSocketChannel只有一个,所以只会注册到一个线程中,bossGroup的线程数设置多少没有意义,最终只有一个线程注册NioServerSocketChannel,即accept线程。
进入register方法(AbstractChannel.register):

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
             。。。
            //校验代码略
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    //本例是main线程,最终会进入此方法
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

重点看下register0(promise)方法,该方法所在的runnable会作为一个task放入eventLoop线程中(即boss线程,也是reactor模型中的accept线程)。

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //对channel注册0事件
                doRegister();
                neverRegistered = false;
                registered = true;
                //会触发初始化ChannelInitializer
                pipeline.invokeHandlerAddedIfNeeded();
                //设置成功,并通知相关Listener
                safeSetSuccess(promise);
                //注册完成事件
                pipeline.fireChannelRegistered();

                if (isActive()) {
                    if (firstRegistration) {
                        //第一次会进入此方法,进行激活channel,是最终注册op_accept事件的地方
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

查看注册方法doRegister:

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }

可以看到这里实现了注册事件,不过只注册了0,还不是OP_ACCEPT事件。我们知道注册事件需要selector组件,那么eventLoop().unwrappedSelector()是怎么创建的呢?回到eventLoop的初始化方法中,在其父类MultithreadEventExecutorGroup的构造方法中有对每个NioEventLoop进行初始化,而NioEventLoop的构造方法如下:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        //开启selector
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

看完doRegister(),继续看pipeline.invokeHandlerAddedIfNeeded()方法,最终会调到ChannelInitializer的initChannel方法,该方法的目的是将config中的handler和ServerBootstrapAcceptor加入ChannelPipeline。initChannel方法,就会移除ChannelPipeline中的ChannelInitializer(移除代码在ChannelInitializer类的initChannel方法的finally代码中),即只做初始化功能,一次性handler。
再到safeSetSuccess(promise),这里会对promise设置成功,并通知相关listener,比如在AbstractBootstrap#doBind中的regFuture,这里后面再说。
继续到pipeline.fireChannelActive()方法,最终会到AbstractNioChannel#doBeginRead:

protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;
        
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            //第一次会进入此逻辑,此时readInterestOp=16,即accept事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

再回到regFuture的Listener事件,有个doBind操作,最终会执行到NioServerSocketChannel#doBind:

protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

这也就让NioServerSocketChannel和端口绑定上了。
再看下开头的两个问题:

  1. netty怎么创建selector,什么时候注册OP_ACCEPT事件?
    netty在创建eventLoopGroup时,会对每个eventLoop创建并绑定selector,在bossGroup中会选择一个eventLoop注册selector并初始化监听事件0,等到bind后才监听OP_ACCEPT事件。
  2. netty怎么初始化各类handler?
    通过一次性的handler(ChannelInitializer)
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。