Netty 模拟心跳通信

在长连接场景下有很多发送心跳测试的需求,如服务注册与实现等。

一 、服务器端

ServerNetty:心跳服务端
ServerHeartBeatHandler:处理某个客户端的心跳通信

package com.test.thread.netty.heartBeat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import com.test.thread.netty.MarshallingCodefactory;
import com.test.thread.utils.Constant;

/**
 * netty 模拟发送心跳的服务器端
 * @author zhb
 */
public class ServerNetty {
    
    private int port;
    
    public ServerNetty(int port){
        this.port = port;
    }
    
    // netty 服务端启动
    public void action() throws InterruptedException{
        
        // 用来接收进来的连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(); 
        // 用来处理已经被接收的连接,一旦bossGroup接收到连接,就会把连接信息注册到workerGroup上
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            // nio服务的启动类
            ServerBootstrap sbs = new ServerBootstrap();
            // 配置nio服务参数
            sbs.group(bossGroup, workerGroup)
               .channel(NioServerSocketChannel.class) // 说明一个新的Channel如何接收进来的连接
               .option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
               .childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接
               .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        
                        // marshalling 序列化对象的解码
                        socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder());
                        // marshalling 序列化对象的编码
                        socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder());
                        
                        // 处理接收到的请求
                        socketChannel.pipeline().addLast(new ServerHeartBeatHandler()); // 这里相当于过滤器,可以配置多个
                    }
               });
            
            System.err.println("server 开启--------------");
            // 绑定端口,开始接受链接
            ChannelFuture cf = sbs.bind(port).sync();
            
            // 等待服务端口的关闭;在这个例子中不会发生,但你可以优雅实现;关闭你的服务
            cf.channel().closeFuture().sync();
        } finally{
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }       
    }
        
    // 开启netty服务线程
    public static void main(String[] args) throws InterruptedException {
        new ServerNetty(Constant.serverSocketPort).action();
    }   
    
}

package com.test.thread.netty.heartBeat;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.HashMap;

/**
 * 处理具体的一个客户端的心跳处理
 * @author zhb
 */
public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {

    // 允许接入的认证信息
    private static HashMap<String, Object> auth_map = new HashMap<String, Object>();
    
    private static final String SUCCESS_KEY = "auth_success_key";
    
    static{
        // 可配置多个
        auth_map.put("127.0.0.1", "1234");
        // 192.168.56.1
        auth_map.put("192.168.56.1", "1234");
    }
    
    public void channelRead(ChannelHandlerContext ctx, Object msg){
        System.out.println("--------------------------------------------");
        // 如果信息是字符串类型,就去做认证
        if(msg instanceof String){
            auth(ctx, msg);
        // 认证通过后的心跳信息   
        }else if(msg instanceof RequestInfo){
            
            handlerHeartBeatInfo(ctx, msg);
        }else{
            ctx.writeAndFlush("info error!").addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * 处理检测客户端心跳信息
     * @param msg
     * @param msg2 
     */
    private void handlerHeartBeatInfo(ChannelHandlerContext ctx, Object msg) {
        
        RequestInfo info = (RequestInfo) msg;
        System.out.println("--------------------------------------------");
        System.out.println("当前主机ip为: " + info.getIp());
        System.out.println("当前主机cpu情况: ");
        HashMap<String, Object> cpu = info.getCpuPercMap();
        System.out.println("总使用率: " + cpu.get("combined"));
        System.out.println("用户使用率: " + cpu.get("user"));
        System.out.println("系统使用率: " + cpu.get("sys"));
        System.out.println("等待率: " + cpu.get("wait"));
        System.out.println("空闲率: " + cpu.get("idle"));
        
        System.out.println("当前主机memory情况: ");
        HashMap<String, Object> memory = info.getMemoryMap();
        System.out.println("内存总量: " + memory.get("total"));
        System.out.println("当前内存使用量: " + memory.get("used"));
        System.out.println("当前内存剩余量: " + memory.get("free"));
        // 返回心跳信息接收成功   
        ctx.writeAndFlush("info received!");
    }
    
    /**
     * 验证客户端的信息
     * @param ctx
     * @param msg
     */
    private void auth(ChannelHandlerContext ctx, Object msg) {
        
        String[]  authStr = ((String)msg).split(",");   
        String authKey = (String) auth_map.get(authStr[0]);     
        // 请求的ip对应的key和服务端的是否一致
        if(authKey != null && authKey.equals(authStr[1])){
            ctx.writeAndFlush(SUCCESS_KEY);
        }else{
            // 返回认证失败
            ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
        }
    }
        
    // 数据读取完毕的处理
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.err.println("服务端读取数据完毕");
    }
    
    // 出现异常的处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("server 读取数据出现异常");
        ctx.close();
    }
        
}

二、客户端

ClientNetty:心跳测试客户端
ClientHeartBeatHandler:客户端心跳测试处理类
RequestInfo:发送心跳信息的实体类
HeartBeatTask:发送心跳信息任务类

package com.test.thread.netty.heartBeat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.UnsupportedEncodingException;

import com.test.thread.netty.MarshallingCodefactory;
import com.test.thread.utils.Constant;

/**
 * 客户端发送请求
 * @author zhb
 *
 */
public class ClientNetty {
    
    // 要请求的服务器的ip地址
    private String ip;
    // 服务器的端口
    private int port;
    
    public ClientNetty(String ip, int port){
        this.ip = ip;
        this.port = port;
    }
    
    // 请求端主题
    private void action() throws InterruptedException, UnsupportedEncodingException {
        
        EventLoopGroup bossGroup = new NioEventLoopGroup();     
        Bootstrap bs = new Bootstrap();
        
        bs.group(bossGroup)
          .channel(NioSocketChannel.class)
          .option(ChannelOption.SO_KEEPALIVE, true)
          .handler(new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel socketChannel) throws Exception {
                  
                    // marshalling 序列化对象的解码
                    socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder());
                    // marshalling 序列化对象的编码
                    socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder());
                  
                    // 处理来自服务端的响应信息
                    socketChannel.pipeline().addLast(new ClientHeartBeatHandler());
              }
         });
        
        // 客户端开启
        ChannelFuture cf = bs.connect(ip, port).sync();     
        // 等待直到连接中断
        cf.channel().closeFuture().sync();      
        bossGroup.shutdownGracefully();
    }
        
    public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException {
        new ClientNetty("127.0.0.1", Constant.serverSocketPort).action();
    }
        
}

package com.test.thread.netty.heartBeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
 * 处理客户端的心跳测试
 * @author zhb
 *
 */
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {
    
    private InetAddress addr;
    
    private static final String SUCCESS_KEY = "auth_success_key";
    
    private ScheduledFuture<?> heartBeat;
    
    // 心跳发送执行
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); 

    //当channel已激活就执行的方法
    public void channelActive(ChannelHandlerContext ctx) throws UnknownHostException{
        
        
        addr = InetAddress.getLocalHost();
        String ip = addr.getHostAddress();
        String key = "1234";
        
        // 相当于认证证书
        String auth = ip + "," + key;
        System.err.println("client 发送认证给你信息: " + auth);
        // 首先向服务器发送认证
        ctx.writeAndFlush(auth);
        
    }
    
    // 读取服务端响应的信息
    public void channelRead(ChannelHandlerContext ctx, Object msg){
        try {
            if(msg instanceof String){
                System.err.println("client 收到服务端的响应信息:" +(String)msg);
                if(((String)msg).equals(SUCCESS_KEY)){
                    // 如果服务器端认证成功,开始执行心跳信息 每5秒执行一次
                    heartBeat = scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx, addr), 0, 5, TimeUnit.SECONDS);
                }
            }else{
                System.err.println(msg);
            }
        } finally {
            // 没有返回信息,要释放msg
            ReferenceCountUtil.safeRelease(msg);
        }
    }

    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("服务端出现异常");
        
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }   

    // 数据读取完毕的处理
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.err.println("服务端读取数据完毕");
    }
        
}

package com.test.thread.netty.heartBeat;

import java.net.InetAddress;
import java.util.HashMap;

import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import org.hyperic.sigar.SigarException;

import io.netty.channel.ChannelHandlerContext;
/**
 * 客户端发送一个心跳信息的线程
 * @author zhb
 *
 */
public class HeartBeatTask implements Runnable {
    
    private ChannelHandlerContext ctx;  
    private InetAddress addr;
    
    public HeartBeatTask(ChannelHandlerContext ctx, InetAddress addr){
        this.ctx = ctx;
        this.addr = addr;
    }

    @Override
    public void run() {     
        try {
            RequestInfo reqInfo = new RequestInfo();
            reqInfo.setIp(addr.getHostAddress());
            
            // 该类可以自行检查本地的系统相关参数,具体信息可以百度
            Sigar sigar = new Sigar();
            //cpu prec
            CpuPerc cpuPerc = sigar.getCpuPerc();
            HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
            cpuPercMap.put("combined", cpuPerc.getCombined());
            cpuPercMap.put("user", cpuPerc.getUser());
            cpuPercMap.put("sys", cpuPerc.getSys());
            cpuPercMap.put("wait", cpuPerc.getWait());
            cpuPercMap.put("idle", cpuPerc.getIdle());
            
            // memory
            Mem mem = sigar.getMem();
            HashMap<String, Object> memoryMap = new HashMap<String, Object>();
            memoryMap.put("total", mem.getTotal() / 1024L);
            memoryMap.put("used", mem.getUsed() / 1024L);
            memoryMap.put("free", mem.getFree() / 1024L);
            
            reqInfo.setCpuPercMap(cpuPercMap);
            reqInfo.setMemoryMap(memoryMap);
            // 发送心跳信息
            ctx.writeAndFlush(reqInfo);
            
        } catch (SigarException e) {
            e.printStackTrace();
        }
        
    }
    

}

package com.test.thread.netty.heartBeat; 

import java.io.Serializable;
import java.util.HashMap;
/**
 * 心跳信息实体类
 * @author zhb
 * 
 */
public class RequestInfo implements Serializable {

    private String ip ;
    // cpu的一些信息
    private HashMap<String, Object> cpuPercMap ;
    // 内存的一些信息
    private HashMap<String, Object> memoryMap;
    //.. other field
    
    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public HashMap<String, Object> getCpuPercMap() {
        return cpuPercMap;
    }
    public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
        this.cpuPercMap = cpuPercMap;
    }
    public HashMap<String, Object> getMemoryMap() {
        return memoryMap;
    }
    public void setMemoryMap(HashMap<String, Object> memoryMap) {
        this.memoryMap = memoryMap;
    }   
    
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,204评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,091评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,548评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,657评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,689评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,554评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,302评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,216评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,661评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,851评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,977评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,697评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,306评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,898评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,019评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,138评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,927评论 2 355

推荐阅读更多精彩内容