(九)编码的原理

最近在使用netty的时候,踩了几个小坑,所以专门看了一下编码的原理。
首先,编码之后,netty是如何将编码的结果写进去的。
究竟是先写入,还是先编码?另外如果不编码直接写入是否存在什么问题?
带着这些问题,先看一下源码。

这里首先要看一个类MessageToByteEncoder
先看看MessageToByteEncoder的结构


MessageToByteEncoder结构图

先来说说这些类分别的作用吧。

ChannelHandler

ChannelHandler可以将相关的ctx加入到整个channelHandler对应的channel的channelPipeline中,
同理,也可以删除。

public interface ChannelHandler {
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

ChannelHandlerAdapter

ChannelHandlerAdapter主要的功能有以下。
1.对异常的捕获注意做处理(进行传播)
2.判断一个channelHandler能不能被多个channel使用。

public abstract class ChannelHandlerAdapter implements ChannelHandler {
    boolean added;
    protected void ensureNotSharable() {
        if (isSharable()) {
            throw new IllegalStateException("ChannelHandler " + getClass().getName() + " is not allowed to be shared");
        }
    }

    public boolean isSharable() {
        Class<?> clazz = getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Skip
    @Override
    @Deprecated
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

ChannelOutboundHandler

功能很多,都是主动的操作。
比如读,写,连接,断开连接等等。。。

public interface ChannelOutboundHandler extends ChannelHandler {
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    void read(ChannelHandlerContext ctx) throws Exception;
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
    void flush(ChannelHandlerContext ctx) throws Exception;
}

ChannelOutboundHandlerAdapter

说白了就是利用ctx去完成outbound的操作。
主要是对ChannelOutboundHandler进行实现,代码如下

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
    @Skip
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }

    @Skip
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

    @Skip
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
    }

    @Skip
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
    }

    @Skip
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }

    @Skip
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    @Skip
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

通过上述,基本可以确定MessageToByteEncoder具备的功能了。
具有操作outbound事件的功能,可以读,写,连接,绑定服务端,断开连接,传播异常等等。
netty如何写入数据,在前面说到,ctx如果没有覆写write方法,那么write方法就是用父类中的公共实现,AbstractChannelHandlerContext中的write方法,主要的功能就是用于传播写这个事件。
由于我们对写入的数据要进行编码,所以必然不能只能是单纯的传播,因此MessageToByteEncoder也对write方法进行了覆写。

下面来看看MessageToByteEncoder的write方法的实现。

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
    
    暴露给我们去实现的一个方法,可以自定义编码格式。
    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                通过内存管理获取bytebuf。
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    进行编码,将编码后的值写入bytebuf,这个时候Bytebuf里面的值就是我们编码过的数据了。再进行传播
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    这个时候调用ctx的write方法,最重要的是用于传播,直到最后的headContext,headContext覆写了write方法,将数据通过Unsafe写入。
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    这个时候调用ctx的write方法,最重要的是用于传播,直到最后的headContext,headContext覆写了write方法,将数据通过Unsafe写入。
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                这个时候调用ctx的write方法,最重要的是用于传播,直到最后的headContext,headContext覆写了write方法,将数据通过Unsafe写入。           
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }
}

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            unsafe.write(msg, promise);
        }

    }

如何使用

1.先构造一个编码器

public class CustomEncoder extends MessageToByteEncoder<CustomSystemInfo>{

    @Override
    protected void encode(ChannelHandlerContext ctx, CustomSystemInfo msg, ByteBuf out) throws Exception {
        String json = JSONObject.toJSONString(msg);
        out.writeBytes(json.getBytes());
    }
}

2.将编码器添加到Channel的pipeline中去即可。

EventLoopGroup work = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(work).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new CustomEncoder());
                    }
                });
        ChannelFuture cf = null;
        try {
            cf = b.connect(cdscp.getHost(), cdscp.getPort()).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }

需要避免的问题

1.不能直接把一个对象在没有经过编码的情况下,写入。netty只会告诉你写入失败,但是并不会抛异常,因为被捕获处理了。

例子如下

对应的channelHandler
public class WriteChannelHandler extends ChannelDuplexHandler {
    private static int delay = 0;
    private static long peroid = 10;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // channel一旦建立连接,开始监控系统状态
        CollectDataService collectDataService = new CollectDataServiceImpl();
        ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
            CustomSystemInfo systemInfo = collectDataService.collectData();
            System.out.println(systemInfo);
            ctx.writeAndFlush(systemInfo).addListener(new GenericFutureListener<ChannelPromise>() {
                @Override
                public void operationComplete(ChannelPromise future) throws Exception {
                    if (future.isDone()) {
                        log.info("send systeminfo data done");
                        if(future.isSuccess()) {
                            log.info("send systeminfo data success");
                            return;
                        }
                        log.info("send systeminfo data fail");
                        return;
                    }
                    log.info("send systeminfo data undone");
                }
            });
        }, delay, peroid, TimeUnit.SECONDS);
        //继续传播
        super.channelActive(ctx);
    }
}
对应的客户端
        EventLoopGroup work = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(work).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new WriteChannelHandler());
                    }
                });
        ChannelFuture cf = null;
        try {
            cf = b.connect(cdscp.getHost(), cdscp.getPort()).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }
运行结果

netty对异常信息已经做了处理,捕获之后并没有将该数据写入,看回调可以看出来写入失败了。

CustomSystemInfo(host=null, port=null, useMemory=15514451968, freeMemory=18732584960, cupUseRate=12188, io=null, jvmInfo=JvmInfo(useMemory=null, freeMemory=292896200, maxMemory=7611613184, totalMemory=398983168, jdkVersion=1.8.0_181))
2021-05-20 14:33:07.925  INFO 10436 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler     : send systeminfo data done
2021-05-20 14:33:07.925  INFO 10436 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler     : send systeminfo data fail

如何解决上述问题呢?

添加编码器即可。

编码器代码
public class CustomEncoder extends MessageToByteEncoder<CustomSystemInfo>{

    @Override
    protected void encode(ChannelHandlerContext ctx, CustomSystemInfo msg, ByteBuf out) throws Exception {
        String json = JSONObject.toJSONString(msg);
        out.writeBytes(json.getBytes());
    }
}
客户端代码
        EventLoopGroup work = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(work).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new CustomEncoder());
                        ch.pipeline().addLast(new WriteChannelHandler());
                    }
                });
        ChannelFuture cf = null;
        try {
            cf = b.connect(cdscp.getHost(), cdscp.getPort()).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }
运行结果

从结果来看是发送成功了

CustomSystemInfo(host=null, port=null, useMemory=15517786112, freeMemory=18729250816, cupUseRate=12187, io=null, jvmInfo=JvmInfo(useMemory=null, freeMemory=295960008, maxMemory=7611613184, totalMemory=403177472, jdkVersion=1.8.0_181))
2021-05-20 14:39:10.193  INFO 12448 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler     : send systeminfo data done
2021-05-20 14:39:10.193  INFO 12448 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler     : send systeminfo data success

2.编码器的处理器要排在写入数据的处理器之后。即channelHandler的执行顺序应该是先write,后encode。由于write是outbound的事件,Outbound事件是逆序执行的,所以channelHandler的添加顺序是先encode的channelhandler,再到write的channelhandler.

代码如下,这段代码会执行失败

        EventLoopGroup work = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(work).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //ch.pipeline().addLast(new CustomEncoder());
                        //ch.pipeline().addLast(new WriteChannelHandler());
                        //加入的顺序跟原先的代码相反,这个时候就会导致写入失败
                        ch.pipeline().addLast(new WriteChannelHandler());
                        ch.pipeline().addLast(new CustomEncoder());
                    }
                });
        ChannelFuture cf = null;
        try {
            cf = b.connect(cdscp.getHost(), cdscp.getPort()).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }
运行结果

果然是失败的,为什么呢?

CustomSystemInfo(host=null, port=null, useMemory=15551770624, freeMemory=18695266304, cupUseRate=12376, io=null, jvmInfo=JvmInfo(useMemory=null, freeMemory=361530200, maxMemory=7611613184, totalMemory=400031744, jdkVersion=1.8.0_181))
2021-05-20 14:47:38.167  INFO 20692 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler     : send systeminfo data done
2021-05-20 14:47:38.167  INFO 20692 --- [ntLoopGroup-2-1] c.channelhandler.WriteChannelHandler     : send systeminfo data fail
如何解决问题

很简单,将channelHandler换个顺序,这样子就没问题了。

 EventLoopGroup work = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(work).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new CustomEncoder());
                        ch.pipeline().addLast(new WriteChannelHandler());
                    }
                });
        ChannelFuture cf = null;
        try {
            cf = b.connect(cdscp.getHost(), cdscp.getPort()).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }
原因

首先,写入是outbound事件。outbound事件是逆序执行的。
上面的代码的channelPipeline中的顺序应该是 如下:

headContext --> WriteChannelHandler的ctx --> CustomEncoder的ctx ---> tailContext。

如果是inbound事件,执行顺序当然会如上,从headContext开始。
但是write是outbound事件,执行顺序就相反了。顺序如下,由于是从WriteChannelHandler发起的,所以写入之后直接往下传播,就到headContext了,而我们添加的编码器排在WriteChannelHandler的后面,所以并不会被执行到。执行顺序如下:

headContext <-- WriteChannelHandler的ctx(发起写事件)

从执行顺序可以看出,CustomEncoder并没有在传播的路径中,所以自然没有编码。

我们将顺序置换一下。

                        ch.pipeline().addLast(new CustomEncoder());
                        ch.pipeline().addLast(new WriteChannelHandler());

当WriteChannelHandler发起写入的时候,执行顺序如下

headContext(最后写入) <-- CustomEncoder的ctx(将对象编码,写入bytebuf中) <--WriteChannelHandler的ctx(发起写入)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容