1.首先mysql要开启binlog,并配置为row格式
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
2.为Canal创建用户并授权
CREATE USER canal IDENTIFIED BY 'canal';
grant select,replication slave,replication client on *.* to 'canal' @ '%' identified by 'canal' #IP需要修改为Canal节点所在的信息(这里有个坑,replication slave 的级别是global,所以如果想限制用户只在某一个数据库下生效,如:grant replication slave on test.* to 'canal' @ '%' , 想让用户只在test库下生效,会报错:Incorrect usage of DB GRANT and GLOBAL PRIVILEGES,所以不能只作用于某一数据库,而是全局, 如果想以数据库粒度限制,在my.cnf中添加binlog-do-db=test来限制主从复制的数据库为test)
3.开始部署Canal集群(需要安装zookeeper,此处略)
下载Canal安装包,地址为https://github.com/alibaba/canal,推荐使用1.1.4版本,该版本引入了canal-admin,可通过WebUI对Canal进行管理
Canal主要配置文件有2个,一个是conf/canal.properties,一个是conf/example/instance.properties。Canal有个强大的地方,修改完配置文件后不需要重启服务,会自动重新加载。
a.修改conf/canal.properties,该文件属于canal全局配置文件,用于配置canal工作模式、zk与kafka等信息
canal.zkServers = 192.168.94.10:2181,192.168.94.11:2181,192.168.94.12:2181 #zk集群地址
canal.instance.global.spring.xml = classpath:spring/default-instance.xml #默认的file-instance不支持HA持久化,需要修改为default
b.修改conf/example/instance.properties。这是每个Canal实例的配置文件,里面配置了需要同步的数据库信息、mq信息等。instance.properties文件所属的目录就属于一个实例,canal默认已创建好一个名为example的示范实例,如果要同步多个数据库实例的话,复制example目录后修改instance.properties即可。
canal.instance.mysql.slaveId=2131 #ID不可与主库和其他从库重复
# position info 需要同步的数据库信息
canal.instance.master.address=192.168.10.56:3306
canal.instance.dbUsername=canal #canal在数据库中的用户
canal.instance.dbPassword=canal #canal用户的密码
-------------------------------------------------------------------------------------------------------
最后记得要修改配置文件中的各种端口,修改 canal.propeties 文件中的 canal.port、canal.metrics.pull.port、canal.admin.jmx.port(这个我暂时没看到) 各配置项,指向其它端口!
canal.port = 12111
canal.metrics.pull.port = 12112
然后就可启动Canal
bin/startup.sh #此启动文件中还有一个启动端口需修改,否则多个实例就会端口冲突(address=9099(此为startup.bat中的))!!启动后需要观察2个日志,logs/example/example.log和logs/canal/canal.log
启动每Canal节点后,通过zk客户端可以查看当前正在工作的Canal节点。当Canal主节点故障时,客户端能通过zk获取到另外节点继续工作
zkcli.sh
[zk: localhost:2181(CONNECTED) 10] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.255.10:12111"}
[zk: localhost:2181(CONNECTED) 11] ls /otter/canal/destinations/example/cluster
[192.168.255.10:11111, 192.168.255.10:12111]
4.在springboot中,引入canal相关的包
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.common</artifactId>
<version>1.1.4</version>
</dependency>
(坑B: <dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
,这几个排除的极容易冲突,导致项目起不来)
建立连接(连接地址写zookeeper即可,后面参数是instance的名字及连接mysql的用户名和密码):
CanalConnectors.newClusterConnector("127.0.0.1:2181","example","canal","canal");
最后就可以去定时获取canal同步到的数据了。
!!有两个没有暂时还解决的问题:
1.canal客户端在消费server端发来的binlog变化时,当既没有ack,也没有rollback时,将程序kill -9关掉,发现server端的binlog的position居然更新了,这导致有可能出现,es还未同步数据,程序宕掉了,而消费进度缺前进以至于数据丢失的问题(但我反复测试后,在es同步前关掉程序,发现server端并未更新pos,但在最后ack前关掉程序,server的pos却更新了)
2.canal不能同时多个client消费一个server,在官网上说的是一个client消费,另一个冷备,但实际是,一个在消费,另一个在捣乱! 当同时开两个client时,总有一个会被server端返回batchId不存在,即在server的内存中batchId被清掉了,这他妈就导致会丢数据啊,到底是配置还是要调整什么在github 的issue上也没得到回答,项目紧没时间去看server端的源码再来解决问题,只能搁置,先用kafka来同步吧!
####################################################################
canal还提供了界面ui,canal-admin,首先下载canal.admin-1.1.4.tar.gz,然后替换它lib目录下的数据库jar包为mysql-connector-java-8.0.12.jar(现在都用mysql8,它自带了驱动版本低了),然后修改conf里的application.yml 配置文件,主要就是一些数据库的配置,它需要建立一个库(canal_manager),要在mysql中导入它准备好的数据(conf下的canal_manager.sql,source conf/canal_manager.sql),然后启动即可,如果界面上登录报错,那十九是mysql8密码加密模式的问题,修改mysql中canal-admin使用的用户的密码加密模式:alter user '用户名'@localhostIDENTIFIED WITH mysql_native_password by '你的密码';
最后登录成功后,在界面上的集群管理选择新建集群,结果还他妈报错,Error[Column 'modified_time' cannot be null],只好把所有表的这个字段加上默认时间:
ALTER TABLE canal_manager.canal_node_server
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_adapter_config
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_cluster
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_config
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_instance_config
MODIFY modified_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;
ALTER TABLE canal_manager.canal_user
MODIFY creation_date timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;