一、背景
MySQL作为最流行的关系型数据库产品之一,当数据规模增大遭遇性能瓶颈时,最容易想到的解决方案就是分库分表。无论是进行水平拆分还是垂直拆分,第一步必然需要数据迁移与同步。由此可以衍生出一系列数据迁移过程中的需求:
1.原本一张表迁移到单库多表(或多库多表),这是最基本的需求;
2.原本单库多表(或多库多表)迁移到新的多库多表(因表设计不合理、数据规模增大等原因导致需要再次分库分表)
3.新表与旧表的表结构可能不一致,如:类型表更(自增主键id由int改为bigint)、字段数量不一致(删减、增加)、字段名称变更等
4.字段映射,如:旧表中的多个字段映射为新表的一个字段,或旧表中的一个字段映射为新表的多个字段
5.增量数据的实时同步,以及当涉及表结构转换时增量部分(binlog)如何方便地实现同样的转换
6.如何支持垂直拆分的数据迁移
7.MySQL到NewSQL的迁移(如:TiDB、CockroachDB)
8.异构数据源的实时迁移,如:MySQL到HBase、MongoDB(关于异构数据源的实时同步设计不在本文内容范围,后续将专题介绍)
9.迁移前后的数据一致性校验
二、设计
为满足以上需求,下面将从全量迁移和增量同步两部分来说明MySQL数据迁移同步工具的设计与实现。
2.1 全量迁移
mysqldump是MySQL官方自带的数据备份工具,也可以用于数据迁移,但不足之处是单线程处理,迁移大表时速度极慢,并且不支持写入分库分表。因此开源社区还开发了一个多线程的类似工具mydumper,性能有不少提升,但同样不支持写入分库分表,也不支持字段的转换。
接下来介绍下快速分片并行读取MySQL表数据的做法:
1、 自动查找表的主键pk;
2、 查询主键的最大值及最小值:max(pk),min(pk);
3、 对主键范围分片,每个分片跨度1万(即最多读取1万行数据),由此即可将整张表的查询分成多个查询分片:
第1个分片查询条件为pk >= min(pk) AND pk < min(pk)+10000
第2个分片查询条件为pk>= min(pk)+10000 AND pk < min(pk)+20000
第3个分片查询条件为pk >= min(pk)+20000 AND pk < min(pk)+30000
以此类推。
以上分片查询除了可以并行读取之外,另外一个优势是失败可恢复,某个分片查询失败并不影响整体查询的进度,只需失败重试即可。当然也可以将所有分片持久化,即使程序异常退出,重启后也可以恢复,避免重新查询全表数据。
2.2 增量同步
增量数据的读取基于MySQL的binlog主从复制。在全量迁移之前首先获取当前MySQL的位点信息(FileName、Position),以便全量数据迁移完成之后从该位点继续重放binlog。
三、实现
3.1 全量迁移
基于RxJava的观察者(或生产者消费者)模式实现链式最大化并行处理:多张表并行生成查询分片(Query Split),然后由Source并行执行查询分片从MySQL中读取数据,然后统一由Sink Selector根据分库分表的sharding字段及规则计算出每行数据所属的slot(即应该写入到哪张分表),当一个slot中的数据积累到一个batch size时会生成一个插入分片(Insert Split),最终由Sink并行地批量写入对应的目标表中。
为了避免累积的数据过多造成GC压力,slot超过一定时间后即使没有累积到一个batch size也会生成Insert Split分发给Sink执行写入。此外还要考虑另外一个问题:当生产者生产过快导致消费者来不及处理时,将会导致事件堆积,严重时还会OOM,即所谓的背压(Backpressure)。幸好RxJava作为一个成熟的Reactive框架已经对背压处理有很好的支持,这也是为什么要基于RxJava来实现的重要原因之一。
3.2 增量同步
binlog的抽取使用了开源的Java类库mysql-binlog-connector-java,与Canal相比更加轻量,源码清晰易懂,不依赖其他第三方jar包,也没有那么多不需要的繁杂功能。
为了实现对binlog的字段转换,采用了Apache开源的SQL引擎calcite来实现:将binlog的每行数据根据原表的表结构映射为一张内存表,然后由calcite执行SQL转换后输出结果。(PS:calcite当前已被多个开源项目采用,Hive用calcite优化查询,Flink的Streaming SQL基于calcite实现,Kylin的查询引擎也采用calcite)
3.3 数据校验
因MySQL表的checksum与数据的行顺序无关,当新表与旧表的表结构相同并且数据不需要转换时采用执行CHECKSUM TABLE tbl_name查询语句获取每张新表和旧表的checksum,然后分别求和对比最终的checksum是否相同以此校验数据是否一致。
当新表与旧表存在字段类型变更、字段数量不一致、数据经过转换等会导致checksum发生变化时,采用排除有关字段,由迁移工具内部只对剩余字段数据进行checksum计算。Checksum算法可以选择CRC32或Adler32,这两种算法均采用Java自带的实现类,默认情况下使用Adler32因为其具有更快的计算效率。
四、总结
无论是分库分表常规方案的实施,还是未来新一代分布式关系型数据存储NewSQL的落地实践,数据的迁移与同步都是必不可少的重要环节。毕竟,快速、准确、平滑地完成数据迁移,便已成功了一半。