今天工作不饱满,上班时间闲的无聊,决定学一下 canal。其实关于 canal 的教程在 https://github.com/alibaba/canal/wiki 这里写的挺清楚的,基本上照着步骤搞就完事了。我写这个文章的目的主要是给自己做个笔记~
1. 准备 canal server
- 官方下载 canal server,即 https://github.com/alibaba/canal/releases/tag/canal-1.1.4 里面的 canal.deployer-1.1.4.tar.gz 文件
- 解压到指定目录
mkdir canal.deployer-1.1.4
tar -xvzf canal.deployer-1.1.4.tar.gz -C canal.deployer-1.1.4
2. 配置 canal server
- 因为原始的 bin/restart.sh 有 bug,只能在 bin 目录下执行才可以,所以修改了一下,修改后完整文件如下
#!/bin/bash
args=$@
SCRIPT_DIR=$(cd "$(dirname "$0")"; pwd)
sh $SCRIPT_DIR/stop.sh $args
sh $SCRIPT_DIR/startup.sh $args
- conf/canal.properties 文件是 canal server 的基础配置文件,其中有一段代码如下,各项配置的作用如注释所说
#################################################
######### destinations #############
#################################################
# 这里定义了 canal server 启动的时候要添加的 instance 名称,默认是 example
canal.destinations = example
# 这里定义了 canal server 查找 instance 配置文件的根路径。
# 举个例子,假如前面配置了 example instance, 那么 canal server 会查找 ../conf/example/instance.properties 文件
canal.conf.dir = ../conf
# 这里控制着 canal server 是否在运行过程中自动扫描 canal.conf.dir 目录以动态添加或删除 instance,默认打开,扫描时间间隔 5s
canal.auto.scan = true
canal.auto.scan.interval = 5
- 因为我不想使用 example instance, 所以我将 conf/canal.properties 文件修改如下
# 指定 instance 为 content
canal.destinations = content
# 关闭自动扫描 instance, 这样就可以忽略掉 example instance 了
canal.auto.scan = false
- 为自定义的 content instance 创建配置文件 conf/content/instance.properties
canal.instance.mysql.slaveId = 1234
canal.instance.master.address = 127.0.0.1:3307
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
canal.instance.defaultDatabaseName = content
canal.instance.connectionCharset = UTF-8
# 这里官方文档写的是 canal.instance.filter.regex = .\*\\\\..\*
# 在 TCP 模式下两者没有任何区别,但是在 kafka 模式下只有下面这种写法生效
# 测试的过程中发现了这种现象,但还没有查清具体原因
canal.instance.filter.regex = .*\\..*
3. 启动 canal server
canal server 启动过程中的关键信息如下:
- 确定 binlog first position
(1) 先从 conf/content/meta.dat 文件中查找 last position, 也就是最后一次成功 dump binlog 的位点
(2) 如果不存在 last position, 则从 conf/content/instance.properties 配置文件中查找 initial position, 这是我们人为配置的初始化位点
(3) 如果不存在 initial position, 则执行 show master status 命令获取 mysql binlog lastest position
通过以上三步就可以确定 canal server 启动之后 binlog 初始位点 - 将 first position 赋值给 last position 保存在内存中
- 将 schema 缓存到 conf/content/h2.mv.db 文件中
4. 启动 canal client
- canal client 的 java demo 可以去官方 GitHub 上找一下,记得将 destination 等配置信息改正确。请参考 https://github.com/alibaba/canal/wiki/ClientExample
- canal client connect
- canal client describe
(1) 在收到客户端订阅请求之后,logs/content/content.log 文件会打印出相关日志
(2) conf/content/meta.dat 文件记录了客户端的订阅信息,包括 clientId, destination, filter 等 - canal client getWithoutAck
(1) canal server 在收到 canal client 查询请求之后,以内存中的 last position 作为参数向 mysql server 发送 dump 请求
(2) 如果存在比 last position 更新的 binlog, canal server 会收到 mysql server 的返回数据,然后将其转换为 Message 数据结构返回给 canal client - canal client ack
canal server 在收到 canal client 确认请求之后,更新内存中的 last position 并同步保存到 conf/content/meta.dat 文件中,在 logs/content/meta.log 文件中打印日志
5. 补充
- 因为在 TCP 模式下,一个 instance 只能有一个 canal client 订阅,即使同时有多个 canal client 订阅相同的 instance, 也只会有一个 canal client 成功获取 binlog, 所以 canal server 写死 clientId = 1001. 也正是因为一个 instance 只有一个 canal client, 所以 canal server 将 binlog 位点信息维护在了 instance 级别,即 conf/content/meta.dat 文件中
- 在 TCP 模式下,如果 canal client 想重新获取以前的 binlog,只能通过修改 canal server 的 initial position 配置并重启服务来达到目的
- 在 TCP 模式下 canal server 主要提供了两个功能
(1) 维护 mysql binlog position 信息,目的是作为 dump 的请求参数,这也是 canal server 唯一保存的数据
(2) 对客户端提供接口以查询 binlog