Canal + Kafka 实现 MySQL 的 binlog 近实时同步

背景

经过上篇文章的测试 Kafka Connect For MySQL 实现增量数据同步,因为研究时间较短,网上资源较少,只能自己一步一步去探索,在理论层面上是可以实现业务需求,但是实践过程中遇到一些原因,导致实际效果没有达到业务需求。所以将基于业务需求重新进行技术调研。因为笔者目前工作重心在于搭建一个小型的实时数仓。优先级比较高的一个任务就是需要近实时同步业务系统的数据(包括保存、更新或者软删除)到一个另一个数据源,持久化之前需要清洗数据并且构建一个相对合理的便于后续业务数据指标统计、数据指标计算等扩展功能的业务模型。基于当前团队的资源和能力,优先调研了Alibaba开源中间件Canal的使用。

这篇文章将测试一下如何快速地搭建一套Canal相关的组件。

关于 Canal

简介

下面的简介和下一节的原理均来自于Canal项目的README

canal[kə'næl],译意为水道/管道/沟渠,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务trigger获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括:

数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务Cache刷新
带业务逻辑的增量数据处理

工作原理

MySQL主备复制原理:


  • MySQLMaster 实例将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过show binlog events进行查看)
  • MySQLSlave 实例将master的 binary log events 拷贝到它的中继日志(relay log)
  • MySQLSlave 实例重放relay log中的事件,将数据变更反映它到自身的数据

Canal 的工作原理如下:

  • Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Master发送dump协议
  • MySQL Master收到dump请求,开始推送binary log给Slave(即Canal)
  • Canal解析binary log对象(原始为byte流),并且可以通过连接器发送到对应的消息队列等中间件中

部署所需的中间件

搭建一套可以用的组件需要部署MySQLZookeeperKafkaCanal 四个中间件的实例,下面简单分析一下部署过程。选用的服务器系统是CentOS7。

MySQL 环境搭建

MySQL 安装

MySQL-5.7 的安装参考文章 Centos 7.3 安装 MySQL 5.7
MySQL-8.0 的安装参考文章 Centos 7.3 安装 MySQL 8.0

Mysql 开启 binlog

参考文章 MySQL 开启 binlog

Zookeeper 分布式环境搭建

参考文章 Zookeeper 分布式环境搭建

Kafka 分布式环境搭建

参考文章 Kafka 分布式环境搭建

安装和使用 Canal

终于到了主角登场,这里选用 Canal 的v1.1.4稳定发布版,只需要下载deployer模块:

mkdir /data/canal
cd /data/canal
# 这里注意一点,Github在国内被墙,下载速度极慢,可以先用其他下载工具下载完再上传到服务器中
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz

笔者下载这个安装包下载了很久,故将安装包放入百度云盘:(链接:https://pan.baidu.com/s/1ZXZXAEEhoLcnnhSwpUrZmg, 提取码:52me),以供大家下载

解压后的目录如下:

- bin   # 运维脚本
- conf  # 配置文件
  canal_local.properties  # canal本地配置,一般不需要动
  canal.properties        # canal服务配置
  logback.xml             # logback日志配置
  metrics                 # 度量统计配置
  spring                  # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件
  example                 # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹
    instance.properties   # 实例配置,一般指单个数据库的配置
- lib   # 服务依赖包
- logs  # 日志文件输出目录

在开发和测试环境建议把 logback.xml 的日志级别修改为DEBUG方便定位问题。这里需要关注 canal.propertiesinstance.properties 两个配置文件。 canal.properties 文件中,需要修改:

  • 去掉 canal.instance.parser.parallelThreadSize = 16 这个配置项的注释,也就是启用此配置项,和实例解析器的线程数相关,不配置会表现为阻塞或者不进行解析。
  • canal.serverMode 配置项指定为 kafka ,可选值有 tcpkafkarocketmq(master分支或者最新的的v1.1.5-alpha-1版本,可以选用 rabbitmq),默认是 kafka
  • canal.mq.servers 配置需要指定为 Kafka 服务或者集群 Broker 的地址,这里配置为 127.0.0.1:9092

canal.mq.servers在不同的canal.serverMode有不同的意义。

  • kafka模式下,指Kafka服务或者集群Broker的地址,也就是bootstrap.servers
  • rocketmq模式下,指NameServer列表
  • rabbitmq模式下,指RabbitMQ服务的Host和Port

其他配置项可以参考下面两个官方Wiki的链接:

instance.properties 一般指一个数据库实例的配置,Canal架构支持一个Canal服务实例,处理多个数据库实例的binlog异步解析。instance.properties 需要修改的配置项主要包括:

  • canal.instance.mysql.slaveId 需要配置一个和Master节点的服务ID完全不同的值,这里笔者配置为 654321
  • 配置数据源实例,包括地址、用户、密码和目标数据库:
    • canal.instance.master.address,这里指定为 127.0.0.1:3306
    • canal.instance.dbUsername,这里指定为canal。
    • canal.instance.dbPassword,这里指定为 QWqw12!@
    • 新增 canal.instance.defaultDatabaseName,这里指定为test(需要在MySQL中建立一个 test 数据库,见前面的流程)。
  • Kafka 相关配置,这里暂时使用静态 topic 和单个 partition
    • canal.mq.topic,这里指定为 test,也就是解析完的 binlog 结构化数据会发送到Kafka 的命名为 test 的topic中。
    • canal.mq.partition,这里指定为 0
      配置工作做好之后,可以启动 Canal 服务:
sh /data/canal/bin/startup.sh 
# 查看服务日志
tail -100f /data/canal/logs/canal/canal
# 查看实例日志  -- 一般情况下,关注实例日志即可
tail -100f /data/canal/logs/example/example.log

启动正常后,见实例日志如下:

test 数据库创建一个订单表,并且执行几个简单的DML:

USE test;

CREATE TABLE IF NOT EXISTS test.omneo(
    pid int(11) NOT NULL AUTO_INCREMENT,
    uuid varchar(100) NOT NULL,
    firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    birthdate varchar(20),
    postalcode varchar(20),
    city varchar(20),
    sexe varchar(20),
    status varchar(20),
    commenttime timestamp NOT NULL DEFAULT current_timestamp,
    PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

# 插入 4 条测试数据
insert into omneo values(1,"0049683542a7-5bdb-d564-3133-276ae3ce","Maurice","Samuel","01/11/1977","H2M2V5","Ballancourt","male","en couple","2020-05-09 11:01:54");
insert into omneo values(2,"8338623542a7-5bdb-d564-3133-276ae3ce","Gauthier","Garbuet","23/05/1965","05274","Cocagne","female","maried","2020-05-09 11:01:54");
insert into omneo values(3,"3374573542a7-5bdb-d564-3133-276ae3ce","Maurice","Samuel","01/11/1977","H0H0H0","Ottawa","male","en couple","2020-05-09 11:01:54");
insert into omneo values(4,"5494133542a7-5bdb-d564-3133-276ae3ce","Nicole","Garbuet","01/11/1977","H0H0H0","Maugio","unknown","single","2020-05-09 11:01:54");

# 更新测试数据
update omneo_incrementing_timestamp set firstname = "world" ,commenttime="2020-12-20 15:55:10" where pid in(2,4);
# 删除测试数据
delete from omneo where pid = 1;

具体的数据如下:

# 修改 `root`@`localhost` 密码操作
{"data":null,"database":"","es":1589265975000,"id":4,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"ALTER USER 'root'@'localhost' IDENTIFIED WITH 'caching_sha2_password' AS '$A$005$k>XD\\n6\\\"[hx\u0001Ocm/s\u00164\u007F\u00030iVZ3nTJnQORvohw7T4wWWQnSTz4zvFGfLPO3OxQ1m8'","sqlType":null,"table":"","ts":1589272027697,"type":"QUERY"}
# 创建 `test` 数据库
{"data":null,"database":"test","es":1589271839000,"id":4,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1589272027697,"type":"QUERY"}
# 创建 `test.omneo` 表
{"data":null,"database":"test","es":1589271993000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE IF NOT EXISTS test.omneo(\n    pid int(11) NOT NULL AUTO_INCREMENT,\n    uuid varchar(100) NOT NULL,\n    firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,\n    lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,\n    birthdate varchar(20),\n    postalcode varchar(20),\n    city varchar(20),\n    sexe varchar(20),\n    status varchar(20),\n    commenttime timestamp NOT NULL DEFAULT current_timestamp,\n    PRIMARY KEY (pid)\n)ENGINE=InnoDB DEFAULT CHARSET=utf8","sqlType":null,"table":"omneo","ts":1589272027697,"type":"CREATE"}
# 插入第一条测试数据
{"data":[{"pid":"1","uuid":"0049683542a7-5bdb-d564-3133-276ae3ce","firstname":"Maurice","lastname":"Samuel","birthdate":"01/11/1977","postalcode":"H2M2V5","city":"Ballancourt","sexe":"male","status":"en couple","commenttime":"2020-05-09 11:01:54"}],"database":"test","es":1589272135000,"id":5,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":null,"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272135125,"type":"INSERT"}
# 插入第二条测试数据
{"data":[{"pid":"2","uuid":"8338623542a7-5bdb-d564-3133-276ae3ce","firstname":"Gauthier","lastname":"Garbuet","birthdate":"23/05/1965","postalcode":"05274","city":"Cocagne","sexe":"female","status":"maried","commenttime":"2020-05-09 11:01:54"}],"database":"test","es":1589272136000,"id":6,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":null,"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272136230,"type":"INSERT"}
# 插入第三条测试数据
{"data":[{"pid":"3","uuid":"3374573542a7-5bdb-d564-3133-276ae3ce","firstname":"Maurice","lastname":"Samuel","birthdate":"01/11/1977","postalcode":"H0H0H0","city":"Ottawa","sexe":"male","status":"en couple","commenttime":"2020-05-09 11:01:54"}],"database":"test","es":1589272156000,"id":7,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":null,"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272156356,"type":"INSERT"}
# 插入第四条测试数据
{"data":[{"pid":"4","uuid":"5494133542a7-5bdb-d564-3133-276ae3ce","firstname":"Nicole","lastname":"Garbuet","birthdate":"01/11/1977","postalcode":"H0H0H0","city":"Maugio","sexe":"unknown","status":"single","commenttime":"2020-05-09 11:01:54"}],"database":"test","es":1589272156000,"id":8,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":null,"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272157060,"type":"INSERT"}
# 修改 `pid = 2` and `pid = 4` 的测试数据
{"data":[{"pid":"2","uuid":"8338623542a7-5bdb-d564-3133-276ae3ce","firstname":"world","lastname":"Garbuet","birthdate":"23/05/1965","postalcode":"05274","city":"Cocagne","sexe":"female","status":"maried","commenttime":"2020-12-20 15:55:10"},{"pid":"4","uuid":"5494133542a7-5bdb-d564-3133-276ae3ce","firstname":"world","lastname":"Garbuet","birthdate":"01/11/1977","postalcode":"H0H0H0","city":"Maugio","sexe":"unknown","status":"single","commenttime":"2020-12-20 15:55:10"}],"database":"test","es":1589272181000,"id":9,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":[{"firstname":"Gauthier","commenttime":"2020-05-09 11:01:54"},{"firstname":"Nicole","commenttime":"2020-05-09 11:01:54"}],"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272181392,"type":"UPDATE"}
# 删除 `pid = 1` 的测试数据
{"data":[{"pid":"1","uuid":"0049683542a7-5bdb-d564-3133-276ae3ce","firstname":"Maurice","lastname":"Samuel","birthdate":"01/11/1977","postalcode":"H2M2V5","city":"Ballancourt","sexe":"male","status":"en couple","commenttime":"2020-05-09 11:01:54"}],"database":"test","es":1589272196000,"id":10,"isDdl":false,"mysqlType":{"pid":"int(11)","uuid":"varchar(100)","firstname":"varchar(20)","lastname":"varchar(20)","birthdate":"varchar(20)","postalcode":"varchar(20)","city":"varchar(20)","sexe":"varchar(20)","status":"varchar(20)","commenttime":"timestamp"},"old":null,"pkNames":["pid"],"sql":"","sqlType":{"pid":4,"uuid":12,"firstname":12,"lastname":12,"birthdate":12,"postalcode":12,"city":12,"sexe":12,"status":12,"commenttime":93},"table":"omneo","ts":1589272196114,"type":"DELETE"}

可见 Kafka 的名为 testtopic 已经写入了对应的结构化 binlog 事件数据,可以编写消费者监听 Kafka 对应的 topic 然后对获取到的数据进行后续处理。

总结

这篇文章大部分篇幅用于介绍其他中间件是怎么部署的,这个问题侧面说明了 Canal 本身部署并不复杂,它的配置文件属性项比较多,但是实际上需要自定义和改动的配置项是比较少的,也就是说明了它的运维成本和学习成本并不高。后面会分析基于结构化 binlog 事件做 ELT 和持久化相关工作以及Canal的生产环境可用级别HA集群的搭建。

参考资料:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342