t-io 集群解决方案以及源码解析

0x01 概要说明

本博客是基于谭总 t-io showcase中的tio-websocket-showcase 示例来实现集群。看showcase 入门还是挺容易的,入坑(入门)请看老谭写的用t-io来写一个网页聊天室或客服是个怎样的体验。 要深入理解具体实现原理后续的业务扩展,把t-io玩6起来还需要耐心看看源码,看了之后我相信你一定会有收获的,祝你好运。

其实t-io2.4的版本中已加入的集群实现的逻辑代码,只是官方没有写文档以及完整的示例而已,在此不得不说t-io 是一个比较良心的开源项目,很多业务场景都有考虑到。你们有需求也可以去t-ioissues

0x02 已有的集群解决方案

实现思路就是基于redis来做一个发布/订阅的方式达到多节点协作的目的,t-io内置的集群也是使用的此解决方案。下面就来聊聊如何使用t-io的内置集群。

0x03 t-io的内置集群

t-io中是否开启集群是通过org.tio.core.GroupContext中的tioClusterConfig是否为空来判断的。

好了,闲话少说直接上菜(代码)

判断是否开启集群(org.tio.core.GroupContext)

/**
 * 是否是集群
 * @return true: 是集群
 * @author: tanyaowu
 */
public boolean isCluster() {
    return tioClusterConfig != null;
}

tio-websocket-showcase中增加集群解决方案

//实例化t-io集群配置
TioClusterConfig tioClusterConfig = TioClusterConfig.newInstance("Javen", RedissonTemplate.me().getRedissonClient());
//开启群组集群-默认不集群
tioClusterConfig.setCluster4group(true);
//配置t-io集群
serverGroupContext.setTioClusterConfig(tioClusterConfig);
  • TioClusterConfig 中为我们封装了各种场景下是否开启集群的参数配置、消息的发布与订阅以及添加消息监听
  • RedissonTemplate 是使用J-IM中的部分代码,目的是来实例化RedissonClient

RedissonTemplate 代码如下慢慢品读

package org.jim.common.cache.redis;

import java.io.Serializable;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @author WChao
 * @date 2018年5月18日 下午2:46:55
 */
public class RedissonTemplate implements Serializable{

    private static final long serialVersionUID = -4528751601700736437L;
    private static final Logger logger = LoggerFactory.getLogger(RedissonTemplate.class);
    private static RedissonTemplate instance = null;
    private static RedisConfiguration redisConfig = null;
    private static final String REDIS = "redis";
    private static RedissonClient redissonClient = null;
    
    private RedissonTemplate(){};
    
    public static RedissonTemplate me() throws Exception{
         if (instance == null) { 
                synchronized (RedissonTemplate.class) {
                    if(instance == null){
                        redisConfig = RedisConfigurationFactory.parseConfiguration();
                        init();
                        instance = new RedissonTemplate();
                    }
                }
         }
         return instance;
    }
    
    private static final void init() throws Exception {
            String host = redisConfig.getHost();
            if(host == null) {
                logger.error("the server ip of redis  must be not null!");
                throw new Exception("the server ip of redis  must be not null!");
            }   
            int port = redisConfig.getPort();
            String password = redisConfig.getAuth();
            Config redissonConfig = new Config();
            SingleServerConfig singleServerConfig = redissonConfig.useSingleServer();
            singleServerConfig.setAddress(REDIS+"://"+host+":"+port).setPassword(password).setTimeout(redisConfig.getTimeout()).setRetryAttempts(redisConfig.getRetryNum());
            try {
               redissonClient = Redisson.create(redissonConfig);
            } catch (Exception e) {
                logger.error("cann't create RedissonClient for server"+redisConfig.getHost());
                throw new Exception("cann't create RedissonClient for server"+redisConfig.getHost());
            }
            
    }
    /**
     * 获取RedissonClient客户端;
     * @return
     */
    public final RedissonClient getRedissonClient(){
        return redissonClient;
    }
}

看到这里有人可能要问,在什么地方发布消息以及处理订阅消息!!!

  • 什么地方发布消息

    当然是发送消息的时候,调用Tio.sendXxx()系列方法的时候。在tio-websocket-showcase中主要实现的是群聊,调用的是Tio.sendToGroup(),具体实现代码如下:

    /**
       * 发消息到组
       * @param groupContext
       * @param group
       * @param packet
       * @param channelContextFilter
       * @author tanyaowu
       */
      private static Boolean sendToGroup(GroupContext groupContext, String group, Packet packet, ChannelContextFilter channelContextFilter, boolean isBlock) {
          try {
              SetWithLock<ChannelContext> setWithLock = groupContext.groups.clients(groupContext, group);
              if (setWithLock == null) {
                  log.debug("{}, 组[{}]不存在", groupContext.getName(), group);
                  return false;
              }
              Boolean ret = sendToSet(groupContext, setWithLock, packet, channelContextFilter, isBlock);
              return ret;
          } finally {
                //判断是否集群以及是不是集群通过topic转过来的消息包
              if (groupContext.isCluster() && !packet.isFromCluster()) {
                  TioClusterConfig tioClusterConfig = groupContext.getTioClusterConfig();
                  //判断是否开启了群组集群
                  if (tioClusterConfig.isCluster4group()) {
    //                    TioClusterVo tioClusterVo = new TioClusterVo(packet);
    //                    tioClusterVo.setGroup(group);
    //                    tioClusterConfig.publishAsyn(tioClusterVo);
                        //在集群环境下,把群组消息通知到集群中的其它机器
                      notifyClusterForGroup(groupContext, group, packet);
                  }
              }
          }
      }
    
    /**
       * 在集群环境下,把群组消息通知到集群中的其它机器
       * @param groupContext
       * @param group
       * @param packet
       */
      public static void notifyClusterForGroup(GroupContext groupContext, String group, Packet packet) {
          TioClusterConfig tioClusterConfig = groupContext.getTioClusterConfig();
          TioClusterVo tioClusterVo = new TioClusterVo(packet);
          tioClusterVo.setGroup(group);
          tioClusterConfig.publishAsyn(tioClusterVo);
      }
    
  • 处理订阅消息

其实在t-io中有默认实现,具体的代码如下

public void setTioClusterConfig(TioClusterConfig tioClusterConfig) {
    this.tioClusterConfig = tioClusterConfig;
    if (this.tioClusterConfig != null) {
        this.tioClusterConfig.addMessageListener(new DefaultMessageListener(this));
    }
}

org.tio.core.cluster.DefaultMessageListener 有详细的注释慢慢品读

package org.tio.core.cluster;

import org.apache.commons.lang3.StringUtils;
import org.redisson.api.listener.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Tio;
import org.tio.core.GroupContext;
import org.tio.core.intf.Packet;
import org.tio.utils.json.Json;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 默认的集群消息监听类
 * 作者: 陈磊(Cooppor)
 * 日期: 2018-05-28 15:08
 */
public class DefaultMessageListener implements MessageListener<TioClusterVo> {

    private static Logger log = LoggerFactory.getLogger(DefaultMessageListener.class);

    /**
     * 收到了多少次topic
     */
    private static final AtomicLong RECEIVED_TOPIC_COUNT = new AtomicLong();

    private GroupContext groupContext;

    public DefaultMessageListener(GroupContext groupContext) {
        this.groupContext = groupContext;
    }

    @Override
    public void onMessage(String channel, TioClusterVo tioClusterVo) {
        log.info("收到topic:{}, count:{}, tioClusterVo:{}", channel, RECEIVED_TOPIC_COUNT.incrementAndGet(), Json.toJson(tioClusterVo));
        String clientid = tioClusterVo.getClientId();
        if (StringUtils.isBlank(clientid)) {
            log.error("clientid is null");
            return;
        }
        if (Objects.equals(TioClusterVo.CLIENTID, clientid)) {
            log.info("自己发布的消息,忽略掉,{}", clientid);
            return;
        }

        Packet packet = tioClusterVo.getPacket();
        if (packet == null) {
            log.error("packet is null");
            return;
        }
        packet.setFromCluster(true);
        
        //发送给所有
        boolean isToAll = tioClusterVo.isToAll();
        if (isToAll) {
            Tio.sendToAll(groupContext, packet);
        }

        //发送给指定组
        String group = tioClusterVo.getGroup();
        if (StringUtils.isNotBlank(group)) {
            Tio.sendToGroup(groupContext, group, packet);
        }

        //发送给指定用户
        String userid = tioClusterVo.getUserid();
        if (StringUtils.isNotBlank(userid)) {
            Tio.sendToUser(groupContext, userid, packet);
        }
        
        //发送给指定token
        String token = tioClusterVo.getToken();
        if (StringUtils.isNotBlank(token)) {
            Tio.sendToToken(groupContext, token, packet);
        }

        //发送给指定ip
        String ip = tioClusterVo.getIp();
        if (StringUtils.isNotBlank(ip)) {
            Tio.sendToIp(groupContext, ip, packet);
        }
        
        //发送给指定channelId
        String channelId = tioClusterVo.getChannelId();
        if (StringUtils.isNotBlank(channelId)) {
            Tio.sendToId(groupContext, channelId, packet);
        }
    }
}
0x05 配置redis

哥们,测试时别忘了配置Redis。

/tio-websocket-showcase/src/main/resources/redis.properties

#连接池连接不够用时,重试获取连接次数
retrynum = 100
#可用连接实例的最大数目,默认值为8;
maxactive = -1
#控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
maxidle = 20
#等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。
maxwait = 5000
timeout = 2000
#redis所在机器ip
host = 127.0.0.1
#redis端口号
port = 6379
#redis密码
auth = 

开启两个端口测试 9326以及9327

9326端口
9327端口

到这里在t-io 中借助Redis来实现集群部署实现步骤就介绍完了,个人能力有限如有错误欢迎指正。你有更好的解决方案或者建议欢迎一起交流讨论,如有疑问欢迎留言。

Fork的源码地址 https://gitee.com/javen205/tio-websocket-showcase

0x06 广而告之
  • IJPay 让支付触手可及,封装了微信支付、支付宝支付、银联支付常用的支付方式以及各种常用的接口。不依赖任何第三方 mvc 框架,仅仅作为工具使用简单快速完成支付模块的开发,可轻松嵌入到任何系统里。
  • t-io 让天下没有难开发的网络编程
  • J-IM 是用JAVA语言,基于t-io开发的轻量、高性能、单机支持几十万至百万在线用户IM,主要目标降低即时通讯门槛,快速打造低成本接入在线IM系统。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容

  • 维也纳篇 玩 维也纳,我给它的定位是金色,说到维也纳我脑子中就蹦出金色,应该是源于金色大厅。 一、史特凡大教堂 可...
    江南茱儿阅读 280评论 0 0
  • 北京时间10月19日,据NBA官网报道,布鲁克林篮网队今天正式宣布,华裔控卫林书豪在昨天与印第安纳步行者队的比赛中...
    余生将忆北阅读 466评论 0 4
  • 不知走到了哪里 好像这条路总是尽头又想是没有边 呀,扁着的裤脚不知何时松了扎起的衣摆悄悄地掉了 一阵急促的秋雨说来...
    一只机器喵阅读 197评论 0 0
  • 看完这本《书都不会读,你还想成功》一书,实实在在被震惊到了。平时也自诩是一位喜欢读书之人,可和书中这些大咖们比起来...
    清水无香LY阅读 298评论 0 3