Netty源码笔记(一)Netty服务端处理流程

最近开始了解Netty源码。此系列文章在于对Netty设计思想的个人理解做一些整理,以及关于网络通信延伸的一些思考。
备注:
Netty版本:4.1.6.Final
启动一个Netty服务端,代码示例:

        ServerBootstrap bootstrap = new ServerBootstrap();
                EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(0);
        bootstrap.group(bossGroup , workerGroup );
        bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("decode", new MessageDecoder());
                pipeline.addLast("encode", new ClientMessageEncoder());
            }
        });
                
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            bootstrap.bind(port).sync();
        } catch (Exception e) {
            logger.error("Started Netty Server Failed:" + port, e);
        }

EventLoopGroup 为Netty EventLoop线程组,服务端主要用来处理Accept事件,IO事件。默认线程数为CPU内核数*2。
NioServerSocketChannel是对java ServerSocketChannel进行了一层包装。
ChannelPipeline.addLast添加执行器链。
childOption设置TCP连接属性。

接下来从NioServerSocketChannel的初始化、端口的绑定、ChannelPipeline中处理器链的添加、Accept事件监听和初始化子连接四个方面来拆解Netty服务端处理流程。
本文不会分析每一个方法调用,只截取主干函数和主要代码便于迅速理解。

ServerBootstrap可以理解为一个引导类。


image.png

1》:代码入口AbstractBootstrap.doBind(SocketAddress localAddress)

    private ChannelFuture doBind(final SocketAddress localAddress) {
        //详细逻辑见代码片段2》
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        ////详细逻辑和说明代码片段5》
        doBind0(regFuture, channel, localAddress, promise);
    }

2》:initAndRegister()完成NioServerSocketChannel初始化。


image.png
final ChannelFuture initAndRegister() {
        //实例化NioServerSocketChannel,详细逻辑见代码片段3》
        Channel channel = channelFactory.newChannel();
        //详细逻辑见代码片段4》
        init(channel);
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
}

3》:ReflectiveChannelFactory基于反射创建NioServerSocketChannel实例并进行一系列初始化

    //bootstrap.channel(NioServerSocketChannel.class);配置了ServerSocketChannel的包装类
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    public NioServerSocketChannel(ServerSocketChannel channel) {
        //感兴趣的事件类型,后续会注册到selector上
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        ch.configureBlocking(false);
    }
    //创建unsafe和DefaultChannelPipeline
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

4》:初始化NioServerSocketChannel的option, attr,并在上一步已经创建的pipeline对象中添加ServerBootstrapAcceptor处理器。关于ServerBootstrapAcceptor的实现逻辑在后文介绍,这里只需要注意在NioServerSocketChannel的pipeline中添加了一个这么一个处理器,处理的消息对象为ServerSocketChannel的Accept事件建立的子连接SocketChannel。

//初始化ServerSocketChannel的并在NioServerSocketChannel实添加一个handler
//
void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
      });
}

5》:上文我们介绍了NioServerSocketChannel的初始化。接下来我们来看doBind0关于端口的绑定。

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    //绑定端口
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    //AbstractChannel
    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
    //DefaultChannelPipeline
    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }
    //AbstractChannelHandlerContext
    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        //此处查到的处理器为HeadContext
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }
    //AbstractChannelHandlerContext
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }
    //DefulatChannelPipeline
        @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }
    //最终调用NioServerSocketChannel的doBind方法绑定端口
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

6》:上文已经介绍了NioServerSocketChannel的初始化、端口的绑定、ChannelPipeline中处理器链的添加。接下来开始最后一部分Accept事件监听和子连接SocketChannel的初始化、对应pipleline的处理器配置、注册selector。逻辑流程如下:

//NioEventLoop中循环处理SelectedKeys
    protected void run() {
        for (;;) {
            processSelectedKeys();
        }
    }
    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
            for (int i = 0;; i ++) {
                final SelectionKey k = selectedKeys[i];
                processSelectedKey(k, (AbstractNioChannel) a);
            }
    }
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
          if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
          }
    }
    //SelectionKey.OP_ACCEPT对应的unsafe实现为NioServerSocketChannel对应的NioMessageUnsafe,读取SocketChannel到          
    //readBuf集合中
    //SelectionKey.OP_READ对应的unsafe实现为NioSocketChannel对应的NioByteUnsafe,读取IO事件消息体
    //此处我们暂且分析NioMessageUnsafe
    private final List<Object> readBuf = new ArrayList<Object>();
    public void read() {
           int localRead = doReadMessages(readBuf);
           int size = readBuf.size();
           for (int i = 0; i < size; i ++) {
                readPending = false;
                //触发pipeline处理器链,主要是ServerBootstrapAcceptor处理器初始化
                pipeline.fireChannelRead(readBuf.get(i));
           }
    }
    //在doReadMessages方法中调用accept连接
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();
        if (ch != null) {
            //创建NioSocketChannel对象,初始化SelectionKey.OP_READ事件类型,创建对应的unsafe、pipeline实例,
            //类似服务端NioServerSocketChannel的创建,不再具体分析
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
        return 0;
    }

ServerBootstrapAcceptor处理器主要是对新建立的SocketChannel连接进行初始化,并在SocketChannel实例的pipeline添
加childHandler。这里的childHandler就是最开始我们在启动引导类配置的bootstrap.childHandler(new ChannelInitializer)。
这样就在每一个SocketChannel对象的pepiline中配置好了对应的处理器,用来处理SocketChannel的读写事件的每一个消息体。

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            
            child.pipeline().addLast(childHandler);
            //初始化option和attr
            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                //分发到workerGroup
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

转载请备注原文链接。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,635评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,543评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,083评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,640评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,640评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,262评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,833评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,736评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,280评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,369评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,503评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,185评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,870评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,340评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,460评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,909评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,512评论 2 359

推荐阅读更多精彩内容