最近在使用netty的时候,踩了几个小坑,所以专门看了一下编码的原理。
首先,编码之后,netty是如何将编码的结果写进去的。
究竟是先写入,还是先编码?另外如果不编码直接写入是否存在什么问题?
带着这些问题,先看一下源码。
这里首先要看一个类MessageToByteEncoder
先看看MessageToByteEncoder的结构
先来说说这些类分别的作用吧。
ChannelHandler
ChannelHandler可以将相关的ctx加入到整个channelHandler对应的channel的channelPipeline中,
同理,也可以删除。
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
ChannelHandlerAdapter
ChannelHandlerAdapter主要的功能有以下。
1.对异常的捕获注意做处理(进行传播)
2.判断一个channelHandler能不能被多个channel使用。
public abstract class ChannelHandlerAdapter implements ChannelHandler {
boolean added;
protected void ensureNotSharable() {
if (isSharable()) {
throw new IllegalStateException("ChannelHandler " + getClass().getName() + " is not allowed to be shared");
}
}
public boolean isSharable() {
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Skip
@Override
@Deprecated
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
ChannelOutboundHandler
功能很多,都是主动的操作。
比如读,写,连接,断开连接等等。。。
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
ChannelOutboundHandlerAdapter
说白了就是利用ctx去完成outbound的操作。
主要是对ChannelOutboundHandler进行实现,代码如下
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.disconnect(promise);
}
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.close(promise);
}
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
通过上述,基本可以确定MessageToByteEncoder具备的功能了。
具有操作outbound事件的功能,可以读,写,连接,绑定服务端,断开连接,传播异常等等。
netty如何写入数据,在前面说到,ctx如果没有覆写write方法,那么write方法就是用父类中的公共实现,AbstractChannelHandlerContext中的write方法,主要的功能就是用于传播写这个事件。
由于我们对写入的数据要进行编码,所以必然不能只能是单纯的传播,因此MessageToByteEncoder也对write方法进行了覆写。
下面来看看MessageToByteEncoder的write方法的实现。
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
暴露给我们去实现的一个方法,可以自定义编码格式。
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
通过内存管理获取bytebuf。
buf = allocateBuffer(ctx, cast, preferDirect);
try {
进行编码,将编码后的值写入bytebuf,这个时候Bytebuf里面的值就是我们编码过的数据了。再进行传播
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
这个时候调用ctx的write方法,最重要的是用于传播,直到最后的headContext,headContext覆写了write方法,将数据通过Unsafe写入。
ctx.write(buf, promise);
} else {
buf.release();
这个时候调用ctx的write方法,最重要的是用于传播,直到最后的headContext,headContext覆写了write方法,将数据通过Unsafe写入。
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
这个时候调用ctx的write方法,最重要的是用于传播,直到最后的headContext,headContext覆写了write方法,将数据通过Unsafe写入。
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
}
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
}
如何使用
1.先构造一个编码器
public class CustomEncoder extends MessageToByteEncoder<CustomSystemInfo>{
@Override
protected void encode(ChannelHandlerContext ctx, CustomSystemInfo msg, ByteBuf out) throws Exception {
String json = JSONObject.toJSONString(msg);
out.writeBytes(json.getBytes());
}
}
2.将编码器添加到Channel的pipeline中去即可。
EventLoopGroup work = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(work).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new CustomEncoder());
}
});
ChannelFuture cf = null;
try {
cf = b.connect(cdscp.getHost(), cdscp.getPort()).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
}
需要避免的问题
1.不能直接把一个对象在没有经过编码的情况下,写入。netty只会告诉你写入失败,但是并不会抛异常,因为被捕获处理了。
例子如下
对应的channelHandler
public class WriteChannelHandler extends ChannelDuplexHandler {
private static int delay = 0;
private static long peroid = 10;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// channel一旦建立连接,开始监控系统状态
CollectDataService collectDataService = new CollectDataServiceImpl();
ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
CustomSystemInfo systemInfo = collectDataService.collectData();
System.out.println(systemInfo);
ctx.writeAndFlush(systemInfo).addListener(new GenericFutureListener<ChannelPromise>() {
@Override
public void operationComplete(ChannelPromise future) throws Exception {
if (future.isDone()) {
log.info("send systeminfo data done");
if(future.isSuccess()) {
log.info("send systeminfo data success");
return;
}
log.info("send systeminfo data fail");
return;
}
log.info("send systeminfo data undone");
}
});
}, delay, peroid, TimeUnit.SECONDS);
//继续传播
super.channelActive(ctx);
}
}
对应的客户端
EventLoopGroup work = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(work).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new WriteChannelHandler());
}
});
ChannelFuture cf = null;
try {
cf = b.connect(cdscp.getHost(), cdscp.getPort()).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
}
运行结果
netty对异常信息已经做了处理,捕获之后并没有将该数据写入,看回调可以看出来写入失败了。
CustomSystemInfo(host=null, port=null, useMemory=15514451968, freeMemory=18732584960, cupUseRate=12188, io=null, jvmInfo=JvmInfo(useMemory=null, freeMemory=292896200, maxMemory=7611613184, totalMemory=398983168, jdkVersion=1.8.0_181))
2021-05-20 14:33:07.925 INFO 10436 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler : send systeminfo data done
2021-05-20 14:33:07.925 INFO 10436 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler : send systeminfo data fail
如何解决上述问题呢?
添加编码器即可。
编码器代码
public class CustomEncoder extends MessageToByteEncoder<CustomSystemInfo>{
@Override
protected void encode(ChannelHandlerContext ctx, CustomSystemInfo msg, ByteBuf out) throws Exception {
String json = JSONObject.toJSONString(msg);
out.writeBytes(json.getBytes());
}
}
客户端代码
EventLoopGroup work = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(work).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new CustomEncoder());
ch.pipeline().addLast(new WriteChannelHandler());
}
});
ChannelFuture cf = null;
try {
cf = b.connect(cdscp.getHost(), cdscp.getPort()).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
}
运行结果
从结果来看是发送成功了
CustomSystemInfo(host=null, port=null, useMemory=15517786112, freeMemory=18729250816, cupUseRate=12187, io=null, jvmInfo=JvmInfo(useMemory=null, freeMemory=295960008, maxMemory=7611613184, totalMemory=403177472, jdkVersion=1.8.0_181))
2021-05-20 14:39:10.193 INFO 12448 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler : send systeminfo data done
2021-05-20 14:39:10.193 INFO 12448 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler : send systeminfo data success
2.编码器的处理器要排在写入数据的处理器之后。即channelHandler的执行顺序应该是先write,后encode。由于write是outbound的事件,Outbound事件是逆序执行的,所以channelHandler的添加顺序是先encode的channelhandler,再到write的channelhandler.
代码如下,这段代码会执行失败
EventLoopGroup work = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(work).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//ch.pipeline().addLast(new CustomEncoder());
//ch.pipeline().addLast(new WriteChannelHandler());
//加入的顺序跟原先的代码相反,这个时候就会导致写入失败
ch.pipeline().addLast(new WriteChannelHandler());
ch.pipeline().addLast(new CustomEncoder());
}
});
ChannelFuture cf = null;
try {
cf = b.connect(cdscp.getHost(), cdscp.getPort()).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
}
运行结果
果然是失败的,为什么呢?
CustomSystemInfo(host=null, port=null, useMemory=15551770624, freeMemory=18695266304, cupUseRate=12376, io=null, jvmInfo=JvmInfo(useMemory=null, freeMemory=361530200, maxMemory=7611613184, totalMemory=400031744, jdkVersion=1.8.0_181))
2021-05-20 14:47:38.167 INFO 20692 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler : send systeminfo data done
2021-05-20 14:47:38.167 INFO 20692 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler : send systeminfo data fail
如何解决问题
很简单,将channelHandler换个顺序,这样子就没问题了。
EventLoopGroup work = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(work).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new CustomEncoder());
ch.pipeline().addLast(new WriteChannelHandler());
}
});
ChannelFuture cf = null;
try {
cf = b.connect(cdscp.getHost(), cdscp.getPort()).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
}
原因
首先,写入是outbound事件。outbound事件是逆序执行的。
上面的代码的channelPipeline中的顺序应该是 如下:
headContext --> WriteChannelHandler的ctx --> CustomEncoder的ctx ---> tailContext。
如果是inbound事件,执行顺序当然会如上,从headContext开始。
但是write是outbound事件,执行顺序就相反了。顺序如下,由于是从WriteChannelHandler发起的,所以写入之后直接往下传播,就到headContext了,而我们添加的编码器排在WriteChannelHandler的后面,所以并不会被执行到。执行顺序如下:
headContext <-- WriteChannelHandler的ctx(发起写事件)
从执行顺序可以看出,CustomEncoder并没有在传播的路径中,所以自然没有编码。
我们将顺序置换一下。
ch.pipeline().addLast(new CustomEncoder());
ch.pipeline().addLast(new WriteChannelHandler());
当WriteChannelHandler发起写入的时候,执行顺序如下