前面讲到了编码,那netty是怎么进行解码的?
其实也是类似的,既然有MessageToByteEncoder,也会有ByteToMessageDecoder.
所以重点还是在ByteToMessageDecoder这个类。
先看看这个类的结构体。
其实结构与MessageToByteEncoder类似。唯一的区别只是,ByteToMessageDecoder实现的是ChannelInboundHandler。
所以直接看一下ByteToMessageDecoder的源码吧。
为什么重点说到这个类的channelRead方法呢?
在前面说到netty是怎么样读数据的,最后ctx都会强转成ChannelInboundHandler,随后调用其channelRead方法。
我们自定义的解码器,由于继承了ByteToMessageDecoder(ByteToMessageDecoder是ChannelInboundHandler的子类)这个类,而这个类对于channelRead方法进行了覆写。
而ChannelInboundHandlerAdapter对于channelRead的实现,只是简单的做传播。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
ByteBuf cumulation;
private boolean first;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
这其实是一个自定义的list
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
如果cumulation为空,则说明是第一次读取数据。
first = cumulation == null;
if (first) {
如果是第一次读取数据的话,cumulation就是当前传入的数据
cumulation = data;
} else {
如果不是第一次读数据的话,要对之前的数据,以及本次读取到的数据进行合并。
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
合并完数据后,进行解码。
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
数据已经读完,将cumulation置空
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
若干次之后,还还没有读完数据,就会舍弃一些数据。
清空cumulation的数据
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
最后进行传播。out是最后反序列化出来的对象集合,通过遍历的方式去传播。
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
}
看完上述代码,估计流程也大概清楚了。
无非就是以下几步。
1.查看数据是否需要合并,如果需要合并则进行合并。
2.将数据进行反序列化,添加到out这个输出的集合中。
3.如果数据已经读完,再cumulation重置,如果若干次了,数据还木有读完,就会放弃一些数据。
4.最后就是将out这个输出的集合对象进行传播。一个对象就好像一个完整的数据包,所以以数据包为单位进行遍历传播。
如何进行数据合并
重点是在于以下这句代码
cumulator.cumulate(ctx.alloc(), cumulation, data);
下面来看看这个对象是如何生成的。
public interface Cumulator {
/**
* Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
*/
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
try {
final ByteBuf buffer;
其实转化一下 cumulation.writerIndex() + in.readableBytes() > cumulation的容量,则说明cumulation需要扩容了。
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
否则不需要扩容
buffer = cumulation;
}
将in里面的字节数据读取出来,并且写入到cumulation中
buffer.writeBytes(in);
return buffer;
} finally {
避免In没有被回收(切断引用,放到netty自己的回收站中),需要释放,重复利用。
in.release();
}
}
};
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
重新分配一个新的bytebuf
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
写入数据,将旧的释放
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
}
}
数据合并完之后,就是如何序列化了
合并完数据之后,有一个方法callDecode,这个方法看名字就是进行反序列化的功能,具体还是看看代码。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
如果outSize已经大于0,说明已经有反序列的对象在集合里面了,说明其实数据包已经完整,
所以对read事件发起传播,让各个ctx去处理read事件
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
这里才是反序列化的实现,反序列化之后会将对象放到out集合中
========================================================================
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
如果一开始out的大小,跟反序列化完之后一样,读数据的过程中bytebuf的readIndex会一直移动。
if (outSize == out.size()) {
不是一个完整的数据包,不进行读取。跳出循环,直到数据包完整才进行读取。
if (oldInputLength == in.readableBytes()) {
break;
} else {
说明读出数据了,只是没有解析出对象
continue;
}
}
如果最后还是一样,则说明没有任何数据可读,抛出异常
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
}
从上面例子,大概看得出整个反序列化的过程,其实就是把数据一点一点读出来,直到读完数据,在读数据的过程中,会将字节数据反序列,添加到out的集合中。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
这个方法就是留给我们自己去实现的方法。
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
}
所以怎么用呢?
自己定义一个解码器,将字节反序列后,加入到输出集合即可。
其实这个方法有问题,应该要判断是不是一个完整的数据包,再决定要不要读取,而不是直接读取。以上只是一个使用的例子。
public class CustomDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readbytes = in.readableBytes();
byte[] bytes = new byte[readbytes];
in.readBytes(bytes);
String json = new String(bytes);
CustomSystemInfo csi = JSONObject.parseObject(json, CustomSystemInfo.class);
out.add(csi);
}
}
这里需要注意的点,就是解码器,应该排在需要去处理业务对象的channelHandler之前,否则处理业务数据的channelHandler拿到的则是未经过反序列的bytebuf。直接转成业务对象就会出现类型转化错误。
以上就是netty反序列化的原理。