netty通信的简单demo

seata中使用netty完成了TM、RM与TC之间的通信,若不熟悉netty的语法,那么阅读seata的源码是比较困难的,本文实现了一个简单的netty通信的demo。来入门netty通信。

1. 引入依赖

<dependencies>
 <!-- Netty -->
 <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.30.Final</version>
 </dependency>

 <dependency>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-log4j12</artifactId>
     <version>1.7.25</version>
 </dependency>

 <!-- Protostuff -->
 <dependency>
     <groupId>com.dyuproject.protostuff</groupId>
     <artifactId>protostuff-core</artifactId>
     <version>1.0.9</version>
 </dependency>

 <dependency>
     <groupId>com.dyuproject.protostuff</groupId>
     <artifactId>protostuff-runtime</artifactId>
     <version>1.0.9</version>
 </dependency>

 <!-- Objenesis -->
 <dependency>
     <groupId>org.objenesis</groupId>
     <artifactId>objenesis</artifactId>
     <version>2.1</version>
 </dependency>

 <!-- fastjson -->
 <dependency>
     <groupId>com.alibaba</groupId>
     <artifactId>fastjson</artifactId>
     <version>1.2.38</version>
 </dependency>
</dependencies>

2. 定义基类对象

请求对象:

/**
 * 请求对象体
 */
public class Request {
    private String requestId;

    private Object parameter;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Object getParameter() {
        return parameter;
    }

    public void setParameter(Object parameter) {
        this.parameter = parameter;
    }
}

响应对象:

/**
 * 响应对象体
 */
public class Response {
    private String requestId;

    private Object result;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}

3. 自定义传输协议

netty可以自定义传输协议,实现如下列代码

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 请求的编码格式
 * 自定义传输协议,length、data
 */
public class RpcEncoder extends MessageToByteEncoder {

    //目标对象类型进行编码
    private Class<?> target;

    public RpcEncoder(Class target) {
        this.target = target;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if (target.isInstance(msg)) {
            byte[] data = JSON.toJSONBytes(msg);    // 使用fastJson将对象转换为byte
            out.writeInt(data.length);  // 先将消息长度写入,也就是消息头
            out.writeBytes(data);   // 消息体中包含我们要发送的数据
        }
    }

}

/**
 * 请求的解码逻辑(协议)
 */
public class RpcDecoder extends ByteToMessageDecoder {

    // 目标对象类型进行解码
    private Class<?> target;

    public RpcDecoder(Class target) {
        this.target = target;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 不够长度丢弃
        if (in.readableBytes() < 4) {
            return;
        }
        // 标记一下当前的readIndex的位置
        in.markReaderIndex();
        // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4
        int dataLength = in.readInt();
        // 读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = JSON.parseObject(data, target);    // 将byte数据转化为我们需要的对象
        out.add(obj);
    }
}

4. 定义消息处理器

channl收到消息后,经过decode后需要交由消息处理器进行处理:

/**
 * 消息处理器
 */
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;

        log.info("=========》》》Client Data:" + JSON.toJSONString(request));

        Response response = new Response();
        response.setRequestId(request.getRequestId());
        response.setResult("Hello Client !");

        // client接收到信息后主动关闭掉连接
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

注意:使用ctx.writeAndFlush(response)完成响应消息的发送。

5. 定义netty服务器端

/**
 * netty的服务端
 */
public class NettyServer {

    private String ip;
    private int port;

    public NettyServer(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public void server() throws Exception {

        /**
         * Server端的EventLoopGroup分为两个
         * workerGroup作为处理请求,
         * bossGroup作为接收请求。
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            final ServerBootstrap serverBootstrap = new ServerBootstrap();
            /**
             * ChannelOption四个常量作为TCP连接中的属性。
             *
             *
             */
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new RpcDecoder(Request.class))
                                    .addLast(new RpcEncoder(Response.class));

                            //注册消息处理器
                            socketChannel.pipeline().addLast(new NettyServerHandler());
//                                    .addLast(new NettyServerHandler());
                        }
                    });

            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);  // 开启长连接

            ChannelFuture future = serverBootstrap.bind(ip, port).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyServer("127.0.0.1", 20000).server();
    }
}

启动该类后,server便一直监听20000端口的消息。

6. 定义netty客户端

实现了SimpleChannelInboundHandler<Response>类,重写了channelRead0方法,即消息处理器。

/**
 * netty的客户端,并实现了SimpleChannelInboundHandler接口,即也可以处理消息。
 */
public class NettyClient extends SimpleChannelInboundHandler<Response> {
    private final String ip;
    private final int port;
    private Response response;

    public NettyClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
        this.response = response;
    }

    public Response client(Request request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {

            // 创建并初始化 Netty 客户端 Bootstrap 对象
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();

                    pipeline.addLast(new RpcDecoder(Response.class));
                    pipeline.addLast(new RpcEncoder(Request.class));
                    pipeline.addLast(NettyClient.this);
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            // 连接 RPC 服务器
            ChannelFuture future = bootstrap.connect(ip, port).sync();

            // 写入 RPC 请求数据并关闭连接
            Channel channel = future.channel();

            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();

            return response;
        } finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        Request request = new Request();
        request.setRequestId(UUID.randomUUID().toString());
        request.setParameter("Hello Server !");
        System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 20000).client(request)));
    }
}

文章参考

使用Java搭建一个简单的Netty通信例子

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容

  • Netty框架分析 Netty简介 Netty[https://netty.io/] 是一个广受欢迎的异步事件驱动...
    彭阿三阅读 413评论 0 1
  • 包含的重点内容:JAVA基础JVM 知识开源框架知识操作系统多线程TCP 与 HTTP架构设计与分布式算法数据库知...
    消失er阅读 4,321评论 1 10
  • TCP 与 HTTP http1.0 和 http1.1 有什么区别。 HTTP 1.0主要有以下几点变化: 请求...
    冷枫abc阅读 472评论 0 0
  • 表情是什么,我认为表情就是表现出来的情绪。表情可以传达很多信息。高兴了当然就笑了,难过就哭了。两者是相互影响密不可...
    Persistenc_6aea阅读 125,027评论 2 7
  • 16宿命:用概率思维提高你的胜算 以前的我是风险厌恶者,不喜欢去冒险,但是人生放弃了冒险,也就放弃了无数的可能。 ...
    yichen大刀阅读 6,050评论 0 4