Netty实战二:Netty服务端心跳检测

业务场景:做一个netty服务端,跟设备交互,设备使用socket连接服务端。

需要的注意的地方只有两个,一是:服务端心跳检测,二是:服务端粘包处理
NettyServer

public class NettyServer {
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    EventLoopGroup boss = new NioEventLoopGroup();
    EventLoopGroup work = new NioEventLoopGroup();
   ChannelFuture future = null;
    @Resource
    private NettyConfig nettyConfig;
    @PreDestroy
    public void stop(){
        if(future!=null){
            future.channel().close().addListener(ChannelFutureListener.CLOSE);
            future.awaitUninterruptibly();
            boss.shutdownGracefully();
            work.shutdownGracefully();
            future=null;
            logger.info(" 服务关闭 ");
        }
    }
    public void start(){    
        logger.info(" nettyServer 正在启动");
        int port = nettyConfig.getPort();
        serverBootstrap.group(boss,work)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .option(ChannelOption.TCP_NODELAY,true)
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                .childHandler(new NettyServerInitializer());
            logger.info("netty服务器在["+port+"]端口启动监听");
        try{
            future = serverBootstrap.bind(port).sync();
            if(future.isSuccess()){
                logger.info("nettyServer 完成启动 ");
            }
            // 等待服务端监听端口关闭
            future.channel().closeFuture().sync();

        }catch (Exception e){
            logger.info("[出现异常] 释放资源,{}",e);
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

NettyServerInitializer


public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new IdleStateHandler(
                Constants.SERVER_READ_IDEL_TIME_OUT,
                Constants.SERVER_WRITE_IDEL_TIME_OUT, 
                Constants.SERVER_ALL_IDEL_TIME_OUT,
                TimeUnit.SECONDS));
        pipeline.addLast(new AcceptorIdleStateTrigger());
        // 字符串解码 和 编码
        pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
        pipeline.addLast("encoder", new ObjectEncoder());
        // 自己的逻辑Handler
        pipeline.addLast(new NettyServerHandler());
    }
}

AcceptorIdleStateTrigger

@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(AcceptorIdleStateTrigger.class);

    //可以把loss_connect_time 放到AttributeMap中
    private int loss_connect_time = 0;

    private static DeviceWarnService deviceWarnService;

 @Override
    public void channelInactive(ChannelHandlerContext chc) throws  Exception{
        SocketChannel socketChannel = (SocketChannel) chc.channel();
        String clientId = NettyMap.getKeyByChannel(socketChannel);
        logger.info("----客户端设备连接断开:{}",clientId);
        if(!StringUtils.isEmpty(clientId)) {
            NettyMap.removeChannel(clientId);
            //客户端断开
            HttpUtil.syncNetworkStatus(clientId,0);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        SocketChannel socketChannel=(SocketChannel) ctx.channel();
        String clientId = NettyChannelMap.get(socketChannel);
        if(StringUtil.isEmpty(clientId)){
            return;
        }

        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            IdleState state=event.state();
            if (state==IdleState.READER_IDLE) {
                //设备离线,更改设备状态,增加离线操作日志
                this.updateDeviceStatus(clientId,0,loss_connect_time);
                loss_connect_time++;
                logger.info(clientId+"客户端离线"+loss_connect_time+"周期了");
                if(loss_connect_time>=Constants.MAX_LOSS_CONNECT_TIME){
                    //客户端断连10分钟
                    logger.info("服务器主动关闭客户端链路--"+String.valueOf(Constants.SERVER_READ_IDEL_TIME_OUT*loss_connect_time)+"s没有"+ NettyChannelMap.get(clientId)+"的信息了");
                    //发送离线短信通知客户
                    this.sendDeviceOffLineMsg(clientId,0);
                    loss_connect_time=0;
                    NettyChannelMap.remove(clientId);
                    //服务端主动关闭channel,会触发com.vendor.netty.server.NettyServerHandler.channelInactive()方法
                    ctx.channel().close();
                }
            } else {
                //复位
                logger.info(clientId+"客户端恢复连接-----");
                loss_connect_time =0;
                super.userEventTriggered(ctx,evt);
            }
        } else {
            //复位
            logger.info(clientId+"客户端恢复连接=======");
            loss_connect_time =0;
            super.userEventTriggered(ctx,evt);
        }
    }


    private void updateDeviceStatus(String clientId, int status,int lossConnectTime) {
        if(deviceWarnService==null){
            deviceWarnService = ContextUtil.getBeanByName(DeviceWarnService.class, "deviceWarnService");
        }
        String factoryDevNo= Constants.getFactoryDevNo(clientId);
        if(lossConnectTime == 0 ) {
            //只添加离线操作日志
            deviceWarnService.updateDeviceNetworkStatusAndLog(factoryDevNo, status);
        }
    }

netty的demo网上一大把,这里就不详细解释了,这里只记录实际业务中遇到的问题。

服务端心跳检测:

在这个业务中,设备使用socket连接服务端,然后会有定期的心跳,同时,服务端要检测客户端是否在线,如果不在线,则发出告警,给后台服务发送报警日志,同时给先关人员发送短信邮件。问题在于,如果设备断网或者断电后,channelInactive并不会被触发,这时就需要服务端主动监控客户端连接。
有两个方案来处理这个问题:
第一个:使用redis来实现,每次设备发起心跳,server就更新一次redis,当设备断网超过一定时间,则redis中数据失效。这时候就认为设备失联,可以发送告警。
每次server重启,从数据库中读取所有设备号,然后储存在内存中,同时启动一个线程,定时根据设备号去redis中获取数据,如果有,则认为设备在线,如果没有,则设备失联
第二个:使用IdleStateHandler来实现。
IdleStateHandler中的三个参数解释如下:
1)readerIdleTime:为读超时时间;
2)writerIdleTime:为写超时时间;
3)allIdleTime:所有类型的超时时间;
这里最重要是的readerIdleTime,当设置了readerIdleTime以后,服务端server会每隔readerIdleTime时间去检查一次channelRead方法被调用的情况,如果在readerIdleTime时间内该channel上的channelRead()方法没有被触发,就会调用userEventTriggered方法。
最终项目中采用的是IdleStateHandler来实现,因为用起来实在是太方便了。
总之,不管客户端是用什么实现的,socket netttyclient websocket,服务端想要主动检测客户端是否在线,都需要心跳,事实上,用到socket的地方,大多都要实现心跳,只要客户端有心跳,那服务端检测客户端是否在线就可以使用IdleStateHandler

粘包拆包处理:

粘包拆包的概念,这里就不重复了,项目中最开始遇到的是粘包问题,因为交互命令都很短,同时数据格式是String,所以最开始只是使用String.split()来切割命令来解决拆包。但是后来遇到一个特殊命令,上报是消息超过了1024字节,这时候就发生了拆包现象,netty默认一次性只接受1024字节的数据,如果超过了,则会拆分。这时候就用到DelimiterBasedFrameDecoder了。

TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,一半采用如下四种方式:
1、消息长度固定,累计读取到消息长度总和为定长Len的报文之后即认为是读取到了一个完整的消息。计数器归位,重新读取。
2、将回车换行符作为消息结束符。
3、将特殊的分隔符作为消息分隔符
4、通过在消息头定义长度字段来标识消息总长度。
DelimiterBasedFrameDecoder属于第三种。业务中因为设备上client是被人家的代码,消息格式都是固定的,所以第1、2、4都不行,只能使用第三种。
DelimiterBasedFrameDecoder的参数:
maxFrameLength:解码的帧的最大长度
stripDelimiter:解码时是否去掉分隔符
failFast:为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常
delimiter:分隔符

Netty 服务端创建参考资料
http://www.infoq.com/cn/articles/netty-server-create

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • 一、Python简介和环境搭建以及pip的安装 4课时实验课主要内容 【Python简介】: Python 是一个...
    _小老虎_阅读 5,744评论 0 10
  • Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架...
    认真期待阅读 2,780评论 1 27
  • 春节家族聚会的一个晚宴,我们许多从小一起玩到大的兄弟姐妹们又重新聚在了一起。表弟最后一个入席,坐在了我的身...
    温闻阅读 1,385评论 0 0
  • 不知道有多少人因为这部剧而被这座城所吸引—《东京爱情故事》,也不知道有多少人未曾看过这部剧,至少这部青春偶像剧伴随...
    AdamDai阅读 549评论 2 6