分析聚合Request对象的过程
类图
从继承关系可以看出,该类继承了几个抽象类,其中
MessageAggregator、MessageToMessageDecoderd
是Netty为了解码其他协议而抽出来的公共实现类,其代码并不是为了专门解析http协议而写
先从顶层的代码往下分析
首先看抽象类 MessageToMessageDecoderd
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//这个对象作用就是一个List,是Netty自己封装的
CodecOutputList out = CodecOutputList.newInstance();
try {
//判断是否是正常的msg 子类实现该方法,这里是由MessageAggregator实现
//判断的代码是 (isContentMessage(in) || isStartMessage(in)) && !isAggregated(in)
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
//子类实现解码过程
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
int size = out.size();
//注意最后这个for循环,当size大于0的时候,就会触发执行下一个handler的方法,
// 所以这里需要看子类的decode实现,看看什么条件下list里才会被添加上对象,一旦被添加上对象,即聚合结束
// 其实size=1就结束聚合
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
out.recycle();
}
}
现在来看下MessageAggregator的decode()
方法的实现,注意currentMessage是该handler的成员变量,每一个channel对应一个handler实例,这个currentMessage会存储多次decode迭代的结果,这是理解数据流转过程的一个关键
@Override
protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
//看是否是起始信息,即头信息 子类HttpObjectAggregator实现该方法,当msg是HttpMessage(消息行消息头)对象时返回true
if (isStartMessage(msg)) {
handlingOversizedMessage = false;
//刚刚开始解析消息头就有数据了,则不正常
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
throw new MessageAggregationException();
}
@SuppressWarnings("unchecked")
S m = (S) msg;
// Send the continue response if necessary (e.g. 'Expect: 100-continue' header)
// Check before content length. Failing an expectation may result in a different response being sent.
Object continueResponse = newContinueResponse(m, maxContentLength, ctx.pipeline());
//这里还不太懂是干嘛,不过这里段逻辑也不是很重要,跳过
if (continueResponse != null) {
// Cache the write listener for reuse.
ChannelFutureListener listener = continueResponseWriteListener;
if (listener == null) {
continueResponseWriteListener = listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ctx.fireExceptionCaught(future.cause());
}
}
};
}
// Make sure to call this before writing, otherwise reference counts may be invalid.
boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);
handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);
final ChannelFuture future = ctx.writeAndFlush(continueResponse).addListener(listener);
if (closeAfterWrite) {
future.addListener(ChannelFutureListener.CLOSE);
return;
}
if (handlingOversizedMessage) {
return;
}
} else if (isContentLengthInvalid(m, maxContentLength)) {
// if content length is set, preemptively close if it's too large
invokeHandleOversizedMessage(ctx, m);
return;
}
//这里大概是判断解码上一层handler解码是否成功吧,这里还不好推断解码失败的情况是什么样
if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {
O aggregated;
if (m instanceof ByteBufHolder) {
aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
} else {
aggregated = beginAggregation(m, EMPTY_BUFFER);
}
finishAggregation(aggregated);
out.add(aggregated);
return;
}
//这里是初始化一个组合buffer
// A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
CompositeByteBuf content = ctx.alloc().compositeBuffer(maxCumulationBufferComponents);
//HttpMessage 没有实现该类
if (m instanceof ByteBufHolder) {
appendPartialContent(content, ((ByteBufHolder) m).content());
}
//获得一个AggregatedFullHttpRequest,并且把把头信息赋值进去
//第一次头信息解析结束
//给currentMessage赋值
currentMessage = beginAggregation(m, content);
} else if (isContentMessage(msg)) {//第二次ChannelRead()读取的数据走到这里,这里应该就是消息体对象,即DefaultHttpContent
if (currentMessage == null) {
// it is possible that a TooLongFrameException was already thrown but we can still discard data
// until the begging of the next request/response.
return;
}
//把上次解析中暂存的buffer取出,当第一次解析消息体的时候,该content是空的
// Merge the received chunk into the content of the current message.
CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
@SuppressWarnings("unchecked")
final C m = (C) msg;
// Handle oversized message.
if (content.readableBytes() > maxContentLength - m.content().readableBytes()) {
// By convention, full message type extends first message type.
@SuppressWarnings("unchecked")
S s = (S) currentMessage;
invokeHandleOversizedMessage(ctx, s);
return;
}
//把新读取的buffer合并到旧的content中
//content是currentMessage对象的一个属性,这里等于同时更新了currentMessage里buffer内容
// Append the content of the chunk.
appendPartialContent(content, m.content());
//truncked协议会用到该方法,参见HttpObjectAggregator的实现
// Give the subtypes a chance to merge additional information such as trailing headers.
aggregate(currentMessage, m);
//下面的逻辑是判断消息是否发送完毕,如果isLastContentMessage返回true,则表示消息体已经读取完了
//msg是LastHttpContent对象,则表示到了消息尾
final boolean last;
if (m instanceof DecoderResultProvider) {
DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult();
if (!decoderResult.isSuccess()) {
if (currentMessage instanceof DecoderResultProvider) {
((DecoderResultProvider) currentMessage).setDecoderResult(
DecoderResult.failure(decoderResult.cause()));
}
last = true;
} else {
last = isLastContentMessage(m);
}
} else {
last = isLastContentMessage(m);
}
if (last) {
//聚合结束,把content-length添加到header中,值就是消息体的长度
finishAggregation(currentMessage);
//已经是最后一个消息了,则把AggregatedFullHttpRequest对象添加到List进去
//看到这里就已经拟清楚上一个抽象类写的逻辑了,可以再回看下上一个抽象类的逻辑
// All done
out.add(currentMessage);
currentMessage = null;
}
} else {
throw new MessageAggregationException();
}
}
贴一下HttpObjectAggregator
实现的几个方法
@Override
protected FullHttpMessage beginAggregation(HttpMessage start, ByteBuf content) throws Exception {
assert !(start instanceof FullHttpMessage);
HttpUtil.setTransferEncodingChunked(start, false);
AggregatedFullHttpMessage ret;
if (start instanceof HttpRequest) {
ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);
} else if (start instanceof HttpResponse) {
ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);
} else {
throw new Error();
}
return ret;
}
@Override
protected void aggregate(FullHttpMessage aggregated, HttpContent content) throws Exception {
if (content instanceof LastHttpContent) {
// Merge trailing headers into the message.
((AggregatedFullHttpMessage) aggregated).setTrailingHeaders(((LastHttpContent) content).trailingHeaders());
}
}
@Override
protected void finishAggregation(FullHttpMessage aggregated) throws Exception {
// Set the 'Content-Length' header. If one isn't already set.
// This is important as HEAD responses will use a 'Content-Length' header which
// does not match the actual body, but the number of bytes that would be
// transmitted if a GET would have been used.
//
// See rfc2616 14.13 Content-Length
if (!HttpUtil.isContentLengthSet(aggregated)) {
aggregated.headers().set(
CONTENT_LENGTH,
String.valueOf(aggregated.content().readableBytes()));
}
}