canal源码的启动类查找
想要知道canal的启动类 只需看一下自带的启动脚本。下载到canal.deployer-1.0.22.tar.gz (github项目的发布文件一般在release目录下)解压 进入bin目录 找到对应平台的启动脚本 startup.bat 用文本方式打开
@echo off
@if not "%ECHO%" == "" echo %ECHO%
@if "%OS%" == "Windows_NT" setlocal
set ENV_PATH=.\
if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%
set conf_dir=%ENV_PATH%\..\conf
set canal_conf=%conf_dir%\canal.properties
set logback_configurationFile=%conf_dir%\logback.xml
set CLASSPATH=%conf_dir%
set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%
set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
set JAVA_DEBUG_OPT= -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n
set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%" -Dcanal.conf="%canal_conf%"
set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%
set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.deployer.CanalLauncher
echo start cmd : %CMD_STR%
java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.deployer.CanalLauncher
分析是设置了一些jvm参数 用于启动。发现最终启动的类是CanalLauncher
运行CanalLauncher的main方法 因为此时的classpath 已经在eclispe 的buildpath中指定了 并且配置文件已经是在classpath中了,所以启动的时候不需要再设置了。
查看此类的main方法 首先是加载了配置文件到内存 用于启动的参数。
接着 将properties 传给类CanalController 此变量是final的 可见 此变量后续是不允许更改的 及只会初始化一次 想改变此类的时候都是不可行的。
看一下 构造方法 此类中涉及到Guava库Guava Cache的MapMaker 还有 toString的方式ToStringBuilder ToStringStyle
其中比较重要的配置项 canal.instance.global.mode 指定是spring 还有就是大家熟悉的spring的context类似的文件canal.instance.global.spring.xml 许多用到的bean的定义都在其中
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
file-instance.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
default-autowire="byName">
<!-- properties -->
<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
<property name="ignoreResourceNotFound" value="true" />
<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
<property name="locationNames">
<list>
<value>classpath:canal.properties</value>
<value>classpath:${canal.instance.destination:}/instance.properties</value>
</list>
</property>
</bean>
<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
<property name="propertyEditorRegistrars">
<list>
<ref bean="socketAddressEditor" />
</list>
</property>
</bean>
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
<property name="destination" value="${canal.instance.destination}" />
<property name="eventParser">
<ref local="eventParser" />
</property>
<property name="eventSink">
<ref local="eventSink" />
</property>
<property name="eventStore">
<ref local="eventStore" />
</property>
<property name="metaManager">
<ref local="metaManager" />
</property>
<property name="alarmHandler">
<ref local="alarmHandler" />
</property>
</bean>
<!-- 报警处理类 -->
<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
<bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
<property name="dataDir" value="${canal.file.data.dir:../conf}" />
<property name="period" value="${canal.file.flush.period:1000}" />
</bean>
<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
</bean>
<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
<property name="eventStore" ref="eventStore" />
</bean>
<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
<property name="destination" value="${canal.instance.destination}" />
<property name="slaveId" value="${canal.instance.mysql.slaveId:1234}" />
<!-- 心跳配置 -->
<property name="detectingEnable" value="${canal.instance.detecting.enable:false}" />
<property name="detectingSQL" value="${canal.instance.detecting.sql}" />
<property name="detectingIntervalInSeconds" value="${canal.instance.detecting.interval.time:5}" />
<property name="haController">
<bean class="com.alibaba.otter.canal.parse.ha.HeartBeatHAController">
<property name="detectingRetryTimes" value="${canal.instance.detecting.retry.threshold:3}" />
<property name="switchEnable" value="${canal.instance.detecting.heartbeatHaEnable:false}" />
</bean>
</property>
<property name="alarmHandler" ref="alarmHandler" />
<!-- 解析过滤处理 -->
<property name="eventFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
</bean>
</property>
<property name="eventBlackFilter">
<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
<constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
<constructor-arg index="1" value="false" />
</bean>
</property>
<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
<!-- 网络链接参数 -->
<property name="receiveBufferSize" value="${canal.instance.network.receiveBufferSize:16384}" />
<property name="sendBufferSize" value="${canal.instance.network.sendBufferSize:16384}" />
<property name="defaultConnectionTimeoutInSeconds" value="${canal.instance.network.soTimeout:30}" />
<!-- 解析编码 -->
<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
<!-- 解析位点记录 -->
<property name="logPositionManager">
<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
<property name="primary">
<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
</property>
<property name="failback">
<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
<property name="metaManager" ref="metaManager" />
</bean>
</property>
</bean>
</property>
<!-- failover切换时回退的时间 -->
<property name="fallbackIntervalInSeconds" value="${canal.instance.fallbackIntervalInSeconds:60}" />
<!-- 解析数据库信息 -->
<property name="masterInfo">
<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
<property name="address" value="${canal.instance.master.address}" />
<property name="username" value="${canal.instance.dbUsername:retl}" />
<property name="password" value="${canal.instance.dbPassword:retl}" />
<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
</bean>
</property>
<property name="standbyInfo">
<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
<property name="address" value="${canal.instance.standby.address}" />
<property name="username" value="${canal.instance.dbUsername:retl}" />
<property name="password" value="${canal.instance.dbPassword:retl}" />
<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
</bean>
</property>
<!-- 解析起始位点 -->
<property name="masterPosition">
<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
<property name="journalName" value="${canal.instance.master.journal.name}" />
<property name="position" value="${canal.instance.master.position}" />
<property name="timestamp" value="${canal.instance.master.timestamp}" />
</bean>
</property>
<property name="standbyPosition">
<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
<property name="journalName" value="${canal.instance.standby.journal.name}" />
<property name="position" value="${canal.instance.standby.position}" />
<property name="timestamp" value="${canal.instance.standby.timestamp}" />
</bean>
</property>
<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="filterRows" value="${canal.instance.filter.rows:false}" />
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
</bean>
</beans>
这里有个知识点 PropertyPlaceholderConfigurer设置默认值的方式 可以点击过去查看另一篇文章
知识点spring属性编辑器的使用。接着到了最主要的类
CanalInstanceWithSpring 看其类图
说明:server代表一个canal服务端的运行实例,对应一个jvm
instance对应一个数据队列(一个server可以对应多个instance,意思我们可以配置多个instance在canal中,具体配置方式 是在canal.properties 的canal.destinations的key 多个的话以逗号分隔,怎么知道的呢?找到源码
private void initInstanceConfig(Properties properties) {
String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);
for (String destination : destinations) {
InstanceConfig config = parseInstanceConfig(properties, destination);
InstanceConfig oldConfig = instanceConfigs.put(destination, config);
if (oldConfig != null) {
logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
oldConfig, config });
}
}
}
其中 public static final String CANAL_DESTINATION_SPLIT = ",";
)
instance下的子模块:看类图的属性就知道有
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)
alarmHandler( alarm报警机制)
这些组件在项目代码中都有对应的子项目与之对应起来
启动CanalLauncher类 其中有main方法直接运行即可。看源码是先初始化了Properties 然后传给CanalController类
final CanalController controller = new CanalController(properties);
这个实例是final的 说明这个类一旦被初始化时不可变的,防止多次启动。
debug跟进去 看CanalController 的实例化,先初始化instance的公共属性。然后实例化embededCanalServer和canalServer ,一个用于连接mysql master 一个供客户端连接。初始化ServerRunningMonitors,初始化monitor机制。初始化完成后 调用controller.start()的方法
start方法中先是去zk创建整个canal的工作节点(如果基于zk构建了HA的话)然后优先启动embeded服务,嗲用的是CanalServerWithEmbedded的start方法 查看此类得知 是去创建了canalinstance
public void start() {
if (!isStart()) {
super.start();
canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
public CanalInstance apply(String destination) {
return canalInstanceGenerator.generate(destination);
}
});
// lastRollbackPostions = new MapMaker().makeMap();
}
}
然后去启动对应的instance
此时会调用查看mod方式,如果是spring的话
else if (config.getMode().isSpring()) {
SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
synchronized (this) {
try {
// 设置当前正在加载的通道,加载spring查找文件时会用到该变量
System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
return instanceGenerator.generate(destination);
} catch (Throwable e) {
logger.error("generator instance failed.", e);
throw new CanalException(e);
} finally {
System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
}
}
}
利用spring的
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);
return applicationContext;
去加载对应instance的xml文件 然后 通过 beanFactory.getBean(beanName); 方式得到 CanalInstance,一般是spring,所以得到的是CanalInstanceWithSpring,然后启动 调用start方法,查看CanalInstanceWithSpring的父类AbstractCanalInstance的start方法
@Override
public void start() {
super.start();
if (!metaManager.isStart()) {
metaManager.start();
}
if (!alarmHandler.isStart()) {
alarmHandler.start();
}
if (!eventStore.isStart()) {
eventStore.start();
}
if (!eventSink.isStart()) {
eventSink.start();
}
if (!eventParser.isStart()) {
beforeStartEventParser(eventParser);
eventParser.start();
afterStartEventParser(eventParser);
}
logger.info("start successful....");
}
可以看到启动了 instance下所有的子模块
各个子模块的start的方法分开分析,此时cananl的启动算是完成了,最后注册关闭的时调用的方法
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
logger.info("## stop the canal server");
controller.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal Server:\n{}",
ExceptionUtils.getFullStackTrace(e));
} finally {
logger.info("## canal server is down.");
}
}
});