什么是Canal
Canal 是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析的数据同步工具,主要功能是提供增量数据订阅和消费
Canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)

image.png
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
-
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
46ec982fc5daaa5ed75c3cc10124efb7.png
实现流程
一、MySQL 配置
1、修改 MySQL 配置文件
MySQL的配置文件名称取决于操作系统:在Windows系统中是my.ini,而在Linux、macOS等类Unix系统中是my.cnf
- Windows系统:通常位于 C:\ProgramData\MySQL\MySQL Server 8.0\my.ini(路径可能因版本不同而变化)
- Linux系统:常见路径包括 /etc/my.cnf 或 /etc/mysql/my.cnf
以下参数可能已存在,存在则不需配置,一般只需要配置 binlog-format=ROW
[mysqld]
# 开启 binlog 日志功能,用于记录数据库的所有变更操作
# 注意:在较新版本的 MySQL 中默认可能已开启,但文件名可能不是 mysql-bin
log-bin=mysql-bin
# 设置 binlog 格式为 ROW 模式,记录每一行数据的具体变化
# 注意:默认格式通常是 STATEMENT 或 MIXED,需要显式配置为 ROW
binlog-format=ROW
# 配置 MySQL 服务器 ID,用于主从复制标识
# 注意:需保证唯一性,且不能与 canal 客户端的 slaveId 冲突
server_id=1
修改配置后需要重启MySQL服务!!!
检查配置是否成功
SHOW VARIABLES LIKE 'log_bin'; -- 必须 ON
SHOW VARIABLES LIKE 'binlog_format'; -- 必须 ROW
2、新增MySQL账号密码
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限
Canal 配置文件中默认MySQL授权的账号密码是 canal/canal
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
二、Canal 配置
1、下载Canal工具 ,点击跳转即可
点击下载地址,选择版本后点击canal.deployer文件下载,下载完后解压到你自定义的canal目录中即可

image.png
命令方式下载:
mkdir -p /opt/canal
cd /opt/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz
tar -zxvf canal.deployer-1.1.8.tar.gz
Canal有两个重要的配置文件:
// instance.properties中的配置项优先级高于canal.properties中的全局配置
// 而通过代码API传入的参数具有最高的优先级,会直接覆盖配置文件中的值
canal/conf/example/instance.properties
canal/conf/canal.properties
canal.properties 配置文件一般保持不变即可
# canal端口
canal.port=11111
# canal实例名称
canal.destinations=example
conf/example/instance.properties配置文件修改:
# mysql serverId,不要和 mysql 的 server_id 重复
canal.instance.mysql.slaveId=10
# master address,需要改成自己的数据库IP端口
canal.instance.master.address=127.0.0.1:3306
# username/password,需要改成自己的数据库账号密码
# Canal进行连接时有传递数据库账号密码的,可以不用管此配置
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
2、启动和关闭
#进入文件目录下的bin文件夹
#启动(window环境下直接点击 startup.bat 脚本即可启动)
sh startup.sh
#关闭
sh stop.sh
三、Springboot集成Canal
1、引入Canal依赖
<!-- canal客户端依赖,用于连接和消费MySQL binlog数据 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.8</version>
</dependency>
<!-- canal协议依赖,提供binlog解析和传输协议支持 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.8</version>
</dependency>
2、在 application.properties 配置文件中增加Canal配置参数
# ======================
# 配置canal
# ======================
# Canal服务器地址
canal.instance.host=127.0.0.1
# Canal服务器端口号
canal.instance.port=11111
# Canal实例名称,需要与Canal Server中的配置保持一致
canal.instance.destination=example
# Canal服务器用户名,用于连接数据库
canal.instance.dbUsername=canal
# Canal服务器密码,用于连接数据库
canal.instance.dbPassword=canal
# Canal客户端每次获取数据的批量大小,默认值为1000
canal.instance.batchSize=1000
# Canal客户端重连最大重试次数
canal.instance.maxRetries=5
# Canal客户端每次重连的时间间隔(毫秒)
canal.instance.retryInterval=60000
# canal处理FileInfo的数据库
canal.process.fileInfo.databaseName=point_general
# canal处理FileInfo的表名
canal.process.fileInfo.tableName=file_info
3、Canal 客户端实现
@Slf4j
@Component
public class CanalClient {
@Value("${canal.instance.host:127.0.0.1}")
private String host;
@Value("${canal.instance.port:11111}")
private int port;
@Value("${canal.instance.destination:example}")
private String destination;
@Value("${canal.instance.dbUsername:canal}")
private String dbUsername;
@Value("${canal.instance.dbPassword:canal}")
private String dbPassword;
@Value("${canal.instance.batchSize:1000}")
private int batchSize;
@Value("${canal.instance.maxRetries:5}")
private int maxRetries;
@Value("${canal.instance.retryInterval:60000}")
private int retryInterval;
@Value("${canal.process.fileInfo.databaseName:point_general}")
private String fileInfoDatabaseName;
@Value("${canal.process.fileInfo.tableName:file_info}")
private String fileInfoTableName;
@Resource
private FileInfoService fileInfoService;
private volatile boolean running = false;
private Thread canalThread;
@PostConstruct
public void start() {
log.info("创建并启动Canal客户端线程");
running = true;
canalThread = new Thread(this::run);
canalThread.setName("canal-client-thread");
canalThread.setDaemon(false);
canalThread.start();
}
public void stop() {
log.info("停止Canal客户端线程");
running = false;
if (canalThread != null) {
canalThread.interrupt();
}
}
@PreDestroy
public void destroy() {
stop();
}
/**
* 运行Canal客户端
*/
public void run() {
CanalConnector connector = null;
try {
// 创建连接器
connector = reconnect();
log.info("Canal客户端连接成功!!!");
// 判断是否运行
while (running && !Thread.currentThread().isInterrupted()) {
Message message = connector.getWithoutAck(batchSize);
long messageId = message.getId();
int size = message.getEntries().size();
if (messageId == -1 || size == 0) {
// 未获取到数据,短暂休眠以降低轮询频率
Thread.sleep(1000);
continue;
}
// 处理数据变更事件
processMysqlDataChange(message.getEntries());
// 确认处理完成
connector.ack(messageId);
}
} catch (InterruptedException e) {
log.info("Canal客户端线程被中断");
} catch (Exception e) {
log.error("Canal客户端线程异常", e);
} finally {
running = retryWithLimit(maxRetries, retryInterval, this::reconnect);
if (!running && connector != null) {
connector.disconnect();
}
}
}
/**
* Canal连接服务
*/
private CanalConnector reconnect() {
// 创建连接器 参数: 服务器地址、实例名称、数据库用户名、数据库密码
// 此数据库账号密码会覆盖掉instance.properties配置文件中的canal.instance.dbUsername和canal.instance.dbPassword的值
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(host, port), destination, dbUsername, dbPassword);
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
return connector;
}
/**
* Canal客户端重连操作
*
* @param maxRetries 最大重试次数
* @param retryInterval 重试间隔(毫秒)
* @param operation 需要执行的操作
* @return 是否执行成功
*/
public boolean retryWithLimit(int maxRetries, long retryInterval, Runnable operation) {
int retryCount = 0;
while (retryCount < maxRetries) {
try {
operation.run();
log.info("Canal客户端重连成功!!!");
return true; // 执行成功,返回
} catch (Exception e) {
retryCount++;
if (retryCount >= maxRetries) {
log.error("重试{}次后仍失败", retryCount, e);
return false;
}
try {
Thread.sleep(retryInterval);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return false;
}
}
}
return false;
}
/**
* 处理MySQL数据变更事件
*
* @param entries 从Canal获取的数据变更事件列表
*/
public void processMysqlDataChange(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
try {
// 跳过事务开始、结束、心跳、GTID事件
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND ||
entry.getEntryType() == CanalEntry.EntryType.HEARTBEAT ||
entry.getEntryType() == CanalEntry.EntryType.GTIDLOG) {
continue;
}
String databaseName = entry.getHeader().getSchemaName();
if (StringUtils.isBlank(databaseName)) {
log.error("数据变更事件中未获取到数据库名称");
continue;
}
String tableName = entry.getHeader().getTableName();
if (StringUtils.isBlank(tableName)) {
log.error("数据变更事件中未获取到表名称");
continue;
}
// 打印基本信息
log.info("数据库: {}, 表名: {}, 操作类型:{}", databaseName, tableName, entry.getEntryType());
// 处理数据行, 类型判断是防止官方后续增加其他类型
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
// 处理文件上传数据同步到ES
if (Objects.equals(databaseName, fileInfoDatabaseName) && Objects.equals(tableName, fileInfoTableName)) {
try {
fileInfoService.syncToFileInfoEs(entry);
} catch (Exception e) {
log.error("同步 file info to ES failed, entry:{}", JSONUtil.toJsonStr(entry), e);
// 可以考虑实现重试机制或者发送告警
}
}
}
} catch (Exception e) {
log.error("处理单个entry时发生异常: {}", JSONUtil.toJsonStr(entry), e);
// 继续处理下一个entry,避免影响整个批次
}
}
}
}
public interface FileInfoService extends IService<FileInfo> {
/**
* 同步文件信息到Elasticsearch
*
* @param entry CanalEntry对象,包含文件信息
*/
void syncToFileInfoEs(CanalEntry.Entry entry) throws Exception;
}
@Service
public class FileInfoServiceImpl implements FileInfoService {
@Resource
private ElasticsearchRestTemplate elasticsearchRestTemplate;
@Override
public void syncToFileInfoEs(CanalEntry.Entry entry) throws Exception {
ByteString storeValue = entry.getStoreValue();
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == CanalEntry.EventType.INSERT) {
List<FileInfo> fileInfos = new ArrayList<>();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
fileInfos.add(createFileInfo(rowData.getAfterColumnsList()));
}
elasticsearchRestTemplate.save(fileInfos);
} else if (eventType == CanalEntry.EventType.UPDATE) {
List<FileInfo> afterFileInfoList = new ArrayList<>();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
afterFileInfoList.add(createFileInfo(rowData.getAfterColumnsList()));
}
elasticsearchRestTemplate.save(afterFileInfoList);
} else if (eventType == CanalEntry.EventType.DELETE) {
List<String> idList = new ArrayList<>();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
if ("id".equals(column.getName())) {
idList.add(column.getValue());
break;
}
}
}
if (!idList.isEmpty()) {
for (String id : idList) {
elasticsearchRestTemplate.delete(id, FileInfo.class);
}
}
}
}
private FileInfo createFileInfo(List<CanalEntry.Column> columnList) {
FileInfo fileInfo = new FileInfo();
for (CanalEntry.Column column : columnList) {
String name = column.getName();
String value = column.getValue();
switch (column.getName()) {
case FileInfoIndexField.ID:
fileInfo.setId(Long.valueOf(column.getValue()));
break;
case FileInfoIndexField.FILE_NAME:
fileInfo.setFileName(column.getValue());
break;
// 其他字段处理...
default:
break;
}
}
return fileInfo;
}
}
四、测试验证, 亲测有效

image.png

image.png
