CIM-router功能和设计分析

第一篇已经分析了CIM-client功能和设计。其中也提到了client需要向router注册,获取可用的服务器(负载均衡),线上运维(统计在线人数,模糊查找)。那么这篇注重看看router的设计和实现。
cim github地址: https://github.com/crossoverJie/cim

1. 协议
1.1 请求协议

请求协议的类图结构如下:

image.png

其中基类BaseRequest的实现(在client篇里面也谈到过)如下:

public class BaseRequest {
  //请求序列号
    private String reqNo;
// 请求时间戳
    private int timeStamp;
}
  • ChatReqVO增加了userId,msg字段,用来表示抽象聊天的数据请求。
  • LoginReqVO 增加了userId,userName字段,用来表示登陆的数据请求。
  • P2PReqVO 增加了userIdreceiveUserIdmsg字段,用来表示私聊的数据请求。
1.2 响应协议
  • CIMServerResVO 包含了ip,cimServerPort,httpPort这三个字段,用来表示获取某个服务的ip+prot数据请求。
  • RegisterInfoResVO包含了userId,userName用来表示用户注册的某个服务。
2. 程序运行流程
public class RouteApplication implements CommandLineRunner{

    private final static Logger LOGGER = LoggerFactory.getLogger(RouteApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(RouteApplication.class, args);
        LOGGER.info("启动 route 成功");
    }

    @Override
    public void run(String... args) throws Exception {

        //监听服务
        Thread thread = new Thread(new ServerListListener());
        thread.setName("zk-listener");
        thread.start() ;
    }
}
  • 标准的Springboot应用启动,并在容器启动后,启动一个线程去向ZK注册监听器。
public class ServerListListener implements Runnable{

    private static Logger logger = LoggerFactory.getLogger(ServerListListener.class);

    private ZKit zkUtil;

    private AppConfiguration appConfiguration ;


    public ServerListListener() {
        zkUtil = SpringBeanFactory.getBean(ZKit.class) ;
        appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ;
    }

    @Override
    public void run() {
        //注册监听服务
        zkUtil.subscribeEvent(appConfiguration.getZkRoot());

    }
}
// 当获取ZK中root节点发生变更(增删改)后更新本地ServerCache
public void subscribeEvent(String path) {
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                logger.info("清除/更新本地缓存 parentPath=【{}】,currentChilds=【{}】", parentPath,currentChilds.toString());

                //更新所有缓存/先删除 再新增
                serverCache.updateCache(currentChilds) ;
            }
        });


    }

ServerCache 保存的是ZK根目录下的所有注册的服务器ip,这样的目的在于每次都缓存了所有的服务节点,而不用每次都向ZK请求,减少网络请求次数。

3. 对外提供的Http服务

router就是对外提供的http服务,下面介绍它对外提供服务的具体实现

3.1 注册服务
//提供的http接口
@ApiOperation("注册账号")
    @RequestMapping(value = "registerAccount", method = RequestMethod.POST)
    @ResponseBody()
    public BaseResponse<RegisterInfoResVO> registerAccount(@RequestBody RegisterInfoReqVO registerInfoReqVO) throws Exception {
        BaseResponse<RegisterInfoResVO> res = new BaseResponse();

        long userId = System.currentTimeMillis();
        RegisterInfoResVO info = new RegisterInfoResVO(userId, registerInfoReqVO.getUserName());
        info = accountService.register(info);

        res.setDataBody(info);
        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        return res;
    }

 public RegisterInfoResVO register(RegisterInfoResVO info) {
        String key = ACCOUNT_PREFIX + info.getUserId();

        String name = redisTemplate.opsForValue().get(info.getUserName());
        if (null == name) {
            //为了方便查询,冗余一份
            redisTemplate.opsForValue().set(key, info.getUserName());
            redisTemplate.opsForValue().set(info.getUserName(), key);
        } else {
          //已经存在
            long userId = Long.parseLong(name.split(":")[1]);
            info.setUserId(userId);
            info.setUserName(info.getUserName());
        }

        return info;
    }
3.2 获取所有的在线用户
 @ApiOperation("获取所有在线用户")
    @RequestMapping(value = "onlineUser", method = RequestMethod.POST)
    @ResponseBody()
    public BaseResponse<Set<CIMUserInfo>> onlineUser() throws Exception {
        BaseResponse<Set<CIMUserInfo>> res = new BaseResponse();

        Set<CIMUserInfo> cimUserInfos = userInfoCacheService.onlineUser();
        res.setDataBody(cimUserInfos) ;
        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        return res;
    }

public Set<CIMUserInfo> onlineUser() {
        Set<CIMUserInfo> set = null ;
        Set<String> members = redisTemplate.opsForSet().members(LOGIN_STATUS_PREFIX);
        for (String member : members) {
            if (set == null){
                set = new HashSet<>(64) ;
            }
            //通过usrid获取到UserInfo
            CIMUserInfo cimUserInfo = loadUserInfoByUserId(Long.valueOf(member)) ;
            set.add(cimUserInfo) ;
        }

        return set;
    }

通过LOGIN_STATUS_PREFIX记录所有的登陆用户,因此获取到这个Set集合就行。

3.3 登陆并获取到可用的一个服务节点
@ApiOperation("登录并获取服务器")
    @RequestMapping(value = "login", method = RequestMethod.POST)
    @ResponseBody()
    public BaseResponse<CIMServerResVO> login(@RequestBody LoginReqVO loginReqVO) throws Exception {
        BaseResponse<CIMServerResVO> res = new BaseResponse();

        //登录校验,如果登陆成功,则保存登陆状态
        StatusEnum status = accountService.login(loginReqVO);
        if (status == StatusEnum.SUCCESS) {

            String server = routeHandle.routeServer(serverCache.getAll(),String.valueOf(loginReqVO.getUserId()));
            String[] serverInfo = server.split(":");
            //下面讲到一致性hash算法
            CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));

            //保存路由信息,即把(userid,server)对应起来
            accountService.saveRouteInfo(loginReqVO,server);

            res.setDataBody(vo);

        }
        res.setCode(status.getCode());
        res.setMessage(status.getMessage());

        return res;
    }
  • 这里选择服务器的形式有3中,如下图:
    image.png

    其中,LoopHandle就是对轮询的形式获取到服务节点,RandomHandle就是随机获取到服务节点,ConsistentHashHandle就是通过一致性hash获取到服务节点。关于一致性hash,有两种实现形式,如下:
public class TreeMapConsistentHash extends AbstractConsistentHash {
    //通过treemap来实现
    private TreeMap<Long,String> treeMap = new TreeMap<Long, String>() ;

    /**
     * 虚拟节点数量
     */
    private static final int VIRTUAL_NODE_SIZE = 2 ;
    //加入虚拟节点
    @Override
    public void add(long key, String value) {
        for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
            Long hash = super.hash("vir" + key + i);
            treeMap.put(hash,value);
        }
        treeMap.put(key, value);
    }

    @Override
    public String getFirstNodeValue(String value) {
        long hash = super.hash(value);
        System.out.println("value=" + value + " hash = " + hash);
        //返回大于等于value的视图SortedMap
        SortedMap<Long, String> last = treeMap.tailMap(hash);
        if (!last.isEmpty()) {
            //返回第一个key大于value的对应map里面保存的server
            return last.get(last.firstKey());
        }
        //如果没有,则返回第一个
        return treeMap.firstEntry().getValue();
    }
}

//用Node数组实现的
public class SortArrayMapConsistentHash extends AbstractConsistentHash {

    private SortArrayMap sortArrayMap = new SortArrayMap();

    /**
     * 虚拟节点数量
     */
    private static final int VIRTUAL_NODE_SIZE = 2 ;

    @Override
    public void add(long key, String value) {
        for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
            Long hash = super.hash("vir" + key + i);
            sortArrayMap.add(hash,value);
        }
        sortArrayMap.add(key, value);
    }
    //Arrays的sort
    @Override
    public void sort() {
        sortArrayMap.sort();
    }
    //顺时针找第一个比给定key大的server
    @Override
    public String getFirstNodeValue(String value) {
        long hash = super.hash(value);
        System.out.println("value=" + value + " hash = " + hash);
        return sortArrayMap.firstNodeValue(hash);
    }
}
  • 上面是两种一种一致性hash的实现,都是加入虚拟节点,然后把寻找第一个比给定value大的节点,返回该虚拟节点对应的value就行。
3.4 用户下线
@ApiOperation("客户端下线")
    @RequestMapping(value = "offLine", method = RequestMethod.POST)
    @ResponseBody()
    public BaseResponse<NULLBody> offLine(@RequestBody ChatReqVO groupReqVO) throws Exception {
        BaseResponse<NULLBody> res = new BaseResponse();

        CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());

        LOGGER.info("下线用户[{}]", cimUserInfo.toString());
        accountService.offLine(groupReqVO.getUserId());

        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        return res;
    }

 @Override
    public void offLine(Long userId) throws Exception {
        //删除路由
        redisTemplate.delete(ROUTE_PREFIX + userId) ;

        //删除登录状态
        userInfoCacheService.removeLoginStatus(userId);
    }
  • 下线就是删除,删除登录状态就行。
3.5 群聊
 public BaseResponse<NULLBody> groupRoute(@RequestBody ChatReqVO groupReqVO) throws Exception {
        BaseResponse<NULLBody> res = new BaseResponse();
        LOGGER.info("msg=[{}]", groupReqVO.toString());
        //获取所有的推送列表
        Map<Long, CIMServerResVO> serverResVOMap = accountService.loadRouteRelated();
        for (Map.Entry<Long, CIMServerResVO> cimServerResVOEntry : serverResVOMap.entrySet()) {
            Long userId = cimServerResVOEntry.getKey();
            CIMServerResVO value = cimServerResVOEntry.getValue();
            if (userId.equals(groupReqVO.getUserId())){
                //过滤掉自己
                CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
                LOGGER.warn("过滤掉了发送者 userId={}",cimUserInfo.toString());
                continue;
            }
            //推送消息
            String url = "http://" + value.getIp() + ":" + value.getHttpPort() + "/sendMsg" ;
            ChatReqVO chatVO = new ChatReqVO(userId,groupReqVO.getMsg()) ;

            accountService.pushMsg(url,groupReqVO.getUserId(),chatVO);

        }
        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        return res;
    }
 
//扫描以ROUTE_PREFIX开始的字符串,这个字符串对应着每个用户使用的server
 public Map<Long, CIMServerResVO> loadRouteRelated() {

        Map<Long, CIMServerResVO> routes = new HashMap<>(64);


        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        ScanOptions options = ScanOptions.scanOptions()
                .match(ROUTE_PREFIX + "*")
                .build();
        Cursor<byte[]> scan = connection.scan(options);

        while (scan.hasNext()) {
            byte[] next = scan.next();
            String key = new String(next, StandardCharsets.UTF_8);
            LOGGER.info("key={}", key);
            parseServerInfo(routes, key);

        }
        try {
            scan.close();
        } catch (IOException e) {
            LOGGER.error("IOException",e);
        }

        return routes;
    }
  • 上面就是找到找到每个节点的对应的服务节点
 public void pushMsg(String url, long sendUserId, ChatReqVO groupReqVO) throws Exception {
        CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(sendUserId);

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("msg", cimUserInfo.getUserName() + ":【" + groupReqVO.getMsg() + "】");
        jsonObject.put("userId", groupReqVO.getUserId());
        RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());

        Request request = new Request.Builder()
                .url(url)
                .post(requestBody)
                .build();

        Response response = okHttpClient.newCall(request).execute();
        try {
            if (!response.isSuccessful()) {
                throw new IOException("Unexpected code " + response);
            }
        }finally {
            response.body().close();
        }
    }
  • 发送到具体的Server,让sever负责广播它收到的群聊消息。
3.6 私聊
 @ApiOperation("私聊 API")
    @RequestMapping(value = "p2pRoute", method = RequestMethod.POST)
    @ResponseBody()
    public BaseResponse<NULLBody> p2pRoute(@RequestBody P2PReqVO p2pRequest) throws Exception {
        BaseResponse<NULLBody> res = new BaseResponse();

        try {
            //获取接收消息用户的路由信息
            CIMServerResVO cimServerResVO = accountService.loadRouteRelatedByUserId(p2pRequest.getReceiveUserId());
            //推送消息
            String url = "http://" + cimServerResVO.getIp() + ":" + cimServerResVO.getHttpPort() + "/sendMsg" ;

            //p2pRequest.getReceiveUserId()==>消息接收者的 userID
            ChatReqVO chatVO = new ChatReqVO(p2pRequest.getReceiveUserId(),p2pRequest.getMsg()) ;
            accountService.pushMsg(url,p2pRequest.getUserId(),chatVO);

            res.setCode(StatusEnum.SUCCESS.getCode());
            res.setMessage(StatusEnum.SUCCESS.getMessage());

        }catch (CIMException e){
            res.setCode(e.getErrorCode());
            res.setMessage(e.getErrorMessage());
        }
        return res;
    }
  • 与群聊很相似,只不过这里是一个用户。都是定位具体的sever,然后再根据登陆在server的消息,发送消息到具体的channel就可以。
总结

至此,router分析也完成了,router的实现主要是获取到server,同过http调用,从而发送消息。这个过程中,也涉及到负载均衡,在用户注册的时候,尽可能均衡分布。这里用了轮询,随机,一致性hash这三种负载均衡算法。而且后面也方便拓展。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容

  • 网关(Gateway) 简单定义一:网络连接到另一个网络的“关口”。 理论 如果网络A中的主机发现数据包的目的主机...
    Enomothem阅读 5,958评论 0 2
  • 实在开心到不行,人生头次拥有腹肌,暂且叫她们小A,小B好了。 顺产八斤多后,不堪入目的肚皮,一看见就无比嫌弃自己。...
    Vinc姐阅读 215评论 0 0
  • 《话说服务缘起》:千古“活结”为哪般?——百谈服务经之来龙去脉1 古诗云:“人事有代谢,往来成古今”.如果循着历史...
    严满意聊服务事阅读 452评论 0 0
  • 彤云郁郁愁容笑,骤雨惊风任曳摇。一抹珠帘天扯地,天开日出映虹桥。
    古道西风廋马阅读 121评论 0 0
  • 大家好,我是质检岗的新同事。4月份之前我也跟大家一样,在电催一线工作,平均每天打至少300个电话。为什么我写这篇文...
    容大小姐阅读 6,962评论 7 1