1. 技术原理:Binlog 增量订阅
MySQL 数据库在执行任何增删改操作时,都会记录 Binlog(归档日志)。
Canal 会模拟成一个 MySQL 的“从库”(Slave),向主库发送 dump 协议。
主库收到请求后,将 Binlog 推送给 Canal。
Canal 解析这些二进制日志,将其转化成结构化的数据(如 JSON),告诉你:“store库的duser表,ID为10的行,名字从‘张三’改成了‘李四’。”
2. 架构设计
针对您的场景,建议采用 Canal + MQ (Kafka/RabbitMQ) 的架构,实现“一处变更,多处消费”:
源数据库 (Source):开启 Binlog(Row模式)。
Canal Server:负责解析 Binlog。
消息队列 (MQ):Canal 将解析后的变化发送到 MQ 的一个 Topic(主题)中。
-
适配器/消费者 (Consumers):
您可以编写一个通用的“同步程序”。
该程序订阅 MQ,每当有新消息,它就并行地将 SQL 执行到那 11 个目标数据库中。
3. 为什么Canal 比脚本好?
极低延迟:您现在的脚本可能是每小时同步一次,而 Canal 是毫秒级同步。入职一个新员工,所有系统瞬间可见。
极低压力:脚本每次都要全表 dump(即便数据没变),这会产生 IO 尖峰。Canal 只同步“变了的数据”,对网络和数据库几乎无压力。
-
解耦与可靠性:
如果其中一个目标库(比如 LIMS)宕机了,MQ 会保留这条消息。
等 LIMS 恢复后,同步程序会自动从上次断开的地方继续消费,数据绝不会丢失。
安全性:不再需要把所有库的密码写在一个脚本里。同步程序可以放在受限的内网环境,统一管理连接池。
4. 实施步骤(以 Canal 为例)
部署 Canal Server 主要分为三个核心步骤:MySQL 源库配置、Canal Server 安装 以及 实例参数配置
一、 MySQL 源库准备 (必须是源头库 10.3.26.1)
- 修改MySQL配置:源库配置 (my.cnf)
需要开启 Row 模式的 Binlog:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 每个实例 ID 必须唯一
- 创建 Canal 专用账号: 在源库执行 SQL,授予其从库权限:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
二、 下载并解压 Canal Server
Canal 运行在 Java 环境中,请确保服务器已安装 JDK 1.8+。
1.下载安装包(建议去 GitHub Release 页面下载最新稳定版):
mkdir /usr/local/canal
cd /usr/local/canal
# 下载(以 1.1.7 为例)
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
# 解压
tar -zxvf canal.deployer-1.1.7.tar.gz
三、 配置 Canal Server
Canal 有两层配置:canal.properties(全局配置)和 instance.properties(具体数据库实例配置)。
- 修改实例配置文件: 进入目录 conf/example/(example 是默认实例名,你可以改名): vi conf/example/instance.properties
# 源数据库地址
canal.instance.master.address= 10.3.26.1:13306
# 之前创建的数据库账号
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal_password
canal.instance.connectionCharset=UTF-8
# 重点:配置过滤规则,只同步你脚本里的两张表
# 格式:数据库名\\.表名,多个用逗号隔开
canal.instance.filter.regex=store\\.duser,store\\.ddep
2.修改全局配置(可选): vi conf/canal.properties
canal.port: 默认 11111。
canal.serverMode: 默认 tcp。如果你要配合 Kafka 使用,改为 kafka。
四、 启动与验证
启动:
sh bin/startup.sh
查看日志:
查看 Server 日志:tail -f logs/canal/canal.log
查看实例日志:tail -f logs/example/example.log
如果看到 "instance : example/default/xxx ... started" 字样,表示启动成功。
五、 如何实现库的同步?
Canal Server 只是负责抓取数据,要同步到另外 11 个目标库,你有两种选择:
方案 A:使用 Canal-Adapter (推荐)
这是官方提供的配套工具,专门用于“同步到目标数据库”。
下载 canal.adapter。
在 conf/rdb/ 目录下创建 11 个 .yml 文件(每个目标库一个)。
配置映射关系,例如 duser -> duser。
Adapter 会自动把抓到的变更转化为 INSERT/UPDATE/DELETE 发送到各个库。
方案 B:编写简单的消费者 (Java/Python/Go)
编写一个程序连接 Canal Server (11111 端口)。
实时拉取变更消息。
程序内部写循环,同时连接 11 个库执行相应的 SQL。
六 Canal-Adapter配置
在 Canal-Adapter 的方案中,每一个目标数据库(RDB)对应一个 .yml 配置文件。如果你有 11 个目标库,你需要创建 11 个这样的文件放在 conf/rdb/ 目录下。
application.yml配置文件
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts: ""
syncBatchSize: 1000
retries: -1
timeout: 5000
accessKey: ""
secretKey: ""
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 192.168.37.152:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username: ""
canal.tcp.password: ""
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
#rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
#rocketmq.access.channel:
#rocketmq.subscribe.filter:
# rabbitMQ consumer
#rabbitmq.host:
#rabbitmq.virtual.host:
#rabbitmq.username:
#rabbitmq.password:
#rabbitmq.resource.ownerId:
# srcDataSources:
# defaultDS:
# url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
# username: root
# password: 121212
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: ds_gposm
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://11.1.1.111:13306/gposm?useUnicode=true&useSSL=false
jdbc.username: gpxft
jdbc.password: gpxft123
ds_gpxft.yml
dataSourceKey: ds_gposm
outerAdapterKey: ds_gposm
destination: example
groupId: g1
concurrent: true
dbMapping:
database: store
table: duser
targetTable: duser
targetPk:
pkid: pkid
还是要先文档,再问AI工具,要不很容易被AI错误的配置文件误导了.IT技术还是手册先行,AI辅助,两者的顺序不能颠倒了