Netty解码器_MessageAggregator详解

这个解码器是用来处理那种包含头和内容体结构的数据。

public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends ByteBufHolder>
        extends MessageToMessageDecoder<I> { ... }

它是 MessageToMessageDecoder 的子类:

名字 定义
I 它是 S,C,O 的父接口
S 表示开始类型数据,即头数据
C 表示内容体类型数据,也有可能表示某一项内容体数据
O 表示包含头和内容体的数据,聚合后的完成数据

注意 CO 必须是 ByteBufHolder 子接口,也就是说它们通过缓存区 ByteBuf 存储数据。

一. 重要方法

1.1 acceptInboundMessage 方法

    @Override
    public boolean acceptInboundMessage(Object msg) throws Exception {
        // No need to match last and full types because they are subset of first and middle types.
        // 从这里来看 S, C, O 都会实现 I 接口
        if (!super.acceptInboundMessage(msg)) {
            return false;
        }

        @SuppressWarnings("unchecked")
        I in = (I) msg;

        if (isAggregated(in)) {
            // 当前数据 I 已经是聚合后的数据,不用再聚合了,直接返回 false
            return false;
        }


        if (isStartMessage(in)) {
            // 如果是开始类型数据,将 aggregating 设置为true,表示开始聚合
            aggregating = true;
            // 返回 true,表示可以处理这个数据
            return true;
        } else if (aggregating && isContentMessage(in)) {
            // 如果是内容类型数据,那么 aggregating 也必须为 true,表示之前是开始类型数据
            return true;
        }

        return false;
    }

复写 MessageToMessageDecoder 中的 acceptInboundMessage(...) 方法,判断当前接收到的数据 msg 是否能被当前解码器处理。

  • 必须是 I 类型或它子类型的数据。
  • isAggregated(in) 当前数据不能是聚合后的数据。
  • isStartMessage(in) 是不是开始类型数据。
  • aggregating && isContentMessage(in) 如果是内容类型数据,那么 aggregating 也必须为 true,表示之前是开始类型数据

1.2 acceptInboundMessage 方法

    @Override
    protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
        // 进入 decode(...) 方法, aggregating 必须为true,
        // 在 acceptInboundMessage(...) 方法中设置。
        assert aggregating;

        if (isStartMessage(msg)) {
            // 这里表示是开始类型数据
            handlingOversizedMessage = false;
            if (currentMessage != null) {
                // 如果是开始类型数据,但是 currentMessage 有值了,
                // 说明之前已经解析了开始类型数据,这里就要进行报错,抛出异常
                currentMessage.release();
                currentMessage = null;
                throw new MessageAggregationException();
            }

            @SuppressWarnings("unchecked")
            S m = (S) msg;

            // Send the continue response if necessary (e.g. 'Expect: 100-continue' header)
            // Check before content length. Failing an expectation may result in a different response being sent.

            // 在内容长度检查之前isContentLengthInvalid(...)创建一个 `continueResponse`。
            // 这个是可选项,完整解析数据之前,先发送一个响应给远端,不是必须的。
            // 例如在 `http` 请求时,先响应 'Expect: 100-continue' 头信息。
            Object continueResponse = newContinueResponse(m, maxContentLength, ctx.pipeline());
            if (continueResponse != null) {
                // 缓存写监听器以便复用。
                ChannelFutureListener listener = continueResponseWriteListener;
                if (listener == null) {
                    continueResponseWriteListener = listener = new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                // 写失败的时候,抛出异常
                                ctx.fireExceptionCaught(future.cause());
                            }
                        }
                    };
                }

                // Make sure to call this before writing, otherwise reference counts may be invalid.

                // 写入 continueResponse 数据成功之后,是否直接关闭通道
                boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);
                // 写 continueResponse 之后,是不是忽略内容数据
                handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);

                // 发送 continueResponse 数据
                final ChannelFuture future = ctx.writeAndFlush(continueResponse).addListener(listener);

                if (closeAfterWrite) {
                    future.addListener(ChannelFutureListener.CLOSE);
                    return;
                }
                // 直接返回,不用解析
                if (handlingOversizedMessage) {
                    return;
                }
            } else if (isContentLengthInvalid(m, maxContentLength)) {
                // 通过 isContentLengthInvalid(...) 方法从开始类型数据中获取内容长度,与允许的最大内容长度 maxContentLength 进行比较。
                // 如果 `(isContentLengthInvalid(...)` 方法返回false,说明内容太大,
                // 就要抛出 TooLongFrameException 异常。
                invokeHandleOversizedMessage(ctx, m);
                return;
            }

            // 如果
            if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {
                O aggregated;
                if (m instanceof ByteBufHolder) {
                    aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
                } else {
                    aggregated = beginAggregation(m, EMPTY_BUFFER);
                }
                finishAggregation0(aggregated);
                out.add(aggregated);
                return;
            }

            // 创建一个组合缓存区 content
            CompositeByteBuf content = ctx.alloc().compositeBuffer(maxCumulationBufferComponents);
            if (m instanceof ByteBufHolder) {
                // 如果开始数据 m 是 ByteBufHolder 类型,
                // 就将ByteBufHolder 中包含的缓存区 content() 数据添加到组合缓存区 content 中
                appendPartialContent(content, ((ByteBufHolder) m).content());
            }
            // 开始聚合
            currentMessage = beginAggregation(m, content);
        } else if (isContentMessage(msg)) {
            // 这边表示是内容数据
            if (currentMessage == null) {
                // it is possible that a TooLongFrameException was already thrown but we can still discard data
                // until the begging of the next request/response.
                return;
            }

            // 得到组合缓存区 content,然后将内容数据添加进去
            CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();

            @SuppressWarnings("unchecked")
            final C m = (C) msg;
            // Handle oversized message.
            if (content.readableBytes() > maxContentLength - m.content().readableBytes()) {
                // By convention, full message type extends first message type.
                @SuppressWarnings("unchecked")
                S s = (S) currentMessage;
                invokeHandleOversizedMessage(ctx, s);
                return;
            }

            // 将内容数据添加到组合缓冲区 content
            appendPartialContent(content, m.content());

            // 给子类型一个合并附加信息的机会。
            aggregate(currentMessage, m);

            final boolean last;
            if (m instanceof DecoderResultProvider) {
                DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult();
                if (!decoderResult.isSuccess()) {
                    if (currentMessage instanceof DecoderResultProvider) {
                        ((DecoderResultProvider) currentMessage).setDecoderResult(
                                DecoderResult.failure(decoderResult.cause()));
                    }
                    last = true;
                } else {
                    last = isLastContentMessage(m);
                }
            } else {
                last = isLastContentMessage(m);
            }


            if (last) {
                // 如果是最后一条内容消息,那么就调用 finishAggregation0(...) 方法完成聚合
                finishAggregation0(currentMessage);

                // 将聚合后的数据 currentMessage 发送到下一个处理器中
                out.add(currentMessage);
                currentMessage = null;
            }
        } else {
            throw new MessageAggregationException();
        }
    }

这个方法逻辑很多,主要分为两个部分:

  1. 处理开始类型数据:

    • 通过 newContinueResponse(...) 方法,看子类是否先发送一个响应给远端。
    • 通过 isContentLengthInvalid(m, maxContentLength) 方法,检查内容体长度是否超标,内容体长度从头数据 m 中获取。
    • 调用 beginAggregation(m, content) 方法,开始聚合数据。
  2. 处理内容项类型数据:

    • 通过 appendPartialContent(content, m.content()) 方法,将内容项数据添加到聚合数据中。
    • 通过 isLastContentMessage(m) 方法,判断是不是最后一个内容项。
    • 如果是最后一条内容消息,那么就调用 finishAggregation0(...) 方法完成聚合,并将聚合后的数据 currentMessage, 添加到out 中,这样就会发送到下一个处理器中。

1.3 需要子类复写的方法

方法 定义
isStartMessage(I msg) 当且仅当给定数据msg开始数据类型时返回true
isContentMessage(I msg) 当且仅当给定数据msg内容项数据类型时返回true
isLastContentMessage(C msg) 当且仅当给定数据msg最后一个内容项数据时返回true。
isAggregated(I msg) 当且仅当给定数据msg聚合数据类型时返回true
isContentLengthInvalid(S start, int maxContentLength) 确定消息开始的内容长度是否已知,以及它是否大于maxContentLength
newContinueResponse(S start, int maxContentLength, ChannelPipeline pipeline) 如果有需要指定开始消息返回数据,例如Http请求中 100-continue报头;如果不需要,那么直接返回 null
closeAfterContinueResponse(Object msg) 决定在写入 newContinueResponse(Object, int, ChannelPipeline)的结果之后是否应该关闭通道。
ignoreContentAfterContinueResponse(Object msg) 确定是否应该忽略当前请求/响应的所有对象。 当下一次isContentMessage(Object)返回true时,这些数据会被忽略掉。
beginAggregation(S start, ByteBuf content) 使用开始类型数据S和给定缓存区content创建返回的聚合数据O;给定缓存区content将会继续接收内容体的数据。如果开始类型数据S实现了ByteBufHolder,它的数据将被添加到给定缓存区content中。
finishAggregation(O aggregated) 当聚合消息即将传递给管道中的下一个处理程序前调用。

二. 实现子类

2.1 RedisBulkStringAggregator

public final class RedisBulkStringAggregator extends MessageAggregator<RedisMessage, BulkStringHeaderRedisMessage,
                                                                 BulkStringRedisContent, FullBulkStringRedisMessage> {

    /**
     * Creates a new instance.
     */
    public RedisBulkStringAggregator() {
        super(RedisConstants.REDIS_MESSAGE_MAX_LENGTH);
    }

    @Override
    protected boolean isStartMessage(RedisMessage msg) throws Exception {
        // 是开始数据类型 BulkStringHeaderRedisMessage
        return msg instanceof BulkStringHeaderRedisMessage && !isAggregated(msg);
    }

    @Override
    protected boolean isContentMessage(RedisMessage msg) throws Exception {
        // 是内容项数据 BulkStringRedisContent
        return msg instanceof BulkStringRedisContent;
    }

    @Override
    protected boolean isLastContentMessage(BulkStringRedisContent msg) throws Exception {
        // 最后一个内容项数据 LastBulkStringRedisContent
        // LastBulkStringRedisContent 是 BulkStringRedisContent 的子类
        return msg instanceof LastBulkStringRedisContent;
    }

    @Override
    protected boolean isAggregated(RedisMessage msg) throws Exception {
        // 是聚合数据类型 FullBulkStringRedisMessage
        return msg instanceof FullBulkStringRedisMessage;
    }

    @Override
    protected boolean isContentLengthInvalid(BulkStringHeaderRedisMessage start, int maxContentLength)
            throws Exception {
        return start.bulkStringLength() > maxContentLength;
    }

    @Override
    protected Object newContinueResponse(BulkStringHeaderRedisMessage start, int maxContentLength,
                                         ChannelPipeline pipeline) throws Exception {
        // 直接返回 null,不用刚开始响应数据
        return null;
    }

    @Override
    protected boolean closeAfterContinueResponse(Object msg) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected boolean ignoreContentAfterContinueResponse(Object msg) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected FullBulkStringRedisMessage beginAggregation(BulkStringHeaderRedisMessage start, ByteBuf content)
            throws Exception {
        // 返回聚合数据
        return new FullBulkStringRedisMessage(content);
    }
}

实现起来非常简单,通过 instanceof 操作符,根据不同类型,返回开始类型,内容项类型,聚合类型数据;最后在 beginAggregation(...) 方法中,返回聚合数据。
主要看这几个数据类型:

  • RedisMessageBulkStringHeaderRedisMessage ,BulkStringRedisContent,LastBulkStringRedisContent,FullBulkStringRedisMessage 的父接口。
  • BulkStringHeaderRedisMessage 实现 RedisMessage 的子类。
  • BulkStringRedisContent 是继承 RedisMessageByteBufHolder 的子接口。
  • LastBulkStringRedisContent 是继承 BulkStringRedisContent 的子接口。
  • FullBulkStringRedisMessage 是实现 LastBulkStringRedisContent 的子类。

2.2 WebSocketFrameAggregator

public class WebSocketFrameAggregator
        extends MessageAggregator<WebSocketFrame, WebSocketFrame, ContinuationWebSocketFrame, WebSocketFrame> {

    /**
     * Creates a new instance
     *
     * @param maxContentLength If the size of the aggregated frame exceeds this value,
     *                         a {@link TooLongFrameException} is thrown.
     */
    public WebSocketFrameAggregator(int maxContentLength) {
        super(maxContentLength);
    }

    @Override
    protected boolean isStartMessage(WebSocketFrame msg) throws Exception {
        // 开始数据类型是 TextWebSocketFrame 和 BinaryWebSocketFrame
        return msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame;
    }

    @Override
    protected boolean isContentMessage(WebSocketFrame msg) throws Exception {
        // 内容项数据类型是 ContinuationWebSocketFrame
        return msg instanceof ContinuationWebSocketFrame;
    }

    @Override
    protected boolean isLastContentMessage(ContinuationWebSocketFrame msg) throws Exception {
        // 最后一个内容项数据, 必须是内容项数据类型,且 isFinalFragment 返回true
        return isContentMessage(msg) && msg.isFinalFragment();
    }

    @Override
    protected boolean isAggregated(WebSocketFrame msg) throws Exception {
        if (msg.isFinalFragment()) {
            return !isContentMessage(msg);
        }

        return !isStartMessage(msg) && !isContentMessage(msg);
    }

    @Override
    protected boolean isContentLengthInvalid(WebSocketFrame start, int maxContentLength) {
        return false;
    }

    @Override
    protected Object newContinueResponse(WebSocketFrame start, int maxContentLength, ChannelPipeline pipeline) {
        // 直接返回 null,不用刚开始响应数据
        return null;
    }

    @Override
    protected boolean closeAfterContinueResponse(Object msg) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected boolean ignoreContentAfterContinueResponse(Object msg) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected WebSocketFrame beginAggregation(WebSocketFrame start, ByteBuf content) throws Exception {
        if (start instanceof TextWebSocketFrame) {
            return new TextWebSocketFrame(true, start.rsv(), content);
        }

        if (start instanceof BinaryWebSocketFrame) {
            return new BinaryWebSocketFrame(true, start.rsv(), content);
        }

        // Should not reach here.
        throw new Error();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容