canal源码解析之canal.deployer

canal源码的启动类查找
想要知道canal的启动类 只需看一下自带的启动脚本。下载到canal.deployer-1.0.22.tar.gz (github项目的发布文件一般在release目录下)解压 进入bin目录 找到对应平台的启动脚本 startup.bat 用文本方式打开

@echo off
@if not "%ECHO%" == ""  echo %ECHO%
@if "%OS%" == "Windows_NT"  setlocal

set ENV_PATH=.\
if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%

set conf_dir=%ENV_PATH%\..\conf
set canal_conf=%conf_dir%\canal.properties
set logback_configurationFile=%conf_dir%\logback.xml

set CLASSPATH=%conf_dir%
set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%

set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
set JAVA_DEBUG_OPT= -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n
set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%" -Dcanal.conf="%canal_conf%"

set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%

set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.deployer.CanalLauncher
echo start cmd : %CMD_STR%

java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.deployer.CanalLauncher

分析是设置了一些jvm参数 用于启动。发现最终启动的类是CanalLauncher

运行CanalLauncher的main方法 因为此时的classpath 已经在eclispe 的buildpath中指定了 并且配置文件已经是在classpath中了,所以启动的时候不需要再设置了。

此图反应了源码启动类所在的项目以及classpath的设定等

查看此类的main方法 首先是加载了配置文件到内存 用于启动的参数。
接着 将properties 传给类CanalController 此变量是final的 可见 此变量后续是不允许更改的 及只会初始化一次 想改变此类的时候都是不可行的。
看一下 构造方法 此类中涉及到Guava库Guava Cache的MapMaker 还有 toString的方式ToStringBuilder ToStringStyle

其中比较重要的配置项 canal.instance.global.mode 指定是spring 还有就是大家熟悉的spring的context类似的文件canal.instance.global.spring.xml 许多用到的bean的定义都在其中

canal.instance.global.mode = spring 
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

file-instance.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
    default-autowire="byName">

    <!-- properties -->
    <bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
        <property name="ignoreResourceNotFound" value="true" />
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
        <property name="locationNames">
            <list>
                <value>classpath:canal.properties</value>
                <value>classpath:${canal.instance.destination:}/instance.properties</value>
            </list>
        </property>
    </bean>
    
    <bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
    <bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
        <property name="propertyEditorRegistrars">
            <list>
                <ref bean="socketAddressEditor" />
            </list>
        </property>
    </bean>
    
    <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
        <property name="destination" value="${canal.instance.destination}" />
        <property name="eventParser">
            <ref local="eventParser" />
        </property>
        <property name="eventSink">
            <ref local="eventSink" />
        </property>
        <property name="eventStore">
            <ref local="eventStore" />
        </property>
        <property name="metaManager">
            <ref local="metaManager" />
        </property>
        <property name="alarmHandler">
            <ref local="alarmHandler" />
        </property>
    </bean>
    
    <!-- 报警处理类 -->
    <bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
    
    <bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
        <property name="dataDir" value="${canal.file.data.dir:../conf}" />
        <property name="period" value="${canal.file.flush.period:1000}" />
    </bean>
    
    <bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
        <property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
        <property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
        <property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
        <property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
    </bean>
    
    <bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
        <property name="eventStore" ref="eventStore" />
    </bean>

    <bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
        <property name="destination" value="${canal.instance.destination}" />
        <property name="slaveId" value="${canal.instance.mysql.slaveId:1234}" />
        <!-- 心跳配置 -->
        <property name="detectingEnable" value="${canal.instance.detecting.enable:false}" />
        <property name="detectingSQL" value="${canal.instance.detecting.sql}" />
        <property name="detectingIntervalInSeconds" value="${canal.instance.detecting.interval.time:5}" />
        <property name="haController">
            <bean class="com.alibaba.otter.canal.parse.ha.HeartBeatHAController">
                <property name="detectingRetryTimes" value="${canal.instance.detecting.retry.threshold:3}" />
                <property name="switchEnable" value="${canal.instance.detecting.heartbeatHaEnable:false}" />
            </bean>
        </property>
        
        <property name="alarmHandler" ref="alarmHandler" />
        
        <!-- 解析过滤处理 -->
        <property name="eventFilter">
            <bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
                <constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
            </bean>
        </property>
        
        <property name="eventBlackFilter">
            <bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
                <constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
                <constructor-arg index="1" value="false" />
            </bean>
        </property>
        <!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
        <property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
        
        <!-- 网络链接参数 -->
        <property name="receiveBufferSize" value="${canal.instance.network.receiveBufferSize:16384}" />
        <property name="sendBufferSize" value="${canal.instance.network.sendBufferSize:16384}" />
        <property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
        
        <!-- 解析编码 -->
        <!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
        <property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
    
        <!-- 解析位点记录 -->
        <property name="logPositionManager">
            <bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
                <property name="primary">
                    <bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
                </property>
                <property name="failback">
                    <bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
                        <property name="metaManager" ref="metaManager" />
                    </bean>
                </property>
            </bean>
        </property>
        
        <!-- failover切换时回退的时间 -->
        <property name="fallbackIntervalInSeconds" value="${canal.instance.fallbackIntervalInSeconds:60}" />
        
        <!-- 解析数据库信息 -->
        <property name="masterInfo">
            <bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
                <property name="address" value="${canal.instance.master.address}" />
                <property name="username" value="${canal.instance.dbUsername:retl}" />
                <property name="password" value="${canal.instance.dbPassword:retl}" />
                <property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
            </bean>
        </property>
        <property name="standbyInfo">
            <bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
                <property name="address" value="${canal.instance.standby.address}" />
                <property name="username" value="${canal.instance.dbUsername:retl}" />
                <property name="password" value="${canal.instance.dbPassword:retl}" />
                <property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
            </bean>
        </property>
        
        <!-- 解析起始位点 -->
        <property name="masterPosition">
            <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
                <property name="journalName" value="${canal.instance.master.journal.name}" />
                <property name="position" value="${canal.instance.master.position}" />
                <property name="timestamp" value="${canal.instance.master.timestamp}" />
            </bean>
        </property>
        <property name="standbyPosition">
            <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
                <property name="journalName" value="${canal.instance.standby.journal.name}" />
                <property name="position" value="${canal.instance.standby.position}" />
                <property name="timestamp" value="${canal.instance.standby.timestamp}" />
            </bean>
        </property>
        <property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
        <property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
        <property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
        <property name="filterRows" value="${canal.instance.filter.rows:false}" />
        <property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
        <property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
        <property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
    </bean>
</beans>

这里有个知识点 PropertyPlaceholderConfigurer设置默认值的方式 可以点击过去查看另一篇文章
知识点spring属性编辑器的使用。接着到了最主要的类
CanalInstanceWithSpring 看其类图

CanalInstanceWithSpring .png
AbstractCanalInstance.png
架构图

说明:server代表一个canal服务端的运行实例,对应一个jvm
instance对应一个数据队列(一个server可以对应多个instance,意思我们可以配置多个instance在canal中,具体配置方式 是在canal.properties 的canal.destinations的key 多个的话以逗号分隔,怎么知道的呢?找到源码

private void initInstanceConfig(Properties properties) {
        String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
        String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);

        for (String destination : destinations) {
            InstanceConfig config = parseInstanceConfig(properties, destination);
            InstanceConfig oldConfig = instanceConfigs.put(destination, config);

            if (oldConfig != null) {
                logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
                        oldConfig, config });
            }
        }
    }

其中 public static final String CANAL_DESTINATION_SPLIT = ",";

instance下的子模块:看类图的属性就知道有
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)
alarmHandler( alarm报警机制)
这些组件在项目代码中都有对应的子项目与之对应起来

启动CanalLauncher类 其中有main方法直接运行即可。看源码是先初始化了Properties 然后传给CanalController类

 final CanalController controller = new CanalController(properties);

这个实例是final的 说明这个类一旦被初始化时不可变的,防止多次启动。

debug跟进去 看CanalController 的实例化,先初始化instance的公共属性。然后实例化embededCanalServer和canalServer ,一个用于连接mysql master 一个供客户端连接。初始化ServerRunningMonitors,初始化monitor机制。初始化完成后 调用controller.start()的方法
start方法中先是去zk创建整个canal的工作节点(如果基于zk构建了HA的话)然后优先启动embeded服务,嗲用的是CanalServerWithEmbedded的start方法 查看此类得知 是去创建了canalinstance

public void start() {
        if (!isStart()) {
            super.start();

            canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {

                public CanalInstance apply(String destination) {
                    return canalInstanceGenerator.generate(destination);
                }
            });

            // lastRollbackPostions = new MapMaker().makeMap();
        }
    }

然后去启动对应的instance
此时会调用查看mod方式,如果是spring的话

else if (config.getMode().isSpring()) {
                    SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
                    synchronized (this) {
                        try {
                            // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                            System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
                            instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
                            return instanceGenerator.generate(destination);
                        } catch (Throwable e) {
                            logger.error("generator instance failed.", e);
                            throw new CanalException(e);
                        } finally {
                            System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
                        }
                    }
                }

利用spring的

 ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);
        return applicationContext;

去加载对应instance的xml文件 然后 通过 beanFactory.getBean(beanName); 方式得到 CanalInstance,一般是spring,所以得到的是CanalInstanceWithSpring,然后启动 调用start方法,查看CanalInstanceWithSpring的父类AbstractCanalInstance的start方法

@Override
    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();
        }

        if (!alarmHandler.isStart()) {
            alarmHandler.start();
        }

        if (!eventStore.isStart()) {
            eventStore.start();
        }

        if (!eventSink.isStart()) {
            eventSink.start();
        }

        if (!eventParser.isStart()) {
            beforeStartEventParser(eventParser);
            eventParser.start();
            afterStartEventParser(eventParser);
        }
        logger.info("start successful....");
    }

可以看到启动了 instance下所有的子模块
各个子模块的start的方法分开分析,此时cananl的启动算是完成了,最后注册关闭的时调用的方法

 Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    try {
                        logger.info("## stop the canal server");
                        controller.stop();
                    } catch (Throwable e) {
                        logger.warn("##something goes wrong when stopping canal Server:\n{}",
                            ExceptionUtils.getFullStackTrace(e));
                    } finally {
                        logger.info("## canal server is down.");
                    }
                }

            });
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,803评论 6 342
  • 一、背景 早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务...
    献给记性不好的自己阅读 46,131评论 3 40
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,622评论 18 399
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,239评论 11 349