【netty学习笔记一】ServerBootstrap启动过程

这篇主要想分析下ServerBootstrap的启动过程,包括:

  1. netty怎么创建selector,什么时候注册OP_ACCEPT事件?
  2. netty怎么初始化各类handler?

首先我们看下经典的netty server启动代码:

DefaultThreadFactory bossThreadFactory = new DefaultThreadFactory("boss");
        DefaultThreadFactory workerThreadFactory = new DefaultThreadFactory("worker");
        EventLoopGroup bossGroup = new NioEventLoopGroup(1, bossThreadFactory);
        EventLoopGroup workerGroup = new NioEventLoopGroup(16, workerThreadFactory);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new TimeServerHandler());
            
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            // TODO: handle exception
        }finally{
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

可以看到ServerBootstrap的一些初始化参数,包括对应reactor线程模型的两个EventLoopGroup:bossGroup和workerGroup(这里为了区分处理连接的accept线程和处理io的worker线程)、channel类型、channel的参数、注入的handler。
启动过程主要是bind方法,最终会进入AbstractBootstrap.doBind方法

private ChannelFuture doBind(final SocketAddress localAddress) {
        //初始化和注册channel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

继续看initAndRegister方法

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //基于反射+工场创建NioServerSocketChannel类,利用之前传的NioServerSocketChannel.class参数
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            //异常处理略。。。
        }
        //注册channel到bossGroup中的某个线程
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

initAndRegister方法首先创建NioServerSocketChannel类,然后init(channel),最后注册channel到NioEventLoopGroup中,看下init方法主要做什么:

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        //以上为设置channel的属性
        //创建ChannelPipeline
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        //初始化ChannelInitializer(一次性handler),实现一些初始化操作
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
         
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

init方法首先给channel设置了一些属性,然后对channel绑定的ChannelPipeline加入了一个特殊的handler:ChannelInitializer,ChannelInitializer类的作用后面再讨论。
继续看ChannelFuture regFuture = config().group().register(channel)操作,首先拿到NioEventLoopGroup,register进入MultithreadEventLoopGroup.register方法:

public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

next方法是从EventLoopGroup中选择一个线程来注册channel,选择方式:

executors[idx.getAndIncrement() & executors.length - 1]

这里要注意一下,因为NioServerSocketChannel只有一个,所以只会注册到一个线程中,bossGroup的线程数设置多少没有意义,最终只有一个线程注册NioServerSocketChannel,即accept线程。
进入register方法(AbstractChannel.register):

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
             。。。
            //校验代码略
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    //本例是main线程,最终会进入此方法
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

重点看下register0(promise)方法,该方法所在的runnable会作为一个task放入eventLoop线程中(即boss线程,也是reactor模型中的accept线程)。

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //对channel注册0事件
                doRegister();
                neverRegistered = false;
                registered = true;
                //会触发初始化ChannelInitializer
                pipeline.invokeHandlerAddedIfNeeded();
                //设置成功,并通知相关Listener
                safeSetSuccess(promise);
                //注册完成事件
                pipeline.fireChannelRegistered();

                if (isActive()) {
                    if (firstRegistration) {
                        //第一次会进入此方法,进行激活channel,是最终注册op_accept事件的地方
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

查看注册方法doRegister:

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }

可以看到这里实现了注册事件,不过只注册了0,还不是OP_ACCEPT事件。我们知道注册事件需要selector组件,那么eventLoop().unwrappedSelector()是怎么创建的呢?回到eventLoop的初始化方法中,在其父类MultithreadEventExecutorGroup的构造方法中有对每个NioEventLoop进行初始化,而NioEventLoop的构造方法如下:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        //开启selector
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

看完doRegister(),继续看pipeline.invokeHandlerAddedIfNeeded()方法,最终会调到ChannelInitializer的initChannel方法,该方法的目的是将config中的handler和ServerBootstrapAcceptor加入ChannelPipeline。initChannel方法,就会移除ChannelPipeline中的ChannelInitializer(移除代码在ChannelInitializer类的initChannel方法的finally代码中),即只做初始化功能,一次性handler。
再到safeSetSuccess(promise),这里会对promise设置成功,并通知相关listener,比如在AbstractBootstrap#doBind中的regFuture,这里后面再说。
继续到pipeline.fireChannelActive()方法,最终会到AbstractNioChannel#doBeginRead:

protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;
        
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            //第一次会进入此逻辑,此时readInterestOp=16,即accept事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

再回到regFuture的Listener事件,有个doBind操作,最终会执行到NioServerSocketChannel#doBind:

protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

这也就让NioServerSocketChannel和端口绑定上了。
再看下开头的两个问题:

  1. netty怎么创建selector,什么时候注册OP_ACCEPT事件?
    netty在创建eventLoopGroup时,会对每个eventLoop创建并绑定selector,在bossGroup中会选择一个eventLoop注册selector并初始化监听事件0,等到bind后才监听OP_ACCEPT事件。
  2. netty怎么初始化各类handler?
    通过一次性的handler(ChannelInitializer)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352