基于Canal的实时数据同步

1. 技术原理:Binlog 增量订阅

MySQL 数据库在执行任何增删改操作时,都会记录 Binlog(归档日志)。

  • Canal 会模拟成一个 MySQL 的“从库”(Slave),向主库发送 dump 协议。

  • 主库收到请求后,将 Binlog 推送给 Canal。

  • Canal 解析这些二进制日志,将其转化成结构化的数据(如 JSON),告诉你:“store库的duser表,ID为10的行,名字从‘张三’改成了‘李四’。”

2. 架构设计

针对您的场景,建议采用 Canal + MQ (Kafka/RabbitMQ) 的架构,实现“一处变更,多处消费”:

  • 源数据库 (Source):开启 Binlog(Row模式)。

  • Canal Server:负责解析 Binlog。

  • 消息队列 (MQ):Canal 将解析后的变化发送到 MQ 的一个 Topic(主题)中。

  • 适配器/消费者 (Consumers):

    • 您可以编写一个通用的“同步程序”。

    • 该程序订阅 MQ,每当有新消息,它就并行地将 SQL 执行到那 11 个目标数据库中。

3. 为什么Canal 比脚本好?

  • 极低延迟:您现在的脚本可能是每小时同步一次,而 Canal 是毫秒级同步。入职一个新员工,所有系统瞬间可见。

  • 极低压力:脚本每次都要全表 dump(即便数据没变),这会产生 IO 尖峰。Canal 只同步“变了的数据”,对网络和数据库几乎无压力。

  • 解耦与可靠性:

    • 如果其中一个目标库(比如 LIMS)宕机了,MQ 会保留这条消息。

    • 等 LIMS 恢复后,同步程序会自动从上次断开的地方继续消费,数据绝不会丢失。

  • 安全性:不再需要把所有库的密码写在一个脚本里。同步程序可以放在受限的内网环境,统一管理连接池。

4. 实施步骤(以 Canal 为例)

部署 Canal Server 主要分为三个核心步骤:MySQL 源库配置、Canal Server 安装 以及 实例参数配置

一、 MySQL 源库准备 (必须是源头库 10.3.26.1)

  1. 修改MySQL配置:源库配置 (my.cnf)
    需要开启 Row 模式的 Binlog:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1       # 每个实例 ID 必须唯一
  1. 创建 Canal 专用账号: 在源库执行 SQL,授予其从库权限:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

二、 下载并解压 Canal Server

Canal 运行在 Java 环境中,请确保服务器已安装 JDK 1.8+。

1.下载安装包(建议去 GitHub Release 页面下载最新稳定版):

mkdir /usr/local/canal
cd /usr/local/canal
# 下载(以 1.1.7 为例)
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
# 解压
tar -zxvf canal.deployer-1.1.7.tar.gz

三、 配置 Canal Server

Canal 有两层配置:canal.properties(全局配置)和 instance.properties(具体数据库实例配置)。

  1. 修改实例配置文件: 进入目录 conf/example/(example 是默认实例名,你可以改名): vi conf/example/instance.properties

# 源数据库地址
canal.instance.master.address= 10.3.26.1:13306

# 之前创建的数据库账号
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal_password
canal.instance.connectionCharset=UTF-8

# 重点:配置过滤规则,只同步你脚本里的两张表
# 格式:数据库名\\.表名,多个用逗号隔开
canal.instance.filter.regex=store\\.duser,store\\.ddep

2.修改全局配置(可选): vi conf/canal.properties

  • canal.port: 默认 11111。

  • canal.serverMode: 默认 tcp。如果你要配合 Kafka 使用,改为 kafka。

四、 启动与验证

启动:


sh bin/startup.sh

查看日志:

查看 Server 日志:tail -f logs/canal/canal.log

查看实例日志:tail -f logs/example/example.log

如果看到 "instance : example/default/xxx ... started" 字样,表示启动成功。

五、 如何实现库的同步?

Canal Server 只是负责抓取数据,要同步到另外 11 个目标库,你有两种选择:

方案 A:使用 Canal-Adapter (推荐)
这是官方提供的配套工具,专门用于“同步到目标数据库”。

下载 canal.adapter。

在 conf/rdb/ 目录下创建 11 个 .yml 文件(每个目标库一个)。

配置映射关系,例如 duser -> duser。

Adapter 会自动把抓到的变更转化为 INSERT/UPDATE/DELETE 发送到各个库。

方案 B:编写简单的消费者 (Java/Python/Go)
编写一个程序连接 Canal Server (11111 端口)。

实时拉取变更消息。

程序内部写循环,同时连接 11 个库执行相应的 SQL。

六 Canal-Adapter配置

在 Canal-Adapter 的方案中,每一个目标数据库(RDB)对应一个 .yml 配置文件。如果你有 11 个目标库,你需要创建 11 个这样的文件放在 conf/rdb/ 目录下。
application.yml配置文件

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts: ""
  syncBatchSize: 1000
  retries: -1
  timeout: 5000
  accessKey: ""
  secretKey: ""
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.37.152:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username: ""
    canal.tcp.password: ""
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer

    #rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    #rocketmq.access.channel:
    #rocketmq.subscribe.filter:
    # rabbitMQ consumer
    #rabbitmq.host:
    #rabbitmq.virtual.host:
    #rabbitmq.username:
    #rabbitmq.password:
    #rabbitmq.resource.ownerId:

#  srcDataSources:
#    defaultDS:
#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
#      username: root
#      password: 121212



  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: ds_gposm
        properties:
           jdbc.driverClassName: com.mysql.jdbc.Driver
           jdbc.url: jdbc:mysql://11.1.1.111:13306/gposm?useUnicode=true&useSSL=false
           jdbc.username: gpxft
           jdbc.password: gpxft123

ds_gpxft.yml

dataSourceKey: ds_gposm
outerAdapterKey: ds_gposm
destination: example
groupId: g1
concurrent: true
dbMapping:
  database: store  
  table: duser
  targetTable: duser
  targetPk:
    pkid: pkid 

还是要先文档,再问AI工具,要不很容易被AI错误的配置文件误导了.IT技术还是手册先行,AI辅助,两者的顺序不能颠倒了

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容