转载-mycat启动分析

  1. 程序的入口是io.mycat.mycat2.MycatCore.
    在main 方法中 首选取得ProxyRuntime的实例,该类是一个单例模式.
runtime.setConfig(new MycatConfig());

该方法为runtime设置了一个MycatConfig ,MycatConfig 是负责读取配置文件的.

ConfigLoader.INSTANCE.loadCore();

调用ConfigLoader 进行配置文件的加载.

3.1. 加载mycat.yml,heartbeat.yml,cluster.yml,balancer.yml,user.yml 的配置文件.代码如下:

public void loadCore() throws IOException {
    loadConfig(ConfigEnum.PROXY, GlobalBean.INIT_VERSION);
    loadConfig(ConfigEnum.HEARTBEAT, GlobalBean.INIT_VERSION);
    loadConfig(ConfigEnum.CLUSTER, GlobalBean.INIT_VERSION);
    loadConfig(ConfigEnum.BALANCER, GlobalBean.INIT_VERSION);
    loadConfig(ConfigEnum.USER, GlobalBean.INIT_VERSION);
}

3.2 进行配置文件的加载,调用了YamlUtil进行加载,并将配置信息赋值给MycatConfig,关键代码如下 :

  conf.putConfig(configEnum, (Configurable) YamlUtil.load(fileName, configEnum.getClazz()), version);

3.3 在io.mycat.util.YamlUtil.load(String, Class<T>) 中,处理也很简单,通过Yaml进行文件的加载,然后通过反射机制将属性进行赋值.代码如下:

/**
 * 从指定的文件中加载配置
 * @param fileName 需要加载的文件名
 * @param clazz 加载后需要转换成的类对象
 * @return
 * @throws FileNotFoundException
 */
public static <T> T load(String fileName, Class<T> clazz) throws FileNotFoundException {
    InputStreamReader fis = null;
    try {
        URL url = YamlUtil.class.getClassLoader().getResource(fileName);
        if (url != null) {
            Yaml yaml = new Yaml();
            fis = new InputStreamReader(new FileInputStream(url.getFile()), StandardCharsets.UTF_8);
            T obj = yaml.loadAs(fis, clazz);
            return obj;
        }
        return null;
    } finally {
        if (fis != null) {
            try {
                fis.close();
            } catch (IOException ignored) {
            }
        }
    }
}
  1. 进行通过命令行传入参数的解析.如:可以传入
    • -mycat.proxy.port 8067
    • -mycat.cluster.enable true
    • -mycat.cluster.port 9067
    • -mycat.cluster.myNodeId leader-2

参数等
支持的参数可在 io.mycat.mycat2.beans.ArgsBean 中进行查看.

获得参数后通过遍历进行赋值操作即可,代码如下:

private static void solveArgs(String[] args) {
int lenght = args.length;

MycatConfig conf = ProxyRuntime.INSTANCE.getConfig();
ProxyConfig proxyConfig = conf.getConfig(ConfigEnum.PROXY);
ClusterConfig clusterConfig = conf.getConfig(ConfigEnum.CLUSTER);
BalancerConfig balancerConfig= conf.getConfig(ConfigEnum.BALANCER);

for (int i = 0; i < lenght; i++) {
    switch(args[i]) {
        case ArgsBean.PROXY_PORT:
            proxyConfig.getProxy().setPort(Integer.parseInt(args[++i]));
            break;
        case ArgsBean.CLUSTER_ENABLE:
            clusterConfig.getCluster().setEnable(Boolean.parseBoolean(args[++i]));
            break;
        case ArgsBean.CLUSTER_PORT:
            clusterConfig.getCluster().setPort(Integer.parseInt(args[++i]));
            break;
        case ArgsBean.CLUSTER_MY_NODE_ID:
            clusterConfig.getCluster().setMyNodeId(args[++i]);
            break;
        case ArgsBean.BALANCER_ENABLE:
            balancerConfig.getBalancer().setEnable(Boolean.parseBoolean(args[++i]));
            break;
        case ArgsBean.BALANCER_PORT:
            balancerConfig.getBalancer().setPort(Integer.parseInt(args[++i]));
            break;
        case ArgsBean.BALANCER_STRATEGY:
            BalancerBean.BalancerStrategyEnum strategy = BalancerBean.BalancerStrategyEnum.getEnum(args[++i]);
            if (strategy == null) {
                throw new IllegalArgumentException("no such balancer strategy");
            }
            balancerConfig.getBalancer().setStrategy(strategy);
            break;
        default:
            break;
    }
}
}
  1. 设置NioReactorThreads,线程数目按照cpu的数目而定.
int cpus = Runtime.getRuntime().availableProcessors();
runtime.setNioReactorThreads(cpus);
runtime.setReactorThreads(new MycatReactorThread[cpus]);

6.设置MycatSessionManager

  1. 然后调用io.mycat.proxy.ProxyRuntime 的init方法,进行资源的初始化.代码如下:
public void init() {
//心跳调度独立出来,避免被其他任务影响
heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
HeartbeatConfig heartbeatConfig = config.getConfig(ConfigEnum.HEARTBEAT);
timerExecutor = ExecutorUtil.create("Timer", heartbeatConfig.getHeartbeat().getTimerExecutor());
businessExecutor = ExecutorUtil.create("BusinessExecutor",Runtime.getRuntime().availableProcessors());
listeningExecutorService = MoreExecutors.listeningDecorator(businessExecutor);
MatchMethodGenerator.initShrinkCharTbl();
}

7.1. 建立心跳线程, 原因是避免被其他任务影响.

7.2 建立timerExecutor,线程数目默认为2.

7.3 建立businessExecutor,线程数目为cpu数目.

7.4 建立listeningExecutorService, 该ExecutorService将会进行任务的提交给businessExecutor

7.5 调用io.mycat.mycat2.sqlparser.MatchMethodGenerator#initShrinkCharTbl方法.该方法只是对0-9a-zA-Z的字符进行映射.代码如下:

static final byte[] shrinkCharTbl = new byte[96];//为了压缩hash字符映射空间,再次进行转义
public static void initShrinkCharTbl () {
    shrinkCharTbl[0] = 1;//从 $ 开始计算
    IntStream.rangeClosed('0', '9').forEach(c -> shrinkCharTbl[c-'$'] = (byte)(c-'0'+2));
    IntStream.rangeClosed('A', 'Z').forEach(c -> shrinkCharTbl[c-'$'] = (byte)(c-'A'+12));
    IntStream.rangeClosed('a', 'z').forEach(c -> shrinkCharTbl[c-'$'] = (byte)(c-'a'+12));
    shrinkCharTbl['_'-'$'] = (byte)38;
}
  1. 调用ProxyStarter的start,首先 是创建了NIOAcceptor,负责对请求进行处理.然后根据ClusterConfig的配置进行相应的处理.
ClusterConfig clusterConfig = conf.getConfig(ConfigEnum.CLUSTER);
ClusterBean clusterBean = clusterConfig.getCluster();
if (clusterBean.isEnable()) {
    // 启动集群
    startCluster(runtime, clusterBean, acceptor);
} else {
    // 未配置集群,直接启动
    startProxy(true);
}

9.接下来分析未配置集群的启动方式.

9.1 首先通过ProxyRuntime获取到ProxyConfig(该对象是mycat.yml的封装),mycat.yml的配置文件如下:

proxy:
  ip: 0.0.0.0
  port: 8066

因此可以通过该类拿到启动端口和ip.因此传入NIOAcceptor进行监听.

9.2 在io.mycat.proxy.NIOAcceptor#startServerChannel中,做了如下处理:

9.2.1 首先检查ServerSocketChannel是否已经启动,如果已经启动,不进行后续处理.

9.2.2 根据传入的ServerType做不同的处理,当前我们传入的值为MYCAT. 因此不进行处理.

if (serverType == ServerType.CLUSTER) {
    adminSessionMan = new DefaultAdminSessionManager();
    ProxyRuntime.INSTANCE.setAdminSessionManager(adminSessionMan);
    logger.info("opend cluster conmunite port on {}:{}", ip, port);
} else if (serverType == ServerType.LOAD_BALANCER){
    logger.info("opend load balance conmunite port on {}:{}", ip, port);
    ProxyRuntime.INSTANCE.setProxySessionSessionManager(new ProxySessionManager());
    ProxyRuntime.INSTANCE.setLbSessionSessionManager(new LBSessionManager());
}

9.2.3 调用io.mycat.proxy.NIOAcceptor#getServerSocketChannel,进行启动.代码如下:

private void openServerChannel(Selector selector, String bindIp, int bindPort, ServerType serverType)
    throws IOException {
final ServerSocketChannel serverChannel = ServerSocketChannel.open();
final InetSocketAddress isa = new InetSocketAddress(bindIp, bindPort);
serverChannel.bind(isa);
serverChannel.configureBlocking(false);
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverChannel.register(selector, SelectionKey.OP_ACCEPT, serverType);
if (serverType == ServerType.CLUSTER) {
    logger.info("open cluster server port on {}:{}", bindIp, bindPort);
    clusterServerSocketChannel = serverChannel;
} else if (serverType == ServerType.LOAD_BALANCER) {
    logger.info("open load balance server port on {}:{}", bindIp, bindPort);
    loadBalanceServerSocketChannel = serverChannel;
} else {
    logger.info("open proxy server port on {}:{}", bindIp, bindPort);
    proxyServerSocketChannel = serverChannel;
}
}

9.4 调用io.mycat.mycat2.ProxyStarter#startReactor,进行MycatReactorThread的启动.该线程负责对请求进行处理.

9.5 加载配置文件信息,该方法在ROOT_PATH目录下创建了prepare和archive两个文件夹,并加载了replica-index.yml,datasource.yml,schema.yml,最后调用了AnnotationProcessor#getInstance 进行初始化.

9.5.1 创建DynamicAnnotationManager.

9.5.2 对AnnotationProcessor.class.getClassLoader().getResource("") 的路径进行监听.当文件目录有变化时会回调io.mycat.mycat2.sqlannotations.AnnotationProcessor#listen方法.代码如下:

public static void listen() {
try {
while (true) {
    WatchKey key = watcher.take();//todo 线程复用,用 poll比较好?
    boolean flag = false;
    for (WatchEvent<?> event: key.pollEvents()) {
        String str = event.context().toString();
        if ("actions.yml".equals(str)|| "annotations.yml".equals(str)) {
            flag=true;
            break;
        }
    }
    if (flag){
        System.out.println("动态注解更新次数" + count.incrementAndGet());
        init();
    }
    boolean valid = key.reset();
    if (!valid) {
        break;
    }

}
} catch (Exception e) {
e.printStackTrace();
}
}

==可以看到,当actions.yml或者annotations.yml变化时,就会重新调用init方法,从而对DynamicAnnotationManager进行修改.==

9.6 分别对datasource.yml 和 schema.yml 进行加载.

9.6.1 对MySQLRepBean进行初始化.该bean是对datasource.yml的封装.datasource.yml如下所示:

replicas:
  - name: test                      # 复制组 名称   必须唯一
    repType: MASTER_SLAVE           # 复制类型
    switchType: SWITCH              # 切换类型
    balanceType: BALANCE_ALL_READ   # 读写分离类型
    mysqls:            
      - hostName: mysql-01              # mysql 主机名
        ip: 127.0.0.1               # ip
        port: 3306                  # port
        user: root                  # 用户名
        password: root            # 密码
        minCon: 1                   # 最小连接
        maxCon: 10                  # 最大连接
        maxRetryCount: 3            # 连接重试次数

conf.getMysqlRepMap().forEach((repName, repBean) -> {
    repBean.initMaster();
    repBean.getMetaBeans().forEach(metaBean -> metaBean.prepareHeartBeat(repBean, repBean.getDataSourceInitStatus()));
});

9.6.2 调用io.mycat.mycat2.beans.MySQLRepBean#initMaster, 做了如下处理:

首先加载replica-index.yml,获取replica name 对应的index,并对MySQLRepBean中的metaBeans进行赋值,完成对mysqls 的封装.

之后调用MySQLMetaBean的prepareHeartBeat方法.
完成

9.7 ==调用io.mycat.proxy.ProxyRuntime#startHeartBeatScheduler,启动heartbeatScheduler线程.该线程会每10000 ms 调用io.mycat.proxy.ProxyRuntime#replicaHeartbeat.== 心跳的配置在heartbeat.yml中,默认配置如下:

heartbeat:
  timerExecutor: 2
  replicaHeartbeatPeriod: 10000
  replicaIdleCheckPeriod: 2000
  idleTimeout: 2000
  processorCheckPeriod: 2000
  minSwitchtimeInterval: 120000

9.7.1 在io.mycat.proxy.ProxyRuntime#replicaHeartbeat,方法中代码如下:

private Runnable replicaHeartbeat() {
return ()->{
ProxyReactorThread<?> reactor  = getReactorThreads()[ThreadLocalRandom.current().nextInt(getReactorThreads().length)];
reactor.addNIOJob(()-> config.getMysqlRepMap().values().stream().forEach(f -> f.doHeartbeat()));
};
}

最终会调用io.mycat.mycat2.beans.heartbeat.MySQLHeartbeat#heartbeat.

9.8 对cluster的配置进行处理.代码如下:

BalancerConfig balancerConfig = conf.getConfig(ConfigEnum.BALANCER);
BalancerBean balancerBean = balancerConfig.getBalancer();
// 集群模式下才开启负载均衡服务
if (clusterBean.isEnable() && balancerBean.isEnable()) {
    runtime.getAcceptor().startServerChannel(balancerBean.getIp(), balancerBean.getPort(), ServerType.LOAD_BALANCER);
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • feisky云计算、虚拟化与Linux技术笔记posts - 1014, comments - 298, trac...
    不排版阅读 3,843评论 0 5
  • 1、通过CocoaPods安装项目名称项目信息 AFNetworking网络请求组件 FMDB本地数据库组件 SD...
    阳明先生_X自主阅读 15,979评论 3 119
  • 今天一直在项目上。没有思绪。回答一下训练营的每日一问把。 今天的问题是:什么时候你会生起惭愧之心?惭愧心生起之时,...
    黄沿溪阅读 187评论 0 2
  • 幸福 你问我什么是幸福我想说幸福就是悖论和一个不世俗的人过着世俗的生活 你问我什么是幸福我想说幸福就是约分和一个不...
    方正的石头阅读 237评论 7 7