Netty4实战 - 编解码技术

通常我们习惯将编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。

反之,解码(Decode)称为反序列化(deserialization),它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。

Java序列化

相信大多数Java程序员接触到的第一种序列化或者编解码技术就是Java默认提供的序列化机制,需要序列化的Java对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。

其他序列化框架

Java默认的序列化机制效率很低、序列化后的码流也较大,所以涌现出了非常多的优秀的Java序列化框架,例如:hessianprotobufthriftprotostuffkryomsgpackavrofst 等等。

扩展Netty 解码器

Netty提供了 io.netty.handler.codec.MessageToByteEncoderio.netty.handler.codec.ByteToMessageDecoder接口,方便我们扩展编解码。

为了扩展序列化框架更方便,我们首先定义Serializer接口:

import java.io.IOException;

/**
 * @author Ricky Fung
 */
public interface Serializer {

    byte[] encode(Object msg) throws IOException;

    <T> T decode(byte[] buf, Class<T> type) throws IOException;
}

定义Serializer工厂:

import com.mindflow.netty4.serialization.hessian.HessianSerializer;

/**
 * @author Ricky Fung
 */
public class SerializerFactory {

    public static Serializer getSerializer(){
        return new HessianSerializer();
    }
}

接下来,我们在Netty Decoder中使用上面定义的Serializer接口,如下:

import com.mindflow.netty4.serialization.Serializer;
import com.mindflow.netty4.serialization.SerializerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * ${DESCRIPTION}
 *
 * @author Ricky Fung
 */
public class NettyMessageDecoder<T> extends LengthFieldBasedFrameDecoder {
    private Logger logger = LoggerFactory.getLogger(getClass());

    //判断传送客户端传送过来的数据是否按照协议传输,头部信息的大小应该是 byte+byte+int = 1+1+4 = 6
    private static final int HEADER_SIZE = 6;

    private Serializer serializer = SerializerFactory.getSerializer();
    private Class<T> clazz;

    public NettyMessageDecoder(Class<T> clazz, int maxFrameLength, int lengthFieldOffset,
                               int lengthFieldLength) throws IOException {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        this.clazz = clazz;
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
            throws Exception {

        if (in.readableBytes() < HEADER_SIZE) {
            return null;
        }

        in.markReaderIndex();

        //注意在读的过程中,readIndex的指针也在移动
        byte type = in.readByte();
        byte flag = in.readByte();

        int dataLength = in.readInt();

        //logger.info("read type:{}, flag:{}, length:{}", type, flag, dataLength);

        if (in.readableBytes() < dataLength) {
            logger.error("body length < {}", dataLength);
            in.resetReaderIndex();
            return null;
        }

        byte[] data = new byte[dataLength];
        in.readBytes(data);

        try{
            return serializer.decode(data, clazz);
        } catch (Exception e){
            throw new RuntimeException("serializer decode error");
        }
    }
}

NettyMessageEncoder.java

import com.mindflow.netty4.serialization.Serializer;
import com.mindflow.netty4.serialization.SerializerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ${DESCRIPTION}
 *
 * @author Ricky Fung
 */
public final class NettyMessageEncoder<T> extends
        MessageToByteEncoder {
    private Logger logger = LoggerFactory.getLogger(getClass());

    private final byte type = 0X00;
    private final byte flag = 0X0F;

    private Serializer serializer = SerializerFactory.getSerializer();
    private Class<T> clazz;
    public NettyMessageEncoder(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg,
                          ByteBuf out) throws Exception {

        try {
            out.writeByte(type);
            out.writeByte(flag);

            byte[] data = serializer.encode(msg);
            out.writeInt(data.length);
            out.writeBytes(data);

            //logger.info("write type:{}, flag:{}, length:{}", type, flag, data.length);
        } catch (Exception e){
            e.printStackTrace();
        }

    }
}

服务端:

import com.mindflow.netty4.serialization.model.Request;
import com.mindflow.netty4.serialization.model.Response;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;

/**
 * @author Ricky Fung
 */
public class NettyServer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    public void bind() throws Exception {
        // 配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch)
                            throws IOException {
                        ch.pipeline().addLast(
                                new NettyMessageDecoder<>(Request.class,1<<20, 2, 4));
                        ch.pipeline().addLast(new NettyMessageEncoder(Response.class));
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                });

        // 绑定端口,同步等待成功
        ChannelFuture future = b.bind(Constants.HOST, Constants.PORT).sync();
        logger.info("Netty server start ok host:{}, port:{}"
                , Constants.HOST , Constants.PORT);

        future.channel().closeFuture().sync();
    }

    class NettyServerHandler extends SimpleChannelInboundHandler<Request> {

        @Override
        protected void channelRead0(ChannelHandlerContext context, Request request) throws Exception {

            logger.info("Rpc server receive request id:{}", request.getId());
            //处理请求
            processRpcRequest(context, request);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.error("捕获异常", cause);
        }
    }

    private void processRpcRequest(final ChannelHandlerContext context, final Request request) {

        Response response = new Response();
        response.setId(request.getId());
        response.setResult("echo "+request.getMessage());
        context.writeAndFlush(response);
    }

    public static void main(String[] args) throws Exception {
        new NettyServer().bind();
    }

}

客户端:

import com.mindflow.netty4.serialization.model.Request;
import com.mindflow.netty4.serialization.model.Response;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

/**
 * ${DESCRIPTION}
 *
 * @author Ricky Fung
 */
public class NettyClient {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private EventLoopGroup group = new NioEventLoopGroup();

    public void connect(int port, String host) throws Exception {

        // 配置客户端NIO线程组
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(
                                    new NettyMessageDecoder<Response>(Response.class, 1024 * 1024, 2, 4));
                            ch.pipeline().addLast(new NettyMessageEncoder<Request>(Request.class));
                            ch.pipeline().addLast(new NettyClientHandler());;
                        }
                    });
            // 发起异步连接操作
            ChannelFuture future = b.connect(host, port).sync();

            if (future.awaitUninterruptibly(5000)) {
                logger.info("client connect host:{}, port:{}", host, port);
                if (future.channel().isActive()) {
                    logger.info("开始发送消息");
                    for(int i=0; i<100; i++){

                        Request req = new Request();
                        req.setId((long) i);
                        req.setMessage("hello world");

                        future.channel().writeAndFlush(req);
                    }
                    logger.info("发送消息完毕");
                }
            }

        } finally {

        }
    }

    class NettyClientHandler extends SimpleChannelInboundHandler<Response> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response msg) throws Exception {

            final Response response = msg;
            logger.info("Rpc client receive response id:{}", response.getId());
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.error("捕获异常", cause);
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        new NettyClient().connect(Constants.PORT, Constants.HOST);
    }
}

参考资料

Netty系列之Netty编解码框架分析


Java深度历险(十)——Java对象序列化与RMI

源码下载

https://github.com/TiFG/netty4-in-action/tree/master/netty4-serialization-demo

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容