netty搭建tcp服务器通信(解决粘包问题)

最近做的项目有需求跟硬件通信,使用tcp实现长连接,协议自己规定,于是后端决定选用netty来作为tcp服务器,这里简单说一下netty的工作流程。外部的数据传入netty服务器中,netty首先通过解码器对数据进行一次预处理(比如把字节转为字符串或对象来方便操作),接着把预处理后的数据转发给处理器,在处理器中执行业务逻辑,最后如果有必要返回数据给连接者,可以通过netty提供的channel发送。

  • netty—>decode—>handler

首先是启动一个tcp服务器

package server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * @author lanni
 * @date 2020/8/19 23:05
 * @description
 **/
public class TCPServer {
    public void run(int port) throws Exception {
        //创建线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建启动类
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ServerInitializer())
                    .option(ChannelOption.SO_BACKLOG, 256)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync();
            // 等待服务器 socket 关闭 。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        try {
            System.out.println("tcp服务器启动...");
            new TCPServer().run(8998);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

初始化解码器、处理器

package server;

import handler.CustomDecode;
import handler.TCPServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

/**
 * @author lanni
 * @date 2020/8/22 11:58
 * @description
 **/
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline().
                addLast(new CustomDecode()).        //自定义解码器
                addLast(new TCPServerHandler())     //自定义处理器
        ;
    }
}

解码器中解决tcp粘包问题,关于什么是粘包、拆包我就不做解释了,我这里直接上解决方案,这里我简单说一下我做的项目数据传输,规定数据格式:

固定头部(2字节)+数据长度(4字节)+其它(17字节)+数据(可变长度)+crc校验码(2字节)+固定结尾(2字节)

所以每次收到的数据包中包含了数据的长度,就以此长度来组装数据包传递给handler,这里注意看我的注释部分。

import util.StringUtil;

import java.util.List;

/**
 * @Author lanni
 * @Date 2020/8/23 9:30
 * @Description
 **/
public class CustomDecode extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
        int len = in.readableBytes();       //这里得到可读取的字节长度
        in.markReaderIndex();               //包头做标记位,后面可以重新回到数据包头开始读数据
        //有数据时开始读数据包
        if (len > 0) {
            byte[] src = new byte[len];
            in.readBytes(src);          //把数据读到字节数组中(读取完之后指针会到最后一个数据)
            in.resetReaderIndex();      //重置当前指针到标记位(包头)
            //验证首部为A5 5A,只接收首部正确的数据包,如果包头错误可以直接丢弃或关闭连接
            if ((src[0] & 0x000000ff) == 0xA5 && (src[1] & 0x000000ff) == 0x5A) {
                //计算报文长度
                byte[] data =  {src[3],src[2]};
                String hexLen = StringUtil.byteArrayToHexString(data);
                //这里计算出来的是数据长度的报文长度,需要加27个固定长度
                int pLen = Integer.parseInt(hexLen, 16) + 27;
                if (len < pLen) {
                    //当数据包的长度不够时直接return,netty在缓冲区有数据时会一直调用decode方法,所以我们只需要等待下一个数据包传输过来一起解析
                    return;
                }
                byte[] packet = new byte[pLen];
                in.readBytes(packet,0,pLen);
                out.add(packet);
            }else {
                channelHandlerContext.close();
            }
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("连接异常:"+cause);
//        ctx.close();
    }

然后就是处理器,用于处理得到的数据包,这个大家可以自己编写逻辑。

package handler;

import cn.hutool.core.util.StrUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import util.StringUtil;
/**
 * @Author lanni
 * @Date 2020/8/19 23:07
 * @Description
 **/
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //这里msg就是从解码器中传来的数据,解码器传输过来是什么格式,这里直接转成对应的格式就可以
        byte[] src = (byte[]) msg;
        try {
            //这里做自己的业务逻辑
            
            
            //获取链接实例
            Channel channel = ctx.channel();
            //响应消息一定要这样去发送,只能使用字节传输
            //netty中发送数据需要把待发送的字节数组包装一下成为ByteBuf来发送
            byte[] dest = null;
            ByteBuf buf = Unpooled.copiedBuffer(dest);
            //数据冲刷
            ChannelFuture cf = channel.writeAndFlush(buf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

netty中当然还涉及到服务器主动发送消息给客户端,但是需要注意的是如果是主动发消息,有一个先决条件是需要知道客户端的唯一标识(id或其它标识),我们需要用一个map来保存好channel和这个标识的对应关系。我所做的项目是服务器来维护设备id和连接通道channel的对应关系。

首先需要一个统一管理channel的类,这里有CHANNEL_POOLKEY_POOL两个map,是为了让id和channel能够互相对应起来,可能有人会想着只需要维护id—>channel的关系就可以了,但是可以看见上面在发生异常时所使用的处理方法exceptionCaught(ChannelHandlerContext ctx, Throwable cause)时,只能拿到channel,所以需要通过channel找到id来做出相应的操作。

import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author lanni
 * @date 2020/9/11 20:21
 *
 **/
@Slf4j
public class NettyChannelManager {
    /**
     * 保存连接 Channel 的地方
     */
    private static Map<String, Channel> CHANNEL_POOL = new ConcurrentHashMap<>();
    private static Map<Channel, String> KEY_POOL = new ConcurrentHashMap<>();

    /**
     * 添加 Channel
     *
     * @param key
     */
    public static void add(String key, Channel channel) {
        CHANNEL_POOL.put(key, channel);
        KEY_POOL.put(channel, key);
    }

    /**
     * 删除 Channel
     *
     * @param key
     */
    public static void remove(String key) {
        Channel channel = CHANNEL_POOL.get(key);
        if (channel == null) {
            return;
        }
        CHANNEL_POOL.remove(key);
        KEY_POOL.remove(channel);
    }

    /**
     * 删除并同步关闭连接
     *
     * @param key
     */
    public static void removeAndClose(String key) {
        Channel channel = CHANNEL_POOL.get(key);
        remove(key);
        if (channel != null) {
            // 关闭连接
            try {
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void removeAndClose(Channel channel) {
        String key = KEY_POOL.get(channel);
        removeAndClose(key);
    }

    /**
     * 获得 Channel
     *
     * @param key
     * @return String
     */
    public static Channel getChannel(String key) {
        return CHANNEL_POOL.get(key);
    }

    /**
     * 获得 key
     *
     * @param channel
     * @return Channel
     */
    public static String getKey(Channel channel) {
        return KEY_POOL.get(channel);
    }

    /**
     * 判断是否存在key
     * @author lanni
     * @date 2020/9/16 10:10
     * @param key
     * @return boolean
     **/
    public static boolean hasKey(String key) {
        return CHANNEL_POOL.containsKey(key);
    }

    /**
     * 判断是否存在channel
     * @author lanni
     * @date 2020/10/12 9:34
     * @param channel
     * @return boolean
     **/
    public static boolean hasChannel(Channel channel) {
        return KEY_POOL.containsKey(channel);
    }

}

我这里是在处理器中获取到设备的id,然后交给NettyChannelManager管理,当发生异常时关闭channel并移除对应的连接信息。

package handler;

import cn.hutool.core.util.StrUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import util.StringUtil;
/**
 * @Author lanni
 * @Date 2020/8/19 23:07
 * @Description
 **/
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //这里msg就是从解码器中传来的数据,解码器传输过来是什么格式,这里直接转成对应的格式就可以
        byte[] src = (byte[]) msg;
        try {
            // 从数据包中拿到设备id
            byte[] deviceId = new byte[17];
            System.arraycopy(src, 4, deviceId, 0, 17);
            String devId = StrUtil.str(deviceId, CharsetUtil.UTF_8);
            // 保存channel,key
            // deviceId为空时表示设备断线重连
            if (!NettyChannelManager.hasKey(devId)) {
                NettyChannelManager.add(devId, ctx.channel());
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 当出现异常就关闭连接
        cause.printStackTrace();
        log.error("发生异常:" + cause.getMessage());
        String devId = NettyChannelManager.getKey(ctx.channel());
        if (devId == null || "".equals(devId)) {
            return;
        }
        // 删除链接信息并关闭链接
        NettyChannelManager.removeAndClose(ctx.channel());
    }


    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String devId = NettyChannelManager.getKey(ctx.channel());
        if (devId == null || "".equals(devId)) {
            return;
        }
        // 删除链接信息并关闭链接
        NettyChannelManager.removeAndClose(ctx.channel());
    }

}

现在有了这样一个对应关系之后,如果我们想给客户端主动发送消息,那么我们只需要通过客户端的id拿到对应的channel就可以在任意位置发送数据。

        // 先准备好需要发送的数据
        byte[] pkg = 
        // 通过id获取netty连接通道channel
        Channel channel = NettyChannelManager.getChannel(deviceId);
        // 封装数据
        ByteBuf buf = Unpooled.copiedBuffer(pkg);
        // 把数据写入通道并发送
        channel.writeAndFlush(buf);

结语:以上所说都是在单机环境下,如果说是分布式环境的话那么关于id-channel的维护就需要修改。我们可以使用spring session来代替这里的

NettyChannelManager,只需要几个配置就能解决分布式的问题,当然也可以有其它的方案,我在这里就不列举了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351