netty分析(二) -- 数据接收及报文处理

上篇博文我们分析了netty的启动流程。
详细见netty分析(一) -- 服务启动流程。这篇文章,我们来分析下netty的数据处理。

上篇讲到在bossGroup的NioEventLoop中的processSelectedKey函数中会调用unsafe.read()来执行NioServerSocketChannel的的accept操作。
在workerGroup中,NioEventLoop的processSelectedKey函数中会执行socket的数据读取操作,让我们来看一下。

1.读取客户端数据过程

processSelectedKey

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

NioSocketChannel的read方法

        public void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
            if (allocHandle == null) {
                this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
            }
            if (!config.isAutoRead()) {
                removeReadOp();
            }

            ByteBuf byteBuf = null;
            int messages = 0;
            boolean close = false;
            try {   
                int byteBufCapacity = allocHandle.guess();
                int totalReadAmount = 0;
                do {
                    byteBuf = allocator.ioBuffer(byteBufCapacity);
                    int writable = byteBuf.writableBytes();
                    int localReadAmount = doReadBytes(byteBuf);
                    if (localReadAmount <= 0) {
                        // not was read release the buffer
                        byteBuf.release();
                        close = localReadAmount < 0;
                        break;
                    }

                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;

                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                        // Avoid overflow.
                        totalReadAmount = Integer.MAX_VALUE;
                        break;
                    }

                    totalReadAmount += localReadAmount;
                    if (localReadAmount < writable) {
                        // Read less than what the buffer can hold,
                        // which might mean we drained the recv buffer completely.
                        break;
                    }
                } while (++ messages < maxMessagesPerRead);

                pipeline.fireChannelReadComplete();
                allocHandle.record(totalReadAmount);

                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            }
        }
    }

逻辑还是比较清晰的,核心操作是这两句"int localReadAmount = doReadBytes(byteBuf);"以及"pipeline.fireChannelReadComplete();"。
前者负责将数据从底层读入ByteBuf,后者负责将数据转交pipline处理。

DefaultChannelPipeline的fireChannelRead实现

    public ChannelPipeline fireChannelRead(Object msg) {
        head.fireChannelRead(msg);
        return this;
    }

ChannelPipeline

每个socket绑定了一个pipline,pipline内部维护了一个可迭代的ChannelHandlerContext链表,用于处理io数据,以head标识头部,tail标识尾部。
当我们初始化通道,调用pipline的addLast方法塞入一个ChannelHandler时,实际对handler各个方法是否有@Skip注解做了标记,封装成一个ChannelHandlerContext放入链表的尾部。
这样有数据触发的时候,pipline会从链表首部开始迭代,找到对应能处理相应逻辑的handler进行处理。如果当前的handler需要将数据丢给下一层handler进行处理,需要调用ChannelHandlerContext的fireXXX方法将数据传递下去。

进一步查看DefaultChannelHandlerContext的fireChannelRead方法。

    @Override
    public ChannelHandlerContext fireChannelRead(Object msg) {
        DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
        next.invoker.invokeChannelRead(next, msg);
        return this;
    }

这里的next上下文是根据handler中未携带@Skip的注解来查找最近的链表节点。进一步查看invokeChannelRead方法。

    public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        if (executor.inEventLoop()) {
            invokeChannelReadNow(ctx, msg);
        } else {
            safeExecuteInbound(new Runnable() {
                @Override
                public void run() {
                    invokeChannelReadNow(ctx, msg);
                }
            }, msg);
        }
    }

由于当前是workerGroup中的EventLoop线程,走进invokeChannelReadNow。

    public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
        try {
            ctx.handler().channelRead(ctx, msg);
        } catch (Throwable t) {
            notifyHandlerException(ctx, t);
        }
    }

走到这里,终于和用户代码联系起来了。当接收到客户端数据会调用用户设置的ChannelHandler的channelRead方法。这个很重要。下面我们分析下常见的ChannelHandler。

2 常见的ChannelHandler

由于数据处理的复杂性,Netty针对常见的应用场景给我们封装了一系列的ChannelHandler。先介绍ByteToMessageDecoder的子类。

2.1 ByteToMessageDecoder

2.1.1.LineBasedFrameDecoder

LineBasedFrameDecoder用于以换行符做解码分割符的场景。我们来查看其实现。根据上面的分析,当数据来了首先调用到channelRead方法。我们查看其实现

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());
                    }
                    cumulation.writeBytes(data);
                    data.release();
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;

                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

注意:Netty框架并不缓存数据,所以当有未处理完的半包,我们需要自己存起来。
ByteToMessageDecoder如其名,只处理ByteBuf类型的输入数据,内部有个cumulation用于缓存半包(确切的说是若干个已读的全包加一个半包).如果每次解码以后,恰好处理完读入的字节没有剩余半包,那么清空cumulation。读入的新数据会附加到cumulation的尾部,如果cumulation剩下的空间不够写入了,则会对cumulation重新分配内存,新的内存大小正好是需要的字节数。
接着对读入的数据进行分包,详细代码如下:

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while(true) {
                if (in.isReadable()) {
                    int outSize = out.size();
                    int oldInputLength = in.readableBytes();
                    this.decode(ctx, in, out);
                    if (!ctx.isRemoved()) {
                        if (outSize == out.size()) {
                            if (oldInputLength != in.readableBytes()) {
                                continue;
                            }
                        } else {
                            if (oldInputLength == in.readableBytes()) {
                                throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() did not read anything but decoded a message.");
                            }

                            if (!this.isSingleDecode()) {
                                continue;
                            }
                        }
                    }
                }

                return;
            }
        } catch (DecoderException var6) {
            throw var6;
        } catch (Throwable var7) {
            throw new DecoderException(var7);
        }
    }

接下来会调用子类的decode方法进行进一步解码,查看LineBasedFrameDecoder的实现:

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        int eol = findEndOfLine(buffer);
        int length;
        int length;
        if (!this.discarding) {
            if (eol >= 0) {
                length = eol - buffer.readerIndex();
                int delimLength = buffer.getByte(eol) == 13 ? 2 : 1;
                if (length > this.maxLength) {
                    buffer.readerIndex(eol + delimLength);
                    this.fail(ctx, length);
                    return null;
                } else {
                    ByteBuf frame;
                    if (this.stripDelimiter) {
                        frame = buffer.readBytes(length);
                        buffer.skipBytes(delimLength);
                    } else {
                        frame = buffer.readBytes(length + delimLength);
                    }

                    return frame;
                }
            } else {
                length = buffer.readableBytes();
                if (length > this.maxLength) {
                    this.discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    this.discarding = true;
                    if (this.failFast) {
                        this.fail(ctx, "over " + this.discardedBytes);
                    }
                }

                return null;
            }
        } else {
            if (eol >= 0) {
                length = this.discardedBytes + eol - buffer.readerIndex();
                length = buffer.getByte(eol) == 13 ? 2 : 1;
                buffer.readerIndex(eol + length);
                this.discardedBytes = 0;
                this.discarding = false;
                if (!this.failFast) {
                    this.fail(ctx, length);
                }
            } else {
                this.discardedBytes = buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
            }

            return null;
        }
    }

以"\r\n" 或"\n"做分隔符,如果超过了配置的最大长度还没有读到结束符,将调用fireExceptionCaught告知后续节点解码发生异常。

2.1.2.DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder支持用户以多种特定的分割符来对原始ByteBuf解码。如果是换行符"\n"和"\r\n",且不是DelimiterBasedFrameDecoder的子类,功能和LineBasedFrameDecoder一模一样,内部创建一个LineBasedFrameDecoder对象来处理。

2.1.3.FixedLengthFrameDecoder

定长解码器,没什么好说的。

2.1.4.LengthFieldBasedFrameDecoder

这个比较经常用到,先看构造函数。

public LengthFieldBasedFrameDecoder(
            ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip, boolean failFast)

第一个参数是网络传输大小端,默认大端,最后一个参数是解码异常了是否快速报告。先跳过这两个参数,着重讲下其他

  • maxFrameLength 似有协议最大的包长度

  • lengthFieldOffset 私有协议都有一个header,header中有若干个字节是代表数据总长度的,这个字段代表header报文长度字段相对于第一个字节的偏移量。

  • lengthFieldLength 用多少个字节表示Header报文的长度,即从包头开始lengthFieldOffset ~lengthFieldOffset+lengthFieldLength 的数据代表报文总长度。

  • lengthAdjustment header中的报文总长度与body长度的差值,设body+header的实际长度是L,body长度是Lbody那么应该有公式,valueof(lengthFieldOffset ~lengthFieldOffset+lengthFieldLength) - lengthAdjustment= Lbody;

2.2.MessageToMessageDecoder

功能和ByteToMessageDecoder类似,但是支持了泛型的输入数据。

2.3.IdleStateHandler

IdleStateHandler在超过指定的时间通道上未发生读/写/(读和写)事件时,将借助NioEventLoop的定时器功能触发定时任务,满足条件时发出fireUserEventTriggered事件。通常我们可以用这个特性用来控制心跳等逻辑。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,701评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,649评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,037评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,994评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,018评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,796评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,481评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,370评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,868评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,014评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,153评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,832评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,494评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,039评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,437评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,131评论 2 356

推荐阅读更多精彩内容