CIM-client 功能和设计分析

感觉Crossoverjie的一个开源cim(即时通讯系统,源码和设计个人觉得不错,空闲的时候分析一下。
cim github地址: https://github.com/crossoverJie/cim

协议设计

1. 请求协议类图
image.png
  • BaseRequest 作为基类,具有所有的请求都应该具备的两个属性
public class BaseRequest {
    // 请求的序列号
    private String reqNo;
   //请求的时间戳,构造的时候就设置为System.currentTimeMillis() / 1000
    private int timeStamp;
}
  • GoogleProtocolVO 增加了requestIdmsg两个字段,表示传输GoogleProtocol 消息。
  • GroupReqVO 增加了userId,msg 两个字段,表示传输的是群聊消息。
  • LoginReqVO 增加了userId,userName两个字段,表示传输的是登录消息
  • P2PReqVO 增加了userId,receiveUserId,msg 字段,表示传输的是一对一私聊消息
  • SendMsgReqVO 增加了msg,userId字段,表示通常的传输发送消息
  • StringReqVO 增加了msg字段,表示用来传输String的消息
2. 相应协议类图
image.png
  • CIMServerResVO 用来接收查询路由选中的服务器的响应消息,格式如下:
{
      code : 9000
      message : 成功
      reqNo : null
      dataBody : {"ip":"127.0.0.1","port":8081} 
}    
  • OnlineUsersResVO用来接受查询所有在线用户的响应消息,格式如下:
{
     code : 9000
     message : 成功
     reqNo : null
     dataBody : [{"userId":1545574841528,"userName":"zhangsan"},{"userId":1545574871143,"userName":"crossoverJie"}]
}
  • SendMsgResVO 表示发送消息的响应
3. 程序运行流程
3.1 程序入口类
public class CIMClientApplication implements CommandLineRunner{

    private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class);

    @Autowired
    private ClientInfo clientInfo ;
    public static void main(String[] args) {
        SpringApplication.run(CIMClientApplication.class, args);
        LOGGER.info("启动 Client 服务成功");
    }

    @Override
    public void run(String... args) throws Exception { 
        Scan scan = new Scan() ;
        Thread thread = new Thread(scan);
        thread.setName("scan-thread");
        thread.start();
        clientInfo.saveStartDate();
    }
  • 标准的Springboot启动流程,重写run方法在Springboot应用启动后就启动一个线程去监听控制台,根据用户的命令,做相应的操作。
3.2 Scan扫描用户的输入命令
 public void run() {
        Scanner sc = new Scanner(System.in);
        while (true) {
            String msg = sc.nextLine();
            //检查消息,保证输入消息不能不为null
            if (msgHandle.checkMsg(msg)) {
                continue;
            }
            //系统内置命令
            if (msgHandle.innerCommand(msg)){
                continue;
            }
            //真正的发送消息
            msgHandle.sendMsg(msg) ;
            //写入聊天记录
            msgLogger.log(msg) ;
            LOGGER.info("{}:【{}】", configuration.getUserName(), msg);
        }
    }
  • 经过检查消息是否为空字符串,是否是内置命令,最后剩下的是用户发送的消息。
3.3 内置命令的处理

如果是内置命令,转而通过反射实例化每个命令,这里用到命令模式。

public boolean innerCommand(String msg) {

        if (msg.startsWith(":")) {
            
            InnerCommand instance = innerCommandContext.getInstance(msg);
            //调用里面的方法
            instance.process(msg) ;
            return true;
        } else {
            return false;
        }
    }

 public InnerCommand getInstance(String command) {
        //// 每个命令对应一个实现类
        Map<String, String> allClazz = SystemCommandEnum.getAllClazz();

        //兼容需要命令后接参数的数据 :q cross
        String[] trim = command.trim().split(" ");
        String clazz = allClazz.get(trim[0]);
        InnerCommand innerCommand = null;
        try {
            if (StringUtil.isEmpty(clazz)){
                clazz = PrintAllCommand.class.getName() ;
            }
            //根据类名获取到在容器里面的实例
            innerCommand = (InnerCommand) SpringBeanFactory.getBean(Class.forName(clazz));
        } catch (Exception e) {
            LOGGER.error("Exception", e);
        }

        return innerCommand;
    }
  • 内部完整命令,以及他们的实现类如下


    image.png

    完整命令类类图如下:


    image.png

    看其中一个实现类
public class PrintOnlineUsersCommand implements InnerCommand {
    private final static Logger LOGGER = LoggerFactory.getLogger(PrintOnlineUsersCommand.class);

    @Autowired
    private RouteRequest routeRequest ;

    @Override
    public void process(String msg) {
        try {
            // 查询所有的在线用户,委托routeRequest 来查询
            List<OnlineUsersResVO.DataBodyBean> onlineUsers = routeRequest.onlineUsers();

            LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
            for (OnlineUsersResVO.DataBodyBean onlineUser : onlineUsers) {
                LOGGER.info("userId={}=====userName={}", onlineUser.getUserId(), onlineUser.getUserName());
            }
            LOGGER.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");

        } catch (Exception e) {
            LOGGER.error("Exception", e);
        }
    }
}

都是通过其中的process来处理逻辑

3.4 处理内置命令后,接着来看处理发送消息
public void sendMsg(String msg) {
 
        if (aiModel) {
           //ai 模式主要是调侃之前那个价值两亿的融资项目
            aiChat(msg);
        } else {
           // 正常的聊天
            normalChat(msg);
        }
    }

private void normalChat(String msg) {
        String[] totalMsg = msg.split(";;");
        // 私聊的格式是:12345;;hello
        if (totalMsg.length > 1) {
            //私聊
            P2PReqVO p2PReqVO = new P2PReqVO();
            p2PReqVO.setUserId(configuration.getUserId());
            p2PReqVO.setReceiveUserId(Long.parseLong(totalMsg[0]));
            p2PReqVO.setMsg(totalMsg[1]);
            try {
                p2pChat(p2PReqVO);
            } catch (Exception e) {
                LOGGER.error("Exception", e);
            }

        } else {
            //群聊 直接发消息就行
            GroupReqVO groupReqVO = new GroupReqVO(configuration.getUserId(), msg);
            try {
                groupChat(groupReqVO);
            } catch (Exception e) {
                LOGGER.error("Exception", e);
            }
        }
    }

群聊和私聊也都委托 routeRequest来实现

 @Override
    public void groupChat(GroupReqVO groupReqVO) throws Exception {
        routeRequest.sendGroupMsg(groupReqVO);
    }

    @Override
    public void p2pChat(P2PReqVO p2PReqVO) throws Exception {

        routeRequest.sendP2PMsg(p2PReqVO);

    }
3.5 处理聊天记录

接着最开始的时候看,聊天完成后,需要把聊天记录写入文件,实现如下

 public void log(String msg) {
        //开始消费,异步完成
        startMsgLogger();
        try {
          //往阻塞队列里面添加
            blockingQueue.put(msg);
        } catch (InterruptedException e) {
            LOGGER.error("InterruptedException", e);
        }
    }

启动消息线程,往阻塞队列里面添加消息

private class Worker extends Thread {


        @Override
        public void run() {
            while (started) {
                try {
                    //往阻塞队列里面取
                    String msg = blockingQueue.take();
                    writeLog(msg);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }

    }

真正写入文件的实现如下:

private void writeLog(String msg) {

        LocalDate today = LocalDate.now();
        int year = today.getYear();
        int month = today.getMonthValue();
        int day = today.getDayOfMonth();

        String dir = appConfiguration.getMsgLoggerPath() + appConfiguration.getUserName() + "/";
        String fileName = dir + year + month + day + ".log";

        Path file = Paths.get(fileName);
        boolean exists = Files.exists(Paths.get(dir), LinkOption.NOFOLLOW_LINKS);
        try {
            if (!exists) {
                Files.createDirectories(Paths.get(dir));
            }

            List<String> lines = Arrays.asList(msg);

            Files.write(file, lines, Charset.forName("UTF-8"), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            LOGGER.info("IOException", e);
        }

    }

查找聊天记录的实现如下,就是简单的查找每个文件的每行,然后看是否包含,这样的方式很暴力,后期的话有很大改进:

@Override
    public String query(String key) {
        StringBuilder sb = new StringBuilder();

        Path path = Paths.get(appConfiguration.getMsgLoggerPath() + appConfiguration.getUserName() + "/");

        try {
            Stream<Path> list = Files.list(path);
            List<Path> collect = list.collect(Collectors.toList());
            for (Path file : collect) {
                List<String> strings = Files.readAllLines(file);
                for (String msg : strings) {
                    if (msg.trim().contains(key)) {
                        sb.append(msg).append("\n");
                    }
                }

            }
        } catch (IOException e) {
            LOGGER.info("IOException", e);
        }

        return sb.toString().replace(key, "\033[31;4m" + key + "\033[0m");
    }
3.6 RouteRequestImpl的实现

这个实现里面包含众多的功能,例如,群聊,私聊,离线,获取在线用户,获取一个可用的服务ip。这些功能的实现都是依靠 RouteRequestImpl来完成,而RouteRequestImpl里面的实现是通过okhttp远程调用cim-router的http接口实现的。看其中的群聊功能:

 public void sendGroupMsg(GroupReqVO groupReqVO) throws Exception {
        //序列化
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("msg",groupReqVO.getMsg());
        jsonObject.put("userId",groupReqVO.getUserId());
        RequestBody requestBody = RequestBody.create(mediaType,jsonObject.toString());
        
        Request request = new Request.Builder()
                .url(groupRouteRequestUrl)
                .post(requestBody)
                .build();
        //发送http请求cim-router
        Response response = okHttpClient.newCall(request).execute() ;
        try {
            if (!response.isSuccessful()){
                throw new IOException("Unexpected code " + response);
            }
        }finally {
            response.body().close();
        }
    }
3.7 客户端的启动

上面所有的都是内置命令的处理以及和cim-router的通信。但是,client最终是要和server通信的,所以在这个过程中,客户端作为netty客户端需要启动。这个启动过程可以在CIMClient实例化的过程中启动

@Component
public class CIMClient {
   //构造函数完成后调用
  @PostConstruct
    public void start() throws Exception {

        //登录 + 获取可以使用的服务器 ip+port
        CIMServerResVO.ServerInfo cimServer = userLogin();

        //启动客户端
        startClient(cimServer);

        //向服务端注册
        loginCIMServer();


    }
}

向路由注册并返回可用的服务器地址

private CIMServerResVO.ServerInfo userLogin() {
        LoginReqVO loginReqVO = new LoginReqVO(userId, userName);
        CIMServerResVO.ServerInfo cimServer = null;
        try {
          //获取可用的服务器
            cimServer = routeRequest.getCIMServer(loginReqVO);

            //保存系统信息
            clientInfo.saveServiceInfo(cimServer.getIp() + ":" + cimServer.getCimServerPort())
                    .saveUserInfo(userId, userName);

            LOGGER.info("cimServer=[{}]", cimServer.toString());
        } catch (Exception e) {
            errorCount++;

            if (errorCount >= configuration.getErrorCount()) {
                LOGGER.error("重连次数达到上限[{}]次", errorCount);
                msgHandle.shutdown();
            }
            LOGGER.error("登录失败", e);
        }
        return cimServer;
    }

启动客户端到服务端(上一步获取的)的channel

private void startClient(CIMServerResVO.ServerInfo cimServer) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new CIMClientHandleInitializer())
        ;

        ChannelFuture future = null;
        try {
            future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
        } catch (InterruptedException e) {
            errorCount++;

            if (errorCount >= configuration.getErrorCount()) {
                LOGGER.error("链接失败次数达到上限[{}]次", errorCount);
                msgHandle.shutdown();
            }
            LOGGER.error("连接失败", e);
        }
        if (future.isSuccess()) {
            LOGGER.info("启动 cim client 成功");
        }
        channel = (SocketChannel) future.channel();
    }

向服务器注册

 private void loginCIMServer() {
        CIMRequestProto.CIMReqProtocol login = CIMRequestProto.CIMReqProtocol.newBuilder()
                .setRequestId(userId)
                .setReqMsg(userName)
                .setType(Constants.CommandType.LOGIN)
                .build();
        ChannelFuture future = channel.writeAndFlush(login);
        future.addListener((ChannelFutureListener) channelFuture ->
                LOGGER.info("注册成功={}", login.toString()));
    }

总结

到这里整cim-client的功能就完成了,客户端就是通过命令模式通过okhttp远程调用特定的服务地址来注册,获取服务器地址,完成运维。通过从cim-router拿到的服务器地址,建立客户端-服务端的连接,即可完成消息私聊,群聊。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容