业务场景:做一个netty服务端,跟设备交互,设备使用socket连接服务端。
需要的注意的地方只有两个,一是:服务端心跳检测,二是:服务端粘包处理
NettyServer
public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup();
ChannelFuture future = null;
@Resource
private NettyConfig nettyConfig;
@PreDestroy
public void stop(){
if(future!=null){
future.channel().close().addListener(ChannelFutureListener.CLOSE);
future.awaitUninterruptibly();
boss.shutdownGracefully();
work.shutdownGracefully();
future=null;
logger.info(" 服务关闭 ");
}
}
public void start(){
logger.info(" nettyServer 正在启动");
int port = nettyConfig.getPort();
serverBootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.TCP_NODELAY,true)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new NettyServerInitializer());
logger.info("netty服务器在["+port+"]端口启动监听");
try{
future = serverBootstrap.bind(port).sync();
if(future.isSuccess()){
logger.info("nettyServer 完成启动 ");
}
// 等待服务端监听端口关闭
future.channel().closeFuture().sync();
}catch (Exception e){
logger.info("[出现异常] 释放资源,{}",e);
boss.shutdownGracefully();
work.shutdownGracefully();
}finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
NettyServerInitializer
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(
Constants.SERVER_READ_IDEL_TIME_OUT,
Constants.SERVER_WRITE_IDEL_TIME_OUT,
Constants.SERVER_ALL_IDEL_TIME_OUT,
TimeUnit.SECONDS));
pipeline.addLast(new AcceptorIdleStateTrigger());
// 字符串解码 和 编码
pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast("encoder", new ObjectEncoder());
// 自己的逻辑Handler
pipeline.addLast(new NettyServerHandler());
}
}
AcceptorIdleStateTrigger
@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(AcceptorIdleStateTrigger.class);
//可以把loss_connect_time 放到AttributeMap中
private int loss_connect_time = 0;
private static DeviceWarnService deviceWarnService;
@Override
public void channelInactive(ChannelHandlerContext chc) throws Exception{
SocketChannel socketChannel = (SocketChannel) chc.channel();
String clientId = NettyMap.getKeyByChannel(socketChannel);
logger.info("----客户端设备连接断开:{}",clientId);
if(!StringUtils.isEmpty(clientId)) {
NettyMap.removeChannel(clientId);
//客户端断开
HttpUtil.syncNetworkStatus(clientId,0);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
SocketChannel socketChannel=(SocketChannel) ctx.channel();
String clientId = NettyChannelMap.get(socketChannel);
if(StringUtil.isEmpty(clientId)){
return;
}
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
IdleState state=event.state();
if (state==IdleState.READER_IDLE) {
//设备离线,更改设备状态,增加离线操作日志
this.updateDeviceStatus(clientId,0,loss_connect_time);
loss_connect_time++;
logger.info(clientId+"客户端离线"+loss_connect_time+"周期了");
if(loss_connect_time>=Constants.MAX_LOSS_CONNECT_TIME){
//客户端断连10分钟
logger.info("服务器主动关闭客户端链路--"+String.valueOf(Constants.SERVER_READ_IDEL_TIME_OUT*loss_connect_time)+"s没有"+ NettyChannelMap.get(clientId)+"的信息了");
//发送离线短信通知客户
this.sendDeviceOffLineMsg(clientId,0);
loss_connect_time=0;
NettyChannelMap.remove(clientId);
//服务端主动关闭channel,会触发com.vendor.netty.server.NettyServerHandler.channelInactive()方法
ctx.channel().close();
}
} else {
//复位
logger.info(clientId+"客户端恢复连接-----");
loss_connect_time =0;
super.userEventTriggered(ctx,evt);
}
} else {
//复位
logger.info(clientId+"客户端恢复连接=======");
loss_connect_time =0;
super.userEventTriggered(ctx,evt);
}
}
private void updateDeviceStatus(String clientId, int status,int lossConnectTime) {
if(deviceWarnService==null){
deviceWarnService = ContextUtil.getBeanByName(DeviceWarnService.class, "deviceWarnService");
}
String factoryDevNo= Constants.getFactoryDevNo(clientId);
if(lossConnectTime == 0 ) {
//只添加离线操作日志
deviceWarnService.updateDeviceNetworkStatusAndLog(factoryDevNo, status);
}
}
netty的demo网上一大把,这里就不详细解释了,这里只记录实际业务中遇到的问题。
服务端心跳检测:
在这个业务中,设备使用socket连接服务端,然后会有定期的心跳,同时,服务端要检测客户端是否在线,如果不在线,则发出告警,给后台服务发送报警日志,同时给先关人员发送短信邮件。问题在于,如果设备断网或者断电后,channelInactive并不会被触发,这时就需要服务端主动监控客户端连接。
有两个方案来处理这个问题:
第一个:使用redis来实现,每次设备发起心跳,server就更新一次redis,当设备断网超过一定时间,则redis中数据失效。这时候就认为设备失联,可以发送告警。
每次server重启,从数据库中读取所有设备号,然后储存在内存中,同时启动一个线程,定时根据设备号去redis中获取数据,如果有,则认为设备在线,如果没有,则设备失联
第二个:使用IdleStateHandler来实现。
IdleStateHandler中的三个参数解释如下:
1)readerIdleTime:为读超时时间;
2)writerIdleTime:为写超时时间;
3)allIdleTime:所有类型的超时时间;
这里最重要是的readerIdleTime,当设置了readerIdleTime以后,服务端server会每隔readerIdleTime时间去检查一次channelRead方法被调用的情况,如果在readerIdleTime时间内该channel上的channelRead()方法没有被触发,就会调用userEventTriggered方法。
最终项目中采用的是IdleStateHandler来实现,因为用起来实在是太方便了。
总之,不管客户端是用什么实现的,socket netttyclient websocket,服务端想要主动检测客户端是否在线,都需要心跳,事实上,用到socket的地方,大多都要实现心跳,只要客户端有心跳,那服务端检测客户端是否在线就可以使用IdleStateHandler
粘包拆包处理:
粘包拆包的概念,这里就不重复了,项目中最开始遇到的是粘包问题,因为交互命令都很短,同时数据格式是String,所以最开始只是使用String.split()来切割命令来解决拆包。但是后来遇到一个特殊命令,上报是消息超过了1024字节,这时候就发生了拆包现象,netty默认一次性只接受1024字节的数据,如果超过了,则会拆分。这时候就用到DelimiterBasedFrameDecoder了。
TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,一半采用如下四种方式:
1、消息长度固定,累计读取到消息长度总和为定长Len的报文之后即认为是读取到了一个完整的消息。计数器归位,重新读取。
2、将回车换行符作为消息结束符。
3、将特殊的分隔符作为消息分隔符
4、通过在消息头定义长度字段来标识消息总长度。
DelimiterBasedFrameDecoder属于第三种。业务中因为设备上client是被人家的代码,消息格式都是固定的,所以第1、2、4都不行,只能使用第三种。
DelimiterBasedFrameDecoder的参数:
maxFrameLength:解码的帧的最大长度
stripDelimiter:解码时是否去掉分隔符
failFast:为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常
delimiter:分隔符
Netty 服务端创建参考资料
http://www.infoq.com/cn/articles/netty-server-create