这个解码器是用来处理那种包含头和内容体结构的数据。
public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends ByteBufHolder>
extends MessageToMessageDecoder<I> { ... }
它是 MessageToMessageDecoder
的子类:
名字 | 定义 |
---|---|
I |
它是 S ,C ,O 的父接口 |
S |
表示开始类型数据,即头数据 |
C |
表示内容体类型数据,也有可能表示某一项内容体数据 |
O |
表示包含头和内容体的数据,聚合后的完成数据 |
注意
C
和O
必须是ByteBufHolder
子接口,也就是说它们通过缓存区ByteBuf
存储数据。
一. 重要方法
1.1 acceptInboundMessage
方法
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
// No need to match last and full types because they are subset of first and middle types.
// 从这里来看 S, C, O 都会实现 I 接口
if (!super.acceptInboundMessage(msg)) {
return false;
}
@SuppressWarnings("unchecked")
I in = (I) msg;
if (isAggregated(in)) {
// 当前数据 I 已经是聚合后的数据,不用再聚合了,直接返回 false
return false;
}
if (isStartMessage(in)) {
// 如果是开始类型数据,将 aggregating 设置为true,表示开始聚合
aggregating = true;
// 返回 true,表示可以处理这个数据
return true;
} else if (aggregating && isContentMessage(in)) {
// 如果是内容类型数据,那么 aggregating 也必须为 true,表示之前是开始类型数据
return true;
}
return false;
}
复写 MessageToMessageDecoder
中的 acceptInboundMessage(...)
方法,判断当前接收到的数据 msg
是否能被当前解码器处理。
- 必须是
I
类型或它子类型的数据。isAggregated(in)
当前数据不能是聚合后的数据。isStartMessage(in)
是不是开始类型数据。aggregating && isContentMessage(in)
如果是内容类型数据,那么aggregating
也必须为true
,表示之前是开始类型数据
1.2 acceptInboundMessage
方法
@Override
protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
// 进入 decode(...) 方法, aggregating 必须为true,
// 在 acceptInboundMessage(...) 方法中设置。
assert aggregating;
if (isStartMessage(msg)) {
// 这里表示是开始类型数据
handlingOversizedMessage = false;
if (currentMessage != null) {
// 如果是开始类型数据,但是 currentMessage 有值了,
// 说明之前已经解析了开始类型数据,这里就要进行报错,抛出异常
currentMessage.release();
currentMessage = null;
throw new MessageAggregationException();
}
@SuppressWarnings("unchecked")
S m = (S) msg;
// Send the continue response if necessary (e.g. 'Expect: 100-continue' header)
// Check before content length. Failing an expectation may result in a different response being sent.
// 在内容长度检查之前isContentLengthInvalid(...)创建一个 `continueResponse`。
// 这个是可选项,完整解析数据之前,先发送一个响应给远端,不是必须的。
// 例如在 `http` 请求时,先响应 'Expect: 100-continue' 头信息。
Object continueResponse = newContinueResponse(m, maxContentLength, ctx.pipeline());
if (continueResponse != null) {
// 缓存写监听器以便复用。
ChannelFutureListener listener = continueResponseWriteListener;
if (listener == null) {
continueResponseWriteListener = listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 写失败的时候,抛出异常
ctx.fireExceptionCaught(future.cause());
}
}
};
}
// Make sure to call this before writing, otherwise reference counts may be invalid.
// 写入 continueResponse 数据成功之后,是否直接关闭通道
boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);
// 写 continueResponse 之后,是不是忽略内容数据
handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);
// 发送 continueResponse 数据
final ChannelFuture future = ctx.writeAndFlush(continueResponse).addListener(listener);
if (closeAfterWrite) {
future.addListener(ChannelFutureListener.CLOSE);
return;
}
// 直接返回,不用解析
if (handlingOversizedMessage) {
return;
}
} else if (isContentLengthInvalid(m, maxContentLength)) {
// 通过 isContentLengthInvalid(...) 方法从开始类型数据中获取内容长度,与允许的最大内容长度 maxContentLength 进行比较。
// 如果 `(isContentLengthInvalid(...)` 方法返回false,说明内容太大,
// 就要抛出 TooLongFrameException 异常。
invokeHandleOversizedMessage(ctx, m);
return;
}
// 如果
if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {
O aggregated;
if (m instanceof ByteBufHolder) {
aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
} else {
aggregated = beginAggregation(m, EMPTY_BUFFER);
}
finishAggregation0(aggregated);
out.add(aggregated);
return;
}
// 创建一个组合缓存区 content
CompositeByteBuf content = ctx.alloc().compositeBuffer(maxCumulationBufferComponents);
if (m instanceof ByteBufHolder) {
// 如果开始数据 m 是 ByteBufHolder 类型,
// 就将ByteBufHolder 中包含的缓存区 content() 数据添加到组合缓存区 content 中
appendPartialContent(content, ((ByteBufHolder) m).content());
}
// 开始聚合
currentMessage = beginAggregation(m, content);
} else if (isContentMessage(msg)) {
// 这边表示是内容数据
if (currentMessage == null) {
// it is possible that a TooLongFrameException was already thrown but we can still discard data
// until the begging of the next request/response.
return;
}
// 得到组合缓存区 content,然后将内容数据添加进去
CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
@SuppressWarnings("unchecked")
final C m = (C) msg;
// Handle oversized message.
if (content.readableBytes() > maxContentLength - m.content().readableBytes()) {
// By convention, full message type extends first message type.
@SuppressWarnings("unchecked")
S s = (S) currentMessage;
invokeHandleOversizedMessage(ctx, s);
return;
}
// 将内容数据添加到组合缓冲区 content
appendPartialContent(content, m.content());
// 给子类型一个合并附加信息的机会。
aggregate(currentMessage, m);
final boolean last;
if (m instanceof DecoderResultProvider) {
DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult();
if (!decoderResult.isSuccess()) {
if (currentMessage instanceof DecoderResultProvider) {
((DecoderResultProvider) currentMessage).setDecoderResult(
DecoderResult.failure(decoderResult.cause()));
}
last = true;
} else {
last = isLastContentMessage(m);
}
} else {
last = isLastContentMessage(m);
}
if (last) {
// 如果是最后一条内容消息,那么就调用 finishAggregation0(...) 方法完成聚合
finishAggregation0(currentMessage);
// 将聚合后的数据 currentMessage 发送到下一个处理器中
out.add(currentMessage);
currentMessage = null;
}
} else {
throw new MessageAggregationException();
}
}
这个方法逻辑很多,主要分为两个部分:
-
处理开始类型数据:
- 通过
newContinueResponse(...)
方法,看子类是否先发送一个响应给远端。 - 通过
isContentLengthInvalid(m, maxContentLength)
方法,检查内容体长度是否超标,内容体长度从头数据m
中获取。 - 调用
beginAggregation(m, content)
方法,开始聚合数据。
- 通过
-
处理内容项类型数据:
- 通过
appendPartialContent(content, m.content())
方法,将内容项数据添加到聚合数据中。 - 通过
isLastContentMessage(m)
方法,判断是不是最后一个内容项。 - 如果是最后一条内容消息,那么就调用
finishAggregation0(...)
方法完成聚合,并将聚合后的数据currentMessage
, 添加到out
中,这样就会发送到下一个处理器中。
- 通过
1.3 需要子类复写的方法
方法 | 定义 |
---|---|
isStartMessage(I msg) | 当且仅当给定数据msg 是开始数据类型时返回true
|
isContentMessage(I msg) | 当且仅当给定数据msg 是内容项数据类型时返回true 。 |
isLastContentMessage(C msg) | 当且仅当给定数据msg 是最后一个内容项数据时返回true。 |
isAggregated(I msg) | 当且仅当给定数据msg 是聚合数据类型时返回true 。 |
isContentLengthInvalid(S start, int maxContentLength) | 确定消息开始的内容长度是否已知,以及它是否大于maxContentLength 。 |
newContinueResponse(S start, int maxContentLength, ChannelPipeline pipeline) | 如果有需要指定开始消息返回数据,例如Http 请求中 100-continue 报头;如果不需要,那么直接返回 null 。 |
closeAfterContinueResponse(Object msg) | 决定在写入 newContinueResponse(Object, int, ChannelPipeline) 的结果之后是否应该关闭通道。 |
ignoreContentAfterContinueResponse(Object msg) | 确定是否应该忽略当前请求/响应的所有对象。 当下一次isContentMessage(Object) 返回true时,这些数据会被忽略掉。 |
beginAggregation(S start, ByteBuf content) | 使用开始类型数据S 和给定缓存区content 创建返回的聚合数据O ;给定缓存区content 将会继续接收内容体的数据。如果开始类型数据S 实现了ByteBufHolder ,它的数据将被添加到给定缓存区content 中。 |
finishAggregation(O aggregated) | 当聚合消息即将传递给管道中的下一个处理程序前调用。 |
二. 实现子类
2.1 RedisBulkStringAggregator
public final class RedisBulkStringAggregator extends MessageAggregator<RedisMessage, BulkStringHeaderRedisMessage,
BulkStringRedisContent, FullBulkStringRedisMessage> {
/**
* Creates a new instance.
*/
public RedisBulkStringAggregator() {
super(RedisConstants.REDIS_MESSAGE_MAX_LENGTH);
}
@Override
protected boolean isStartMessage(RedisMessage msg) throws Exception {
// 是开始数据类型 BulkStringHeaderRedisMessage
return msg instanceof BulkStringHeaderRedisMessage && !isAggregated(msg);
}
@Override
protected boolean isContentMessage(RedisMessage msg) throws Exception {
// 是内容项数据 BulkStringRedisContent
return msg instanceof BulkStringRedisContent;
}
@Override
protected boolean isLastContentMessage(BulkStringRedisContent msg) throws Exception {
// 最后一个内容项数据 LastBulkStringRedisContent
// LastBulkStringRedisContent 是 BulkStringRedisContent 的子类
return msg instanceof LastBulkStringRedisContent;
}
@Override
protected boolean isAggregated(RedisMessage msg) throws Exception {
// 是聚合数据类型 FullBulkStringRedisMessage
return msg instanceof FullBulkStringRedisMessage;
}
@Override
protected boolean isContentLengthInvalid(BulkStringHeaderRedisMessage start, int maxContentLength)
throws Exception {
return start.bulkStringLength() > maxContentLength;
}
@Override
protected Object newContinueResponse(BulkStringHeaderRedisMessage start, int maxContentLength,
ChannelPipeline pipeline) throws Exception {
// 直接返回 null,不用刚开始响应数据
return null;
}
@Override
protected boolean closeAfterContinueResponse(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected boolean ignoreContentAfterContinueResponse(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected FullBulkStringRedisMessage beginAggregation(BulkStringHeaderRedisMessage start, ByteBuf content)
throws Exception {
// 返回聚合数据
return new FullBulkStringRedisMessage(content);
}
}
实现起来非常简单,通过 instanceof
操作符,根据不同类型,返回开始类型,内容项类型,聚合类型数据;最后在 beginAggregation(...)
方法中,返回聚合数据。
主要看这几个数据类型:
-
RedisMessage
是BulkStringHeaderRedisMessage
,BulkStringRedisContent
,LastBulkStringRedisContent
,FullBulkStringRedisMessage
的父接口。 -
BulkStringHeaderRedisMessage
实现RedisMessage
的子类。 -
BulkStringRedisContent
是继承RedisMessage
和ByteBufHolder
的子接口。 -
LastBulkStringRedisContent
是继承BulkStringRedisContent
的子接口。 -
FullBulkStringRedisMessage
是实现LastBulkStringRedisContent
的子类。
2.2 WebSocketFrameAggregator
public class WebSocketFrameAggregator
extends MessageAggregator<WebSocketFrame, WebSocketFrame, ContinuationWebSocketFrame, WebSocketFrame> {
/**
* Creates a new instance
*
* @param maxContentLength If the size of the aggregated frame exceeds this value,
* a {@link TooLongFrameException} is thrown.
*/
public WebSocketFrameAggregator(int maxContentLength) {
super(maxContentLength);
}
@Override
protected boolean isStartMessage(WebSocketFrame msg) throws Exception {
// 开始数据类型是 TextWebSocketFrame 和 BinaryWebSocketFrame
return msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame;
}
@Override
protected boolean isContentMessage(WebSocketFrame msg) throws Exception {
// 内容项数据类型是 ContinuationWebSocketFrame
return msg instanceof ContinuationWebSocketFrame;
}
@Override
protected boolean isLastContentMessage(ContinuationWebSocketFrame msg) throws Exception {
// 最后一个内容项数据, 必须是内容项数据类型,且 isFinalFragment 返回true
return isContentMessage(msg) && msg.isFinalFragment();
}
@Override
protected boolean isAggregated(WebSocketFrame msg) throws Exception {
if (msg.isFinalFragment()) {
return !isContentMessage(msg);
}
return !isStartMessage(msg) && !isContentMessage(msg);
}
@Override
protected boolean isContentLengthInvalid(WebSocketFrame start, int maxContentLength) {
return false;
}
@Override
protected Object newContinueResponse(WebSocketFrame start, int maxContentLength, ChannelPipeline pipeline) {
// 直接返回 null,不用刚开始响应数据
return null;
}
@Override
protected boolean closeAfterContinueResponse(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected boolean ignoreContentAfterContinueResponse(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected WebSocketFrame beginAggregation(WebSocketFrame start, ByteBuf content) throws Exception {
if (start instanceof TextWebSocketFrame) {
return new TextWebSocketFrame(true, start.rsv(), content);
}
if (start instanceof BinaryWebSocketFrame) {
return new BinaryWebSocketFrame(true, start.rsv(), content);
}
// Should not reach here.
throw new Error();
}
}