前两章中讲到了Dubbo的服务启动以及服务引用过程,之后的篇章开始讲解一下Dubbo的其他模块,开始在在来的基础上讲一下细节的内容。本章主要讲解Dubbo的编码解码内容。
编码(encode):我们可以粗略的把这里的编码粗略地理解为Java的序列化,即把POJO对象转化为byte数据在网络里传输。
解码(decode):我们可以粗略的把这里的解码粗略地理解为Java的反序列化,即把从网络中接收到的byte数据转化为对应的POJO对象供业务方使用。
说到Dubbo的编码解码,一定要理解的是Dubbo的数据包模型。
理解上述内容的前提是对网络有一定认识,如果对基本的网络没有概念的话请先自行查询相关资料看一下TCP/IP协议。
因为Socket的底层已经帮我们处理了IP/TCP的分包,我们不需要解析IP/TCP的包结构,处理包头信息等。但是Dubbo作为一个封装性的协议,需要自己处理一下包头,包体的内容处理,粘包也需要自行进行处理。
我们首先明确一下Dubbo的包头都有哪些成分:
- 2个字节的魔数(标示Dubbo的协议)
- 1个字节的消息标志位
- 5个比特位的序列化ID
- 1个比特位的事件类型(区分心跳还是正常的请求/响应信息)
- 1个比特表示是oneWay还是twoWay请求
- 1个比特表示是request还是response
- 1个字节的状态位(标示Request和Response的状态)
- 8个字节的消息ID(requestId/responseId)
- 4个字节的数据长度
从这里看到消息头的长度固定是16个字节,然后我们根据header中的数据长度字段就能知道真是的body长度,这样就可以解析到具体的body信息了。
明白了上面的内容,我们还要再看一下Dubbo中对于数据缓存区的处理,Dubbo在使用数据缓存区的时候用到了自己的类:ChannelBuffer(不同于Netty的ChannelBuffer),在处理编解码的过程中经常用到这个数据缓存区,所以首先对于这个类做一定介绍:
| discardable bytes | readable bytes | writable bytes |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
ChannelBuffer中主要有两个游标,readerIndex和writerIndex,基本含义就是readerIndex记录上一次读取到的数据的位置,writerIndex记录上一次写数据的终点位置,初始情况下readerIndex=wirterIndex=0,读数据和写数据的时候游标都会自动移动。下面看一下基本的操作:
- void clear(); 将readerIndex和writerIndex都设置为0,但是不清除原来的数据
- void discardReadBytes(); 将readerIndex和writerIndex对应的数据都往前移动(readerIndex个单位)
- byte getByte(int index); 获得特定位置的byte值
- void getBytes(int index, byte[] dst); 将从index开始的数据copy到dst中
- boolean readable(); 判断缓存区是否有可读取的内容
- int readableBytes(); 计算缓存区中可读取的数据长度
- byte readByte(); 在当前的游标位置读一个字节的数据
- void readBytes(byte[] dst); 从当前位置读取dst.length长度的内容到dst中
其余的write*方法跟read*方法都是对应的,不再一一介绍。
因为NettyServer和NettyClient的编码解码器是同一套,所以我们就选择NettyServer为例来讲解这个问题。
编码
编码器主要就是NettyCodecAdapter.getDecoder()对象进行的,下面还是直接撸代码:
//编码的起始位置就是这个Encoder的encode方法
private class InternalEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
//dynamicBuffer表示动态的buffer空间,扩自动扩展空间大小
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
//如果存在相关通道就直接获取,否则就新建
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
//将编码后的结果存入buffer中
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
//将编码后可读的信息重新包装后再往后传递
return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
}
}
//有dubbo的默认配置直到这里的Codec就是DubboCountCodec,所以我们接着看一下DubboCountCodec.encode()
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
//还是回到了DubboCodec
codec.encode(channel, buffer, msg);
}
//首先进入到DubboCountCode的夫类ExchangeCodec中的encode方法
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
//根据msg的类型确定是编码请求,响应或者talnet信息,然后找到对应的编码方法
//在具体的实现上因为encodeResponse跟encodeRequest差不多,所以就以encodeRequest为例往下面继续讲解
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
//dubbo自己封装了一个buffer使用,并没有使用netty的buffer,为了避免耦合
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
//默认使用Hession序列化
Serialization serialization = getSerialization(channel);
// 16个字节的header.
byte[] header = new byte[HEADER_LENGTH];
// 2个字节的魔数
Bytes.short2bytes(MAGIC, header);
// 5个bit的Serialization id
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
// 8个字节的RequestId
Bytes.long2bytes(req.getId(), header, 4);
// 先对body进行编码才能确定实际的body长度
int savedWriteIndex = buffer.writerIndex();
// 控制头部的长度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
//实际上干的事情就是把body编码的数据放到buffer中,bos只是buffer的装饰者
encodeRequestData(channel, out, req.getData());
}
out.flushBuffer();
bos.flush();
bos.close();
//写到buf中实际大小
int len = bos.writtenBytes();
//默认只支持8M的最大大小
checkPayload(channel, len);
//计算实际的body长度并写到buf中
Bytes.int2bytes(len, header, 12);
//定位到原始的write节点(header的起始位置)
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header);
//更新buffer的write为body结束的位置
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
//encode方法因为不不要考虑消息的边界问题,所以比较简单
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
//序列化消息体(不包含消息头)的实际内容
RpcInvocation inv = (RpcInvocation) data;
//dubbo版本信息
out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
//接口名称
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
//服务版本
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
//接口方法名称
out.writeUTF(inv.getMethodName());
//参数类型
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
//具体的参数值
if (args != null)
for (int i = 0; i < args.length; i++){
//encodeInvocationArgument主要处理回调相关的信息
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
//附属信息
out.writeObject(inv.getAttachments());
}
编码的逻辑相对于解码来说是比较简单的,因为并不涉及处理粘包信息以及多包信息,只是纯粹的将包信息编码之后然后发出去,但是这了我们并没有提到hession的具体编码步骤,因为本身编码的过程就是一个非常深的话题,涉及到比较多的数学逻辑,也不是这次讲解的重点,所以这里只看到了dubbo在其之上有做多少内容。
解码
解码器的入口也是在NettyServer中的NettyCodecAdaptor,解码主要需要的注意地方就是:每次收到的数据包可能是不完整的,有可能收到的数据包只包含数据头,或者数据头都是不完整的,也有可能包含消息体,但是消息体是不完整的,这些都是需要考虑的事情,下面我们看一下具体的解码实现:
private class InternalDecoder extends SimpleChannelUpstreamHandler {
//内部公用的数据缓存区
private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
Object o = event.getMessage();
//编码的结果就是ChannelBuffer
if (! (o instanceof ChannelBuffer)) {
ctx.sendUpstream(event);
return;
}
ChannelBuffer input = (ChannelBuffer) o;
int readable = input.readableBytes();
//如果信息的载体,input中并没有更多的可读数据那就直接返回
if (readable <= 0) {
return;
}
com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
//buffer在初次使用的时候是空的,但是之后就有可能不为空
if (buffer.readable()) {
//每次都复用buffer,并将之前遗留的buffer与input的内容汇总起来一并合成message存起来
if (buffer instanceof DynamicChannelBuffer) {
buffer.writeBytes(input.toByteBuffer());
message = buffer;
} else {
//如果buffer不是动态的话要生成一个动态的buffer用来存放新的汇总信息
int size = buffer.readableBytes() + input.readableBytes();
message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
size > bufferSize ? size : bufferSize);
message.writeBytes(buffer, buffer.readableBytes());
message.writeBytes(input.toByteBuffer());
}
} else {
//接到第一个dubbo包的话buffer还是空的,所以在这里会把读到的消息放到buffer中
message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
input.toByteBuffer());
}
//经过上面的逻辑处理之后message就是新的汇总信息(上次未处理的+本次新收到的)
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
Object msg;
int saveReaderIndex;
try {
do {
saveReaderIndex = message.readerIndex();
try {
//DubboCountCodec.decode(代码见下)
msg = codec.decode(channel, message);
} catch (IOException e) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw e;
}
//目前的信息解析不出来一个完成的dubbo,因此会跳出循环,直到下个数据包的到来
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
//两次的readerIndex一样说明decode没有进行
if (saveReaderIndex == message.readerIndex()) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw new IOException("Decode without read data.");
}
//如果有读到解码的信息,将解码后的信息继续往下游发送
//这时候的msg就是对应的Request,Response或者心跳
if (msg != null) {
Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
}
}
} while (message.readable());
} finally {
if (message.readable()) {
//清除已经读过的空间
message.discardReadBytes();
buffer = message;
} else {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
}
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
}
//DubboCountCodec.decode
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();
MultiMessage result = MultiMessage.create();
//可能由于网络原因,一次接收到比较多的的数据包
do {
Object obj = codec.decode(channel, buffer);
//如果结果表示为需要更多的数据的话,就将原本保持的readerIndex设置回去,因为在decode的时候可能改变了readerIndex
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
buffer.readerIndex(save);
break;
} else {
result.addMessage(obj);
//将解码后的结果大小记录到result中
logMessageLength(obj, buffer.readerIndex() - save);
save = buffer.readerIndex();
}
} while (true);
//表示收到的数据不完整或者本次编码结束,等待下一个数据到来
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}
//ExchangeCodec.decode()
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
//如果readable小于HEADER_LENGTH,说明是一个不全的dubbo消息
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}
//ExchangeCodec.decode()
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 如果header的前两个字节不是魔数的话
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
//说明头部是完整的
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
//逐步检查buffer中是否有魔数开头的header,有的话就直接读取到header数组中
for (int i = 1; i < header.length - 1; i ++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
//处理父类telnet的解码
return super.decode(channel, buffer, readable, header);
}
// 数据不全(头部不完整)
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
//获取body长度,检查8M最大限制
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
// 数据不全(body不完整)
if( readable < tt ) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
//DubboCodec.decodeBody
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
//DubboCodec.decodeBody
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// 获得请求的ID
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// 如果是response类型
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// 获得状态字段
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
Object data;
//如果是心跳事件的话,直接通过解码器读取数据
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (res.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
//将原来的RPCResult转换为DecodeableRpcResult,转换后的DecodeableRpcResult里面的data为解码后的结果信息
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation)getRequestData(id), proto);
result.decode();
} else {
//如果解码操作不在IO线程中做的话可以到线程池中做,具体的操作就是DecodeHandler处理类,处理的步骤跟上面一样,只不过时机不同
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
//解码Response是客户端的内容,所以设置错误码为CLIENT_ERROR
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
} else {
//如果Response状态不正常的话就直接设置解码信息为错误信息
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
} else {
//解码请求
Request req = new Request(id);
req.setVersion("2.0.0");
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
//如果请求时心跳事件的话就直接通过解码器解码
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
//如果解码操作不在IO线程中做的话可以到线程池中做,具体的操作就是DecodeHandler处理类,处理的步骤跟上面一样,只不过时机不同
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}
//DecodeableRpcResult.decode()
public Object decode(Channel channel, InputStream input) throws IOException {
//根据serializationType获得对应的Hession序列化类
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
//这个flag的读取需要深入到Hession2的内部理解
//这里的in其实就是封装类具体的序列化框架装饰者
byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE:
break;
case DubboCodec.RESPONSE_VALUE:
try {
//返回结果:Type[]{method.getReturnType(), method.getGenericReturnType()}
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION:
try {
Object obj = in.readObject();
if (obj instanceof Throwable == false)
throw new IOException("Response data error, expect Throwable, but get " + obj);
setException((Throwable) obj);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
}
return this;
}
//DecodeableRpcInvocation.decode
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
//这里的读取顺序不能变,一定要跟编码的顺序保持一致,否则信息将错乱
//依次从缓存区读取dubbo版本信息,path和version
setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
setAttachment(Constants.PATH_KEY, in.readUTF());
setAttachment(Constants.VERSION_KEY, in.readUTF());
//读取dubbo的方法
setMethodName(in.readUTF());
try {
Object[] args;
Class<?>[] pts;
//依次构建方法参数类型即实际方法参数
String desc = in.readUTF();
if (desc.length() == 0) {
pts = DubboCodec.EMPTY_CLASS_ARRAY;
args = DubboCodec.EMPTY_OBJECT_ARRAY;
} else {
pts = ReflectUtils.desc2classArray(desc);
args = new Object[pts.length];
for (int i = 0; i < args.length; i++) {
try {
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
setParameterTypes(pts);
//读取attachments信息并且设置到DecodeableRpcInvocation
Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
if (map != null && map.size() > 0) {
Map<String, String> attachment = getAttachments();
if (attachment == null) {
attachment = new HashMap<String, String>();
}
attachment.putAll(map);
setAttachments(attachment);
}
//decode argument ,may be callback
for (int i = 0; i < args.length; i++) {
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}
setArguments(args);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
}
return this;
}
到此,dubbo的编码解码都已经涉及到了。我们从中可以感受到设计的巧妙之处,我觉得最巧妙的地方就是:不会多处理无用的内容,例如在序列化的时候只是将request对应的数据内容(方法信息,版本信息,必要的attachments等)进行编码然后传输,而不是直接request对象直接作为序列化的对象进行序列化,这么做既较少了传输的内容,还削减了序列化的难度(最起码不用考虑request自身的内容)。在解码的时候通过requestId重新构建出对应的Request信息,然后从网络中将接收到的信息一一进行反序列化之后重新塞到新构建的Request对象中。正是因为序列化和反序化都是耗时操作,所以能减轻的话就减轻其操作步骤,这里还是十分科学的,不由得感叹:工程师对于架构的把控力果然牛逼!
插播一条花边新闻:关于request和response的映射都是通过ID来做的,并在创建ID时候会在一个全局的FUTURE里面存入信息(我们可以把FUTURE理解为一个redis),在创建DefaultFuture的时候将其放入redis(请求的处理生命周期内),然后在之后的任何时间都可以随时拿到这个请求ID(在请求中带了request ID,还记得吗?),然后在客户端收到Response的时候通过ID找到Future,再把结果设置到Future中,这样就完成了同步和异步的转换,是不是很牛叉。
今天的内容就这样子了,撒花~~~啦啦啦~~~