ByteToMessageDecoder

ByteToMessageDecoder是解码器的基类, 具有最基本的能力, 将字节解码成消息, 以便在pipeline上进行传递.

1564365188195.png

关键属性

// 对入站数据进行临时缓冲, 直到它准备好处理
ByteBuf cumulation;
// 缓冲的策略
private Cumulator cumulator = MERGE_CUMULATOR;
// 是否只解码一次
private boolean singleDecode;
// 意思是解码有没有结果, true为没有
private boolean decodeWasNull;
// 这批数据是不是第一次处理
private boolean first;
private int discardAfterReads = 16;
private int numReads;

channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 如果消息是ByteBuf类型
    if (msg instanceof ByteBuf) {
        // 从对象池取出一个CodecOutputList, 用来收集解码后的消息
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            // 看累积器是不是为空来决定是不是首次处理
            first = cumulation == null;
            // 看是不是第一次处理,如果是,那么直接赋予累积器
            if (first) {
                cumulation = data;
            } else {
                // 否则累加到累积器, 见cumulator部分
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // 对消息进行解码
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            // 如果累积器的内容已经读取完毕,那么回收掉
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            // 当这批数据已经读了有16次之多后,需要整理下内存
            } else if (++ numReads >= discardAfterReads) {               
                numReads = 0;
                // 这里主要是对累积器进行整理,清理discard区域为读写空间腾地方
                discardSomeReadBytes();
            }
            
            // 将解码后的结果通知下游, 且回收out容器
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        // 如果不是ByteBuf类型,直接传递给下游
        ctx.fireChannelRead(msg);
    }
}

callDecode

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // 如果缓冲器有内容可读
        while (in.isReadable()) {
            int outSize = out.size();

            // 如果out容器有内容,那么说明解码有结果了,那么马上需要通知下游handle
            // 通知完,清理容器out
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();
                
                // 如果此时hanler被移除,那么不用继续处理,直接退出
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }

            int oldInputLength = in.readableBytes();
            // 子类来处理具体解码的工作,最终将解码后的消息放在out里面就好
            decode(ctx, in, out);

            // 如果此时hanler被移除,那么不用继续处理,直接退出
            if (ctx.isRemoved()) {
                break;
            }
            
            // 如果这时,out容器的大小没有变化,说明子类那边解码还没有结果
            if (outSize == out.size()) {
                // 如果没有读取任何数据, 那么结束循环
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    // 如果有读取,但不足与产生结果,那么需要继续读取
                    continue;
                }
            }
            
            // 如果有结果,但是根本就没有读取数据,那么不是很诡异么?
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
            }
            
            // 是否只执行一次
            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    }
}

Cumulator

MERGE_CUMULATOR

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        ByteBuf buffer;
        // 简单的情况是当前的累积器空间不足,需要扩容
        // 因为read的时候,每隔一段时间都需要对累积器的内存空间进行整理,那么整理的过程会导致
        // 读写index变更, 进而导致浅拷贝后的ByteBuf不可用.
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1) { 
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};

子缓冲区

Netty中调用ByteBuf.duplicate(),ByteBuf.slice()和ByteBuf.order(ByteOrder)三个方法, 会创建一个子缓冲区,子缓冲区共享父缓冲区的内存区域。子缓冲区没有自己的引用计数,而是 共享父缓冲区的引用计数。

当父缓冲区release的时候, 会引用计数清零, 导致该内存区域被回收, 进而影响子缓冲区, 导致读写失败. 那么需要注意的事, 子缓冲区需要显示调用retain来提示Netty有其他人在使用, 防止被错误回收. 这里带来额外的坏处是, 所有子缓冲区在使用完后, 要及时release, 防止内存泄漏.

在前面的场景中可以看到, 解码器在解码完后视情况来决定要不要做内存整理, 而整理的过程会进行数据移动, 且按照整理后的结果重置read和write索引, 这样会影响到子缓冲区的读写. 下面是个简单的例子, 自己体会. 另外一个问题是, 如果源Buffer提前release, 那么子缓冲区也会读写异常.

ByteBuf source = ByteBufAllocator.DEFAULT.buffer(20, 20);

source.writeInt(1);
source.readInt();

source.writeInt(2);

ByteBuf duplicate = source.duplicate();
System.out.println("source" + source.toString());
System.out.println("duplicate" + duplicate.toString());
source.discardReadBytes();
System.out.println("source" + source.toString());
System.out.println("duplicate" + duplicate.toString());
System.out.println("duplicate" + duplicate.readInt());
duplicate.readerIndex(0);
System.out.println("duplicate" + duplicate.toString());
System.out.println("duplicate" + duplicate.readInt());
source:PooledUnsafeDirectByteBuf(ridx: 4, widx: 16, cap: 20/20)
duplicate:UnpooledDuplicatedByteBuf(ridx: 4, widx: 16, cap: 20/20)
source:PooledUnsafeDirectByteBuf(ridx: 0, widx: 12, cap: 20/20)
duplicate:UnpooledDuplicatedByteBuf(ridx: 4, widx: 16, cap: 20/20)
duplicate:0
duplicate:UnpooledDuplicatedByteBuf(ridx: 0, widx: 16, cap: 20/20)
duplicate:2

expandCumulation

可以看到解决上面问题的方案也是简单粗暴, 直接重建一个ByteBuf, 将数据拷贝过来. 这样, 之前的子缓冲区也不会被之后可能的内存整理给影响.

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
    ByteBuf oldCumulation = cumulation;
    cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
    cumulation.writeBytes(oldCumulation);
    oldCumulation.release();
    return cumulation;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容