靠嘴说出来的方案肯定会出问题的,如果没有出问题,只能感谢上帝了!
背景
公司的其中一个业务库的一个业务表,由于之前没有预见这么大的数据量,只是简单分成了32个分表。现在由于业务的快速扩展,导致目前单表的数据量基本都已经是800w以上,最多的一张表已经到达了1000w,导致的直接结果就是相关业务的查询越来越慢(当然我们这里不去深究查询方式、索引、磁盘等因素,我个人是觉得,任何的数据库调优技巧在绝对数据面前都是瞎扯淡)。所以需要对这些分表进行水平扩容。
分析
提到数据库表水平扩容,我想最简单的肯定是停服扩容。只要在一个深夜,将这个服务彻底下线,然后后台进行数据迁移工作;等迁移结束,然后更换分库分表路由方式,重新上线,然后一边喝着咖啡一边看着Kibana上的日志。不得不说,这种简单无脑易上手的做法确实很直接,但是缺点也很明显,万一有部分用户可能要着急使用服务,而开发却傲娇得要死地说,“系统正在更新升级,暂且不可用”,这搁谁都不会不爽——有没有搞错,我是付了钱的,我可是上帝!所以,能够尽可能平滑水平扩容,才是我们这些开发,或者说是服务提供者想要追求的效果。
那么我们总结下,我们想要的效果是:在单库多表的情况下,尽可能平滑水平扩容。并且我可以定义一下角色服务:
- 数据业务服务
- 数据迁移服务
- 数据检查服务
双写
在库表水平扩容方面,很多人会采用双写的方案。在这个需求当中,我之所以采用双写,是因为这个业务背景只涉及单库多表这样的简单的模型,所以采用双写会稍微简单点。相比于上一篇文章当中的场景,最大的因素在于是否跨库。
当跨库的时候,MySQL的事务机制就无法得到保障,就不得不依赖分布式事务组件。那些口嗨双写解决问题的人,要么是自己公司已经提供了分布式事务基础组件,要么就是根本不考虑这方面。而且引入分布式事务组件,例如seats等,那么势必会增加程序的复杂度以及维护难度等,所以对于细心的小伙伴来说,我们是不是应该好好思考这些问题。
双写的时候,假如不考虑分布式事务,那么当数据不一致的时候,就只能依赖数据校准程序,但是这个就不能保证实时性。此外,还有很多方面因素需要考虑,这也正是这篇文章想跟大家分享的部分!
实验
看过我之前文章的小伙伴肯定知道,我向来很保护公司的业务及代码,所以这里我又只能用简单的模型进行模拟,各位也可以在自己的机子上进行实操。由于本次分享重点是讲述水平扩容方案,所以这里就不再引入ShardingSphere或者MyCat等分库分表方案,而是采用简单的取模路由。另外,这里模拟1000个用户,用户ID的范围为[0, 999]。
库表
既然是库表迁移,那么我们当然需要一个业务表啦,我这里就取大家经常见的order表吧。我们假设系统中原先只有order_0, order_1两张表,现在我们希望扩容到原先的两倍,变成4张分表。
CREATE TABLE `order_x` (
`id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`user_id` int(11) DEFAULT NULL COMMENT '用户ID',
`item` varchar(100) DEFAULT NULL COMMENT '商品名称',
`count` int(11) DEFAULT NULL COMMENT '商品数量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
order表里面就只有简单的四个字段,看注释就可以明白。由于是单库,所以只要考虑到分表就可以,这里我们的路由策略是根据用户ID进行取模路由。接下来我们可以先往这order_0, order_1两张表当中插入一些数据。
@RequestMapping(value = "/insertData", method = RequestMethod.GET)
public Object insertData() {
final String sql0 = Constants.insertMap.get(0);
final String sql1 = Constants.insertMap.get(1);
List<Object[]> params0 = Lists.newArrayListWithExpectedSize(1000);
List<Object[]> params1 = Lists.newArrayListWithExpectedSize(1000);
int index = 1;
for (int i = 0; i < 100000; i++) {
long id = idWorker.nextId();
String userId = chooseUser();
int userIdInt = Integer.parseInt(userId);
String item = RandomStringUtils.random(2, true, false);
Object[] param = new Object[]{id, userIdInt, item, 1};
if ((userIdInt & 1) == 0) {
params0.add(param);
} else {
params1.add(param);
}
if (i % 2000 == 0) {
jdbcTemplate.batchUpdate(sql0, params0);
jdbcTemplate.batchUpdate(sql1, params1);
params0.clear();
params1.clear();
log.info("第" + index++ + "次数据插入");
}
}
return "SUCCESS";
}
这里生成的ID采用的是雪花算法。(amazing,不会还有人不知道在分表的情况下是不建议MySQL主键自增吧!)
数据检查服务
既然我们已经生成好了上面的这些数据,那么我们就先做最简单的工作吧。关于这一块,其实真的没什么好讲的。我在上一篇文章里面也提到,因为这个涉及到各自的业务情况,所以数据如何检查就要视具体情况而定,比如银行系统就要关心用户的余额是否是正确的,仓储系统就要关心商品的库存是否是正确的。大不了,那就一条一条,一个字段一个字段地进行对比。这里的话,我就查询每个用户下有多少订单和总共涉及到的商品数量当做检查。
/**
* 检查工作
* 主要是为了检查两阶段的结果
* 1\. 迁移工作造成的数据差异(迁移工作应该要保证这一部分不要出错)
* 2\. 迁移工作之后正常流量的双写造成的数据差异
* <p>
* 此外,检查工作不是一次性任务,而是需要定时地进行查询
* 这里为了简便,以查询每个用户下有多少订单和总共涉及到的商品数量当做检查
* 每一个场景需要具体深入判断,这个视观众具体业务场景判断
*/
private void doCheckWork() {
Map<Integer, String> map1 = Maps.newHashMap();
map1.put(0, "select count(*) from order_0 where user_id = ?");
map1.put(1, "select count(*) from order_1 where user_id = ?");
map1.put(2, "select count(*) from order_2 where user_id = ?");
map1.put(3, "select count(*) from order_3 where user_id = ?");
Map<Integer, String> map2 = Maps.newHashMap();
map2.put(0, "select sum(count) from order_0 where user_id = ?");
map2.put(1, "select sum(count) from order_1 where user_id = ?");
map2.put(2, "select sum(count) from order_2 where user_id = ?");
map2.put(3, "select sum(count) from order_3 where user_id = ?");
// 由于测试数据是模拟0-1000这些用户的,所以这里就当做所有用户进行检查工作
int pass = 0;
int fail = 0;
for (int i = 0; i < 1000; i++) {
int from = i % 2;
int to = i % 4;
if (from == to) {
pass++;
continue;
}
Integer fromCount = jdbcTemplate.queryForObject(map1.get(from), new Object[]{i}, Integer.class);
Integer toCount = jdbcTemplate.queryForObject(map1.get(to), new Object[]{i}, Integer.class);
Integer fromSum = jdbcTemplate.queryForObject(map2.get(from), new Object[]{i}, Integer.class);
Integer toSum = jdbcTemplate.queryForObject(map2.get(to), new Object[]{i}, Integer.class);
if (fromCount.equals(toCount) && fromSum.equals(toSum)) {
log.info("用户ID:" + i + "检查通过, 原表数量: " + fromCount + ", 新表数量: " + toCount + ", 原表商品总数: " + fromSum + ", 新表商品总数: " + toSum);
pass++;
} else {
log.error("用户ID:" + i + "检查不通过,原表数量: " + fromCount + ", 新表数量: " + toCount + ", 原表商品总数: " + fromSum + ", 新表商品总数: " + toSum);
fail++;
}
}
System.out.println("检查通过: " + pass + ",检查失败: " + fail);
}
检查工作的目的是为了两个阶段的数据,一是当迁移工作结束之后的数据检查,二是由双写过程中的数据检查,所以这个工作是一个持续性的,而不是一次性的。
流程图
好吧,我觉得我还是要简单画一下流程图,否则就显得不那么专业,而且……(我知道你们都是只想看图不想看文字的一群人,哈哈,因为我也是_)
首先是数据业务服务的流程图
接下来是迁移数据服务的流程图
看到这两个图,再比较上一篇文章当中的两个图,大家可以发现,在这里的双写方案,好像也还是有点麻烦。
数据业务服务
在数据业务服务这一部分,因为我们需要保障在数据迁移过程当中还是可以保证服务对外,所以这个时候是有可能接受新的SQL,也就是我们常说的增删改查这四类的SQL操作。我们分别来分析下这四类SQL操作:
- 查询操作:由于查询操作不会对数据库表产生数据的变动,所以这一类的操作可以忽略。
- 新增操作:新增的order记录,由于是双写,所以会在旧表新表都会产生一条记录。只是需要注意的是,当新的记录插入到新表之后,数据迁移服务这个时候会处于两种状态:1).当前用户的数据已经迁移,那么在事务的保证下,两端可以保持数据一致;2).当前用户的数据还没有迁移,或者正在迁移,那么新增的一条记录会优先插入到新表,在迁移的过程中是会发生主键冲突。这个时候需要留意,这一类的日志是数据库的主键完整性约束来保证两端数据的一致性造成的。
- 删除操作:这类操作,需要考虑五种状态:1).当前用户的数据已经迁移,那么在事务的保证下,两端可以保持数据一致,所以删除之后返回的影响行数肯定大于0;2).当前用户的数据还没有迁移,那么操作之后,旧表的数据会被执行,而新表因为没有数据,所以没有影响;3)当前用户的数据正在迁移并且已经插入了这条数据,那么性质就跟第一种情况一样;4)当前用户的数据正在迁移并且还没有插入这条数据,并且还没有加载这些数据,那么性质就跟第2条一样;5).当前用户的数据正在迁移并且还没有插入这条数据,但是已经加载这些数据在内存中,那么这个时候就需要注意了,如果只是做单纯的双写,那么反而导致了旧表的数据被删除了但是新表却插入了这条数据(因为对于新表一开始进行删除返回的影响行数为0,但是数据迁移服务的插入晚于这个操作),这条数据就是一条脏数据。所以我们需要有一个地方进行存储这类执行的SQL。也就是希望数据迁移服务能注意到还需要执行这条SQL。
- 更新操作:可能很多小伙伴会觉得,删除和更新操作不是一样的吗,为什么在流程图中没有进行双写。起初我也是这么认为的,按照删除的操作一样,采用双写+追加SQL的方案不就可以了吗?但是当我写的时候,突然脑袋里面出现了“ABA”的字样。是的,更新操作不同于删除,删除针对的是整条记录本身,也就是说,这个记录只有两种状态——删除或者未删除,并且删除操作本身就确定了一个终态操作。但是更新操作则不同,它针对的是记录当中的字段,并且,还需要强调顺序性质。举个例子,有一个字段原本是A,当业务请求将A改成B,这个时候数据迁移服务加载了这条数据,但是还未写入新表的时候,又来一条业务请求,将B改成C。由于新表还未写入这条记录,所以更新影响的行数为0。紧接着数据迁移服务需要将刚才加载出来的数据插入到新表当中,这个时候就会造成旧表数据和新表数据不一致的问题!如下图:
正是因为存在这些特殊的情况(当前,这些情况也是比较极端的情况),所以这类简单的双写肯定是需要额外考虑的。在这里的实现中,我使用了ConcurrentHashMap来为每个用户存储需要进行这些SQL操作,这些的SQL操作又存储在ConcurrentLinkedQueue当中。用这两个数据结构来保证操作的并发以及顺序性。
当然,不知道大家是否留意到,我在获取这个内存队列的时候使用了分布式锁。我们来看里面在这里的实现
public static Map<Integer, ConcurrentLinkedQueue<Tuple2<String, Object[]>>> appendSQLs = new ConcurrentHashMap<>();
RLock rLock = redissonClient.getLock(Constants.D_KEY + userId);
ConcurrentLinkedQueue<Tuple2<String, Object[]>> queue = null;
try {
rLock.lock();
queue = Constants.appendSQLs.get(userId);
if (queue == null) {
Constants.appendSQLs.put(userId, new ConcurrentLinkedQueue<>());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
尽管ConcurrentHashMap和ConcurrentLinkedQueue作为单个来看是线程安全的,但是如果先取再设置的话,这个就不再是原子操作了。在并发的环境中,这个地方就需要分布式锁了。
数据迁移服务
其实知道整个流程之后,尤其是讲述完上面数据业务服务的流程之后,对于这一步就会比较清楚了。基本上的原则就是根据用户,查询到相关的记录。由于可能存在比较大的数据量,所以需要分页进行获取,再进行插入。在插入完成之后,需要检查是否还存在需要追加的SQL,如果存在的话,就需要进行执行,知道队列消费完。
接下来大家想想,这个地方是否需要进行加锁呢?给你们几分钟思考
……
……
……
……
……
……
……
……
OK,我知道你们肯定不会去思考,哈哈哈。之所以会提出这个问题,是因为我在写的时候也想到了这个问题。尽管ConcurrentLinkedQueue可以保证线程安全,可以保证追加的SQL的入队和出队。既然都这样了为什么还会有这样的顾虑呢?这是因为我想到,会不会有一种情况,就是数据迁移服务在将所有追加的SQL消费完毕之后,准备执行下一个用户的时候,这个时候这个内存队列要是又加入的新的SQL,那么这个时候就无法被消费了?对于新增、删除而言倒是不必,但是对于更新操作就需要额外考虑了。所以这里是需要对这些用户进行一个标识,用来标识是否迁移完成。这样在更新操作的时候,只需要判断用户是否迁移完成进行操作了。如果用户迁移完了,那么就进行事务双写即可;如果没有迁移完,那么就需要追加SQL。所以事先需要进行设置:
/**
* 在Redis当中初始化用户数据
*/
@RequestMapping(value = "/initRedisUser", method = RequestMethod.GET)
public Object initRedisUser() {
for (int i = 0; i < 1000; i++) {
RBucket<String> redisUser = redissonClient.getBucket(Constants.USER_KEY + i);
redisUser.set("0");
}
return "SUCCESS";
}
内存队列
这里存储追加的SQL是放在内存队列当中的。其实各位可以和上一篇文章的技术选型比较这来看,上一篇文章的存储介质是MySQL。各位观众肯定会奇怪,为什么这次不采用MySQL,而是存储在内存中,万一机器挂掉了怎么办?现在都是多台机器,那么岂不是每台机器上都维护了一些队列?我觉得大家能有这方面的考量,那还是十分细心的。在实际的操作中我也是用MySQL当做存储的,这还是得益于MySQL的事务机制。只是在这里,由于给大家的实验是单机的,所以用这种内存队列会比较方便,而且效率也比较高(好吧,我承认我就是懒得写,嫌麻烦)。
那么回过头来,假如我们确实是单机的情况下,使用这个内存队列作为存储介质,那万一服务挂掉了,写在内存当中的SQL不就都没有了吗?对于这种情况,可以有两种方式处理:
- 反正是数据迁移,大不了新表的数据不要了,truncate一下新表,重新开始迁移(当然,这个还是不推荐的)。
- 根据写在redis当中的用户标识,我们可以判断出,当前还有那些用户还没有完全迁移好的,所以只要从这些用户重新开始迁移就好了。不过需要考虑的是,可能在服务挂掉的时候,此时这个用户的部分数据已经插入到新表了,那么就需要将这一部分数据进行删除,然后再重新开始,这是因为内存队列当中追加的SQL不存在了,这一部分数据的状态无法与旧表保持一致,除非跟旧表当中主键对应的这部分数据,一个字段一个字段地进行对比。
具体操作
接下来给大家总结下实验的具体步骤:
- /initRedisUser: 调用这个接口,将用户的标识初始化到Redis当中
- /migrationWork: 调用这个接口,进行迁移工作
- /doubleWrite: 调用这个接口,模拟用户业务流量
- /checkWork: 调用这个接口,用来检查是否数据保持一致
大家可以在本地运行一波哈!然后顺便看看日志。我是用IDEA的,大家记得将控制台的日志输出到文件当中来查看,这样会比较方便。
完整的代码也已经上传到Github上完整的代码也已经上传到Github上完整的代码也已经上传到Github上。
扩展
在将公司的库表水平扩容结束之后,尽管得到大家一波商业互吹,但是我还是在想,如果是跨库的场景呢?这种双写是否可还行?这也正是我在开头讲的那样,由于是单库多表的操作,所以可以使用MySQL的事务。但是在多库的场景下,我们只能依赖分布式事务组件,例如阿里的seats之类的,在尚未完全搭建这类基础服务的场景下,贸然地使用这类技术选型肯定会适得其反。其实从上面的流程上来看,只是这两种最终的状态是,一个是依赖事务进行双写,一个是通过消费RocketMQ当中的消息来保证两端数据一个。一个是真双写,一个倒像是从库同步主库的样子。
其实我说了这么多,还是给大家一个思路,还是要根据自己的具体业务场景选择更加合适自己的技术选型及思路,可千万不要人云亦云,还是要有自己的思考。我是觉得,只有自己一点点学会思考,人才会有一点点进步!
Github地址
:https://github.com/showyool/juejin.git