Netty源码分析之服务端Accept过程详解

作者: 一字马胡
转载标志 【2017-11-03】

更新日志

日期 更新内容 备注
2017-11-03 添加转载标志 持续更新

NI/O C/S通信过程

下面分别展示了NI/O模式下的客户端/服务端编程模型:

NI/O服务端
NI/O客户端

Netty是一种基于NI/O的网络框架,网络层面的操作只是对NI/O提供的API的封装,所以,它的服务端流程和客户端流程是和NI/O的一致的,对于客户端而言,它要做的事情就是连接到服务端,然后发送/接收消息。对于服务端而言,它要bind一个端口,然后等待客户端的连接,接收客户端的连接使用的是Accept操作,一个服务端需要为多个客户端提供服务,而每次accept都会生成一个新的Channel,Channel是一个服务端和客户端之间数据传输的通道,可以向其write数据,或者从中read数据。本文将分析Netty框架的Accept流程。

Netty的Accept流程

Netty是一个较为复杂的网络框架,想要理解它的设计需要首先了解NI/O的相关知识,为了对Netty框架有一个大概的了解,你可以参考Netty线程模型及EventLoop详解,该文章详解解析了Netty中重要的事件循环处理流程,包含EventLoop的初始化和启动等相关内容。下面首先展示了Netty中的EventLoop的分配模型,Netty服务端会为每一个新建立的Channel分配一个EventLoop,并且这个EventLoop将服务于这个Channel得整个生命周期不会改变,而一个EventLoop可能会被分配给多个Channel,也就是一个EventLoop可能会服务于多个Channel的读写事件,这对于想要使用ThreadLocal的场景需要认真考虑。

Netty NI/O模式下EventLoop分配模型

在文章Netty线程模型及EventLoop详解中已经分析了EventLoop的流程,现在从事件循环的起点开始看起,也就是NioEventLoop的run方法,本文关心的是Netty的Accept事件,当在Channel上发生了事件之后,会执行processSelectedKeysPlain方法,看一下这个方法:


 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

接着会执行processSelectedKey这个方法,下面是它的细节:


    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

现在我们可以看到和NI/O一样的类似OP_READ和OP_ACCEPT之类的东西了,OP_ACCEPT表示的是有Accept事件发生了,需要我们处理,但是发现好像OP_READ 事件和OP_ACCEPT事件的处理都是通过一个read方法进行的,我们先来找到这个read方法:

-------------------------------------------------
AbstractNioMessageChannel.NioMessageUnsafe.read
-------------------------------------------------
        public void read() {
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

            } 
         }
    }


本文中的所有代码都是已经处理过的,完整的代码参考源代码,本文为了控制篇幅去除了一些不影响阅读(影响逻辑)的代码,上面的read方法中有一个关键的方法doReadMessages,下面是它的实现:


--------------------------------------------
NioServerSocketChannel. doReadMessages
--------------------------------------------

    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

这个方法就是处理accept类型的事件的,为了更好的理解上面的代码,下面展示一段在NI/O中服务端的代码:


int port = 8676;
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking (false);
ServerSocket serverSocket = serverChannel.socket();
serverSocket.bind (new InetSocketAddres(port));
Selector selector = Selector.open();
serverChannel.register (selector, SelectionKey.OP_ACCEPT);
while (true) {
    int n = selector.select();
    Iterator it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
        SelectionKey key = (SelectionKey) it.next();
        if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel = server.accept();
            channel.configureBlocking (false);
            channel.register (selector, SelectionKey.OP_READ);
        }
        if (key.isReadable( )) {
            processReadEvent(key);
        }
        
        it.remove( );
    }
} 

可以看到,服务端accept一次就会产生一个新的Channel,Netty也是,每次Accept都会new一个新的NioSocketChannel,当然,这个Channel需要分配一个EventLoop给他才能开始事件循环,但是Netty服务端的Accept事件到此应该可以清楚流程了,下面分析这个新的Channel是怎么开始事件循环的。继续看AbstractNioMessageChannel.NioMessageUnsafe.read这个方法,其中有一个句话:


pipeline.fireChannelRead(readBuf.get(i));

现在来跟踪一下这个方法的调用链:


 -> ChannelPipeline.fireChannelRead
 -> DefaultChannelPipeline.fireChannelRead
 -> AbstractChannelHandlerContext.invokeChannelRead
 -> ChannelInboundHandler.channelRead
 ->ServerBootstrapAcceptor.channelRead

上面列出的是主要的调用链路,只是为了分析Accept的过程,到ServerBootstrapAcceptor.channelRead这个方法就可以看到是怎么分配EventLoop给Channel的了,下面展示了ServerBootstrapAcceptor.channelRead这个方法的细节:


        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);        【1】

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());   【2】
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {   【3】
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

  • 【1】首先将服务端的处理逻辑赋值给新建立的这个Channel得pipeline,使得这个新建立的Channel可以得到服务端提供的服务
  • 【2】属性赋值
  • 【3】将这个新建立的Channel添加到EventLoopGroup中去,这里进行EventLoop的分配

下面来仔细看一下【3】这个register方法:

========================================
MultithreadEventLoopGroup.register
========================================

    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    
========================================
SingleThreadEventLoop.register
========================================

    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    

========================================
AbstractUnsafe.register
========================================    
    
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

上面的流程展示了这个新的Channel是怎么获得一个EventLoop的,而EventLoopGroup分配EventLoop的关键在于MultithreadEventLoopGroup.register这个方法中的next方法,而这部分的分析已经在Netty线程模型及EventLoop详解中做过了,不再赘述。当一个新的连接被服务端Accept之后,会创建一个新的Channel来维持服务端与客户端之间的通信,而每个新建立的Channel都会被分配一个EventLoop来实现事件循环,本文分析了Netty服务端Accept的详细过程,至此,对于Netty的EventLoop、EventLoopGroup以及EventLoop是如何被运行起来的,以及服务端是如何Accept新的连接的这些问题应该都已经有答案了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,313评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,369评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,916评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,333评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,425评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,481评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,491评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,268评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,719评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,004评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,179评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,832评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,510评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,153评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,402评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,045评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,071评论 2 352

推荐阅读更多精彩内容