canal source code analysis: canal.deployer

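Finding canal's startup class
To find canal's startup class, look at the startup script that ships with it. Download canal.deployer-1.0.22.tar.gz (a GitHub project's release artifacts are usually on its Releases page) and extract it. Go into the bin directory, find the startup script for your platform (startup.bat on Windows), and open it in a text editor: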

@echo off
@if not "%ECHO%" == ""  echo %ECHO%
@if "%OS%" == "Windows_NT"  setlocal

set ENV_PATH=.\
if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%

set conf_dir=%ENV_PATH%\..\conf
set canal_conf=%conf_dir%\canal.properties
set logback_configurationFile=%conf_dir%\logback.xml

set CLASSPATH=%conf_dir%
set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%

set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
set JAVA_DEBUG_OPT= -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n
set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%" -Dcanal.conf="%canal_conf%"

set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%

set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.deployer.CanalLauncher
echo start cmd : %CMD_STR%

java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.deployer.CanalLauncher

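The script sets a number of JVM options and then launches the JVM. These options cover memory, encoding, remote debugging on port 9099, and the locations of logback.xml and canal.properties. The class it finally starts is CanalLauncher.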

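To run from source, run CanalLauncher's main method. In Eclipse, the classpath is already specified in the project's build path and the configuration files are already on the classpath, so nothing extra needs to be set at launch.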

(Figure: the project containing the startup class, and its classpath settings.)

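Look at this class's main method. It first loads the configuration file into memory as the startup parameters.
Next, the Properties object is passed to CanalController. The variable holding the controller is declared final, so it is assigned exactly once and the reference cannot be reassigned afterwards.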
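Below is a minimal sketch of the loading step. It assumes that the -Dcanal.conf system property set by startup.bat names the file. It also assumes that a classpath: prefix means "load from the classpath", with classpath:canal.properties as the fallback when nothing is set (the IDE case described above). The class and method names are illustrative and are not canal's own.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

class LauncherConfigSketch {
    // Hypothetical prefix marking a classpath resource instead of a file path.
    static final String CLASSPATH_PREFIX = "classpath:";

    // Reads -Dcanal.conf; the default (an assumption) lets an IDE launch work without flags.
    static Properties loadConfig() throws IOException {
        String conf = System.getProperty("canal.conf", CLASSPATH_PREFIX + "canal.properties");
        return load(conf);
    }

    static Properties load(String conf) throws IOException {
        Properties properties = new Properties();
        if (conf.startsWith(CLASSPATH_PREFIX)) {
            String resource = conf.substring(CLASSPATH_PREFIX.length());
            try (InputStream in = LauncherConfigSketch.class.getClassLoader().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new IOException("resource not found on classpath: " + resource);
                }
                properties.load(in);
            }
        } else {
            // A plain file path, e.g. the ..\conf\canal.properties passed by startup.bat
            try (InputStream in = new FileInputStream(conf)) {
                properties.load(in);
            }
        }
        return properties;
    }
}
```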
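Now look at the constructor. It uses Guava's MapMaker (the forerunner of Guava Cache). It also builds its toString output with ToStringBuilder and ToStringStyle from Apache Commons Lang.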

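One of the more important settings is canal.instance.global.mode, which is set to spring here. Another is canal.instance.global.spring.xml. It points at a file that resembles the familiar Spring context file, and many of the beans canal uses are defined in it.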

canal.instance.global.mode = spring 
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

file-instance.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
    default-autowire="byName">

    <!-- properties -->
    <bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
        <property name="ignoreResourceNotFound" value="true" />
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- allow system properties to override -->
        <property name="locationNames">
            <list>
                <value>classpath:canal.properties</value>
                <value>classpath:${canal.instance.destination:}/instance.properties</value>
            </list>
        </property>
    </bean>
    
    <bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
    <bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
        <property name="propertyEditorRegistrars">
            <list>
                <ref bean="socketAddressEditor" />
            </list>
        </property>
    </bean>
    
    <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
        <property name="destination" value="${canal.instance.destination}" />
        <property name="eventParser">
            <ref local="eventParser" />
        </property>
        <property name="eventSink">
            <ref local="eventSink" />
        </property>
        <property name="eventStore">
            <ref local="eventStore" />
        </property>
        <property name="metaManager">
            <ref local="metaManager" />
        </property>
        <property name="alarmHandler">
            <ref local="alarmHandler" />
        </property>
    </bean>
    
    <!-- alarm handler -->
    <bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
    
    <bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
        <property name="dataDir" value="${canal.file.data.dir:../conf}" />
        <property name="period" value="${canal.file.flush.period:1000}" />
    </bean>
    
    <bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
        <property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
        <property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
        <property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
        <property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
    </bean>
    
    <bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
        <property name="eventStore" ref="eventStore" />
    </bean>

    <bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
        <property name="destination" value="${canal.instance.destination}" />
        <property name="slaveId" value="${canal.instance.mysql.slaveId:1234}" />
        <!-- heartbeat settings -->
        <property name="detectingEnable" value="${canal.instance.detecting.enable:false}" />
        <property name="detectingSQL" value="${canal.instance.detecting.sql}" />
        <property name="detectingIntervalInSeconds" value="${canal.instance.detecting.interval.time:5}" />
        <property name="haController">
            <bean class="com.alibaba.otter.canal.parse.ha.HeartBeatHAController">
                <property name="detectingRetryTimes" value="${canal.instance.detecting.retry.threshold:3}" />
                <property name="switchEnable" value="${canal.instance.detecting.heartbeatHaEnable:false}" />
            </bean>
        </property>
        
        <property name="alarmHandler" ref="alarmHandler" />
        
        <!-- parse filters -->
        <property name="eventFilter">
            <bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
                <constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
            </bean>
        </property>
        
        <property name="eventBlackFilter">
            <bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
                <constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
                <constructor-arg index="1" value="false" />
            </bean>
        </property>
        <!-- maximum transaction size to parse; larger transactions are split into several transactions for delivery -->
        <property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
        
        <!-- network connection parameters -->
        <property name="receiveBufferSize" value="${canal.instance.network.receiveBufferSize:16384}" />
        <property name="sendBufferSize" value="${canal.instance.network.sendBufferSize:16384}" />
        <property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
        
        <!-- parse charset -->
        <!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
        <property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
    
        <!-- parse position tracking -->
        <property name="logPositionManager">
            <bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
                <property name="primary">
                    <bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
                </property>
                <property name="failback">
                    <bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
                        <property name="metaManager" ref="metaManager" />
                    </bean>
                </property>
            </bean>
        </property>
        
        <!-- rewind interval used when switching over on failover -->
        <property name="fallbackIntervalInSeconds" value="${canal.instance.fallbackIntervalInSeconds:60}" />
        
        <!-- source database connection info -->
        <property name="masterInfo">
            <bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
                <property name="address" value="${canal.instance.master.address}" />
                <property name="username" value="${canal.instance.dbUsername:retl}" />
                <property name="password" value="${canal.instance.dbPassword:retl}" />
                <property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
            </bean>
        </property>
        <property name="standbyInfo">
            <bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
                <property name="address" value="${canal.instance.standby.address}" />
                <property name="username" value="${canal.instance.dbUsername:retl}" />
                <property name="password" value="${canal.instance.dbPassword:retl}" />
                <property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
            </bean>
        </property>
        
        <!-- parse start position -->
        <property name="masterPosition">
            <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
                <property name="journalName" value="${canal.instance.master.journal.name}" />
                <property name="position" value="${canal.instance.master.position}" />
                <property name="timestamp" value="${canal.instance.master.timestamp}" />
            </bean>
        </property>
        <property name="standbyPosition">
            <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
                <property name="journalName" value="${canal.instance.standby.journal.name}" />
                <property name="position" value="${canal.instance.standby.position}" />
                <property name="timestamp" value="${canal.instance.standby.timestamp}" />
            </bean>
        </property>
        <property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
        <property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
        <property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
        <property name="filterRows" value="${canal.instance.filter.rows:false}" />
        <property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
        <property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
        <property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
    </bean>
</beans>

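Two points are worth knowing here. The first is how this PropertyPlaceholderConfigurer supplies default values through the ${key:default} syntax. The second is how Spring property editors are used: SocketAddressEditor is registered through CustomEditorConfigurer. Next comes the most important class, CanalInstanceWithSpring. Look at its class diagram.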
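The ${key:default} form used throughout file-instance.xml, for example ${canal.instance.mysql.slaveId:1234}, means: use the property if it is set, otherwise use the text after the first colon. Below is a simplified resolver for a value that is exactly one placeholder. It only illustrates the rule. It also mimics SYSTEM_PROPERTIES_MODE_OVERRIDE by checking JVM system properties first. The real Spring implementation additionally handles nested and embedded placeholders and escaping, which this sketch does not.

```java
import java.util.Properties;

class PlaceholderSketch {
    // Resolves a literal or exactly one "${key}" / "${key:default}" expression.
    // Simplified: no nested placeholders, no escaping, no placeholders inside longer text.
    static String resolve(String expr, Properties props) {
        if (!expr.startsWith("${") || !expr.endsWith("}")) {
            return expr; // plain literal, nothing to resolve
        }
        String body = expr.substring(2, expr.length() - 1);
        int colon = body.indexOf(':');
        String key = colon >= 0 ? body.substring(0, colon) : body;

        // SYSTEM_PROPERTIES_MODE_OVERRIDE: a JVM system property wins over the property files.
        String value = System.getProperty(key);
        if (value == null) {
            value = props.getProperty(key);
        }
        if (value != null) {
            return value;
        }
        if (colon >= 0) {
            return body.substring(colon + 1); // everything after the first ':' (may be empty)
        }
        throw new IllegalArgumentException("unresolved placeholder: " + key);
    }
}
```

For example, ${canal.instance.filter.regex:.*\..*} falls back to .*\..* when the key is missing. ${canal.instance.destination:} falls back to an empty string.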

(Figures: class diagrams of CanalInstanceWithSpring and AbstractCanalInstance; canal architecture diagram.)

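Notes: a server is one running canal server process, corresponding to one JVM.
An instance corresponds to one data queue. One server can host multiple instances, so you can configure several instances in canal. List them under the canal.destinations key in canal.properties, separated by commas. How do we know? Find it in the source: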

private void initInstanceConfig(Properties properties) {
        String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
        String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);

        for (String destination : destinations) {
            InstanceConfig config = parseInstanceConfig(properties, destination);
            InstanceConfig oldConfig = instanceConfigs.put(destination, config);

            if (oldConfig != null) {
                logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
                        oldConfig, config });
            }
        }
    }

where CanalConstants defines public static final String CANAL_DESTINATION_SPLIT = ",";
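Here is a standalone sketch of the same loop, assuming a canal.properties entry such as canal.destinations = example,example2. Plain String.split plus skipping empty tokens stands in for Commons Lang's StringUtils.split. A string stands in for the real InstanceConfig, so the example runs without canal on the classpath. The "config[...]" value is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

class DestinationSketch {
    static final String CANAL_DESTINATIONS = "canal.destinations";
    static final String CANAL_DESTINATION_SPLIT = ",";

    // destination -> config; LinkedHashMap keeps the order declared in canal.properties.
    static Map<String, String> initInstanceConfig(Properties properties) {
        Map<String, String> instanceConfigs = new LinkedHashMap<>();
        String destinationStr = properties.getProperty(CANAL_DESTINATIONS, "");
        for (String destination : destinationStr.split(CANAL_DESTINATION_SPLIT)) {
            if (destination.isEmpty()) {
                continue; // StringUtils.split also drops empty tokens such as the one in "a,,b"
            }
            // Hypothetical stand-in for parseInstanceConfig(properties, destination).
            String config = "config[" + destination + "]";
            String oldConfig = instanceConfigs.put(destination, config);
            if (oldConfig != null) {
                System.err.println("destination:" + destination + " old config:" + oldConfig
                        + " was replaced by new config:" + config);
            }
        }
        return instanceConfigs;
    }
}
```

A destination listed twice simply overwrites the earlier entry, which is why the original code logs a warning instead of failing.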

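The submodules of an instance (visible from the fields in the class diagram) are:
eventParser (data source access: emulates the slave protocol to talk to the master and parses the protocol)
eventSink (the connector between Parser and Store: filters, processes and distributes the data)
eventStore (data storage)
metaManager (manager for incremental subscription and consumption information)
alarmHandler (the alarm mechanism)
Each of these components has a matching subproject in the codebase.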
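To make the data path between these components concrete, here is a toy, single-threaded model. A parser produces events, the sink filters them, and the surviving events go into a bounded in-memory store. Every type and the "schema.table:op" event format here are hypothetical. canal's real components are far richer and run asynchronously.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

class PipelineSketch {
    // Hypothetical store: a bounded FIFO buffer (canal's store has a configurable bufferSize).
    static class Store {
        private final Queue<String> buffer = new ArrayDeque<>();
        private final int capacity;
        Store(int capacity) { this.capacity = capacity; }
        boolean put(String event) {
            if (buffer.size() >= capacity) {
                return false; // full: the caller has to retry later
            }
            return buffer.add(event);
        }
        String get() { return buffer.poll(); }
        int size() { return buffer.size(); }
    }

    // Hypothetical sink: applies the filter and forwards surviving events to the store.
    static class Sink {
        private final Predicate<String> filter;
        private final Store store;
        Sink(Predicate<String> filter, Store store) { this.filter = filter; this.store = store; }
        // Returns false only when an accepted event could not be stored.
        boolean sink(String event) {
            if (!filter.test(event)) {
                return true; // filtered out: nothing to store
            }
            return store.put(event);
        }
    }

    // Hypothetical parser: replays already-decoded "schema.table:op" events into the sink.
    static void parse(List<String> events, Sink sink) {
        for (String event : events) {
            if (!sink.sink(event)) {
                throw new IllegalStateException("store full at " + event);
            }
        }
    }
}
```

The filter predicate plays the role of the eventFilter regex configured on the parser in file-instance.xml. The store capacity plays the role of bufferSize.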

Start the CanalLauncher class. It has a main method, so you can run it directly. The source first initializes the Properties and then passes them to CanalController:

 final CanalController controller = new CanalController(properties);

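The controller reference is final, so it cannot be reassigned once initialized. This is also required: the shutdown hook registered at the end of main is an anonymous Thread subclass that captures controller. Anonymous classes may only capture final local variables (effectively final ones on Java 8+).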

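Step into the CanalController constructor with the debugger. It first initializes the properties shared by all instances, then creates embededCanalServer and canalServer. The former is the embedded server that runs the instances connecting to the MySQL master; the latter accepts client connections. Next it initializes ServerRunningMonitors, the running-monitor mechanism. Once construction is done, main calls controller.start().
start() first creates canal's working node in ZooKeeper (if HA is built on ZooKeeper). It then starts the embedded server first by calling CanalServerWithEmbedded.start(). Looking at that method shows it sets up canalInstances, a map that generates the CanalInstance for a destination the first time it is requested: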

public void start() {
        if (!isStart()) {
            super.start();

            canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {

                public CanalInstance apply(String destination) {
                    return canalInstanceGenerator.generate(destination);
                }
            });

            // lastRollbackPostions = new MapMaker().makeMap();
        }
    }
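MigrateMap.makeComputingMap builds a map that creates a value the first time a key is requested. As a result, a CanalInstance is generated only when a destination is first asked for, and only once. On Java 8 and later, ConcurrentHashMap.computeIfAbsent gives the same lazy, create-once behavior. The sketch below swaps it in. The String "instance" and the generator function are placeholders, not canal types.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

class LazyInstancesSketch {
    private final ConcurrentMap<String, String> canalInstances = new ConcurrentHashMap<>();
    // Stand-in for canalInstanceGenerator.generate(destination).
    private final Function<String, String> generator;

    LazyInstancesSketch(Function<String, String> generator) {
        this.generator = generator;
    }

    // The first call for a destination runs the generator; later calls return the cached value.
    String getInstance(String destination) {
        return canalInstances.computeIfAbsent(destination, generator);
    }
}
```

computeIfAbsent runs the generator at most once per key even under concurrent access. That is the property the computing map provides here.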

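Then the corresponding instances are started.
When an instance is generated, the configured mode is checked. If it is spring, this branch runs: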

else if (config.getMode().isSpring()) {
                    SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
                    synchronized (this) {
                        try {
                            // set the destination currently being loaded; the Spring XML lookup uses this property
                            System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
                            instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
                            return instanceGenerator.generate(destination);
                        } catch (Throwable e) {
                            logger.error("generator instance failed.", e);
                            throw new CanalException(e);
                        } finally {
                            System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
                        }
                    }
                }
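The synchronized block is there because the destination reaches the Spring XML through a JVM-wide system property: it feeds the ${canal.instance.destination:} placeholders in file-instance.xml. So only one instance may be built at a time, and the property is always reset in finally. Below is a stripped-down sketch of that pattern. The property name is inferred from the XML placeholders. The class and the Supplier standing in for the Spring-based generator are hypothetical.

```java
import java.util.function.Supplier;

class DestinationScopeSketch {
    // Inferred from the ${canal.instance.destination} placeholders in file-instance.xml.
    static final String CANAL_DESTINATION_PROPERTY = "canal.instance.destination";
    private static final Object LOCK = new Object();

    // Exposes the destination as a system property while builder runs, then resets it.
    static <T> T generateWithDestination(String destination, Supplier<T> builder) {
        synchronized (LOCK) { // system properties are global, so serialize generation
            try {
                System.setProperty(CANAL_DESTINATION_PROPERTY, destination);
                return builder.get(); // e.g. load the Spring XML, which reads the property
            } finally {
                System.setProperty(CANAL_DESTINATION_PROPERTY, ""); // reset even on failure
            }
        }
    }
}
```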

getBeanFactory builds a Spring ApplicationContext from the XML:

 ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);
        return applicationContext;

This loads the instance's XML file, and the CanalInstance is then obtained via beanFactory.getBean(beanName). The mode is usually spring, so the result is a CanalInstanceWithSpring, whose start method is called next. Look at the start method of its parent class AbstractCanalInstance:

@Override
    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();
        }

        if (!alarmHandler.isStart()) {
            alarmHandler.start();
        }

        if (!eventStore.isStart()) {
            eventStore.start();
        }

        if (!eventSink.isStart()) {
            eventSink.start();
        }

        if (!eventParser.isStart()) {
            beforeStartEventParser(eventParser);
            eventParser.start();
            afterStartEventParser(eventParser);
        }
        logger.info("start successful....");
    }

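As you can see, this starts every submodule of the instance in the order metaManager, alarmHandler, eventStore, eventSink, eventParser. beforeStartEventParser and afterStartEventParser wrap the parser's start.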
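Each component guards start() with isStart(), so a second start is harmless. The parser goes last, presumably so that metadata and storage are ready before events start flowing. Below is a minimal sketch of this guard-and-order pattern with hypothetical component objects. The super.start() calls in the original suggest a shared lifecycle base class, which this sketch only imitates.

```java
import java.util.List;

class LifecycleSketch {
    // Imitates a start-once lifecycle component; the start log exists only to observe order.
    static class Component {
        private final String name;
        private final List<String> startLog;
        private volatile boolean running = false;

        Component(String name, List<String> startLog) {
            this.name = name;
            this.startLog = startLog;
        }

        boolean isStart() { return running; }

        void start() {
            if (running) {
                return; // already started: starting twice is a no-op
            }
            running = true;
            startLog.add(name);
        }
    }

    // Starts components in the given order, skipping any that are already running.
    static void startInstance(List<Component> componentsInOrder) {
        for (Component c : componentsInOrder) {
            if (!c.isStart()) {
                c.start();
            }
        }
    }
}
```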
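Each submodule's start method deserves a separate analysis. At this point canal's startup is complete. Finally, main registers the method to run when the JVM shuts down: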

 Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    try {
                        logger.info("## stop the canal server");
                        controller.stop();
                    } catch (Throwable e) {
                        logger.warn("##something goes wrong when stopping canal Server:\n{}",
                            ExceptionUtils.getFullStackTrace(e));
                    } finally {
                        logger.info("## canal server is down.");
                    }
                }

            });
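Here is a sketch of the same hook with the thread construction split out. This lets you exercise the stop logic by calling run() directly instead of waiting for JVM exit. Stoppable is a hypothetical stand-in for CanalController, and System.err replaces the logger. The lambda captures controller, so it must be (effectively) final, just like in the anonymous class above.

```java
class ShutdownHookSketch {
    // Hypothetical stand-in for CanalController.
    interface Stoppable {
        void stop() throws Exception;
    }

    // Builds, but does not register, the hook thread so it can be tested directly.
    static Thread buildHook(final Stoppable controller) {
        return new Thread(() -> {
            try {
                System.err.println("## stop the canal server");
                controller.stop();
            } catch (Throwable e) {
                // never let a failing stop() escape the hook
                System.err.println("## something goes wrong when stopping canal Server: " + e);
            } finally {
                System.err.println("## canal server is down.");
            }
        });
    }

    static void register(Stoppable controller) {
        Runtime.getRuntime().addShutdownHook(buildHook(controller));
    }
}
```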
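© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.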
