线上爆出一个问题,出库服务中排线表中排线时段数据和排线服务中数据对不上,排查了一下是前两天同事上线导致的,主要原因是处理数据库并发操作时,选择的乐观锁字段不当导致的。 下面记录一下问题原因和解决过程。
业务背景介绍
排线服务中有两张表t_route表(排线主表)和t_route_order表(排线对应订单表),多个订单对应同一个排线。
在出库服务中,有很多业务涉及到排线数据的关联查询,因此需要将排线服务中t_route表和t_route_order表中的数据同步给出库服务
(==下面以服务A代替排线服务,服务B代替出库服务==)。
同步的实现方式是,服务A中使用databus拉取相关表的binlog,然后封装成通用的数据实体,通过kafka同步给服务B。
服务B收到服务A中t_route表的变更消息后,会将数据插入到本地数据库t_route_info表中(该表不仅包含排线主体信息,还有一些需要计算才能得到的信息,为了简单起见这里不再赘述)。
服务B收到服务A中t_route_order表变更的消息后,会得到其中的排线时段数据,并将该数据更新到t_route_info的 route_times字段中,多个排线时段用英文逗号隔开(排线字段主要用于展示使用)。
功能实现
下面是t_route_info表部分字段:
字段名 | 字段类型 | 备注 |
---|---|---|
route_no | varchar(100) | 排线号 |
route_name | varchar(100) | 排线名称 |
route_times | varchar(100) | 排线时段 |
update_time | datetime | 更新时间 |
由于一个排线上有多个订单,每个订单对应一个排线时段,并且在服务A中,插入t_route_order时是一个batch insert操作,这就会导致有多条数据变更的消息同时发送到kafka中,我们都知道kafka中同一个topic下一个分区对应一个消费线程,如果有多个分区,消费端就会开启多个线程进行消费,此时如果有多条订单数据的变更消息同时(或者间隔时间很短)到来,服务B中就会出现并发更新t_route_info表的问题,为了解决并发操作的问题,同时在开发时选择了update_time作为乐观锁字段。
每条订单消息的处理逻辑伪代码如下:
// 处理排线订单消息
public boolean executeMsg(RouteOrder order){
//重试次数
int retryCount = 5;
while(retryCount>0){
//根据排线号查询排线主体信息t_route_info,包含旧的排线时段信息
RouteInfo routeinfo = queryRouteInfo(order.getRouteNo());
//构建排线时段信息,用英文逗号隔开
String routeTimes = buildRouteTimes(routeinfo.getRouteTimes(),order.getRouteTime());
// 更新排线主体信息
Date oldUpdateTime = routeinfo.getUpdateTime();
routeinfo.setRouteTimes(routeTimes);
// 设置一个新的时间
routeinfo.setUpdateTime(new Date());
// 更新排线主体信息,使用updateTime作为乐观锁
// sql为: update t_route_info set route_times = xxx,update_time = xxxx where route_no = xxx and update_time = oldUpateTime
// 如果并发更新失败,返回为0
int count = updateRouteInfo(routeinfo,oldUpdateTime);
if(count > 0){
return true;
}
// 每次并发更新失败,都要slee 1秒,并且次数减1
Thread.sleep(1000)
retryCount--;
}
}
结果发现,如果同时有多个相同排线的订单新同步过来,则排线时段数据会出现丢失的情况。
问题分析
上面功能实现中,使用update_time作为乐观锁,update_time字段类型为datetime,也就是会精确到秒(比如2018-12-01 15:33:20), 而代码中使用Thread.sleep(1000),也即睡眠1秒。
为了简单起见,假设当前有3条消息同时到来,此时数据库中update_time值是2018-12-01 15:33:20,route_times值为空, 这3个线程的查询得到的旧的值是一样的。 消息1中排线时段值为10:00-11:00,消息1会将自己的排线时段新和旧的值拼接为“10:00-11:00”。
消息2中排线时段值为15:00-16:00,消息2会将自己的排线时段新和旧的值拼接为“15:00-16:00”。
消息3中排线时段值为17:00-18:00,消息3会将自己的排线时段新和旧的值拼接为“17:00-18:00”。
如果消息1的线程首先执行update操作,将update_time更新为2018-12-01 15:33:30 (假设此时系统时间为2018-12-01 15:33:30秒),route_times更新为“10:00-11:00”。接着消息2和消息3对应的线程去更新时由于updaet_time已经被更新,则这两次操作返回结果都是0,这两个线程都会sleep 1秒,然后再次重申执行查询,更新的操作(此时消息2拼接的排线时段值应该是“10:00-11:00,15:00-16:00 ”,而消息3拼接的值为“10:00-11:00,17:00-18:00”)。
此时消息2和消息3得到的旧的update_time都是2018-12-01 15:33:30 (系统时间是2018-12-01 15:33:31),然后消息2先执行更新,由于操作时间很短(几毫秒内),update_time仍然会被设置为2018-12-01 15:33:30,接着消息3再去判断update_time,仍然等于旧的oldUpateTime,可以更新成功,则消息2的更新就会被覆盖。
解决办法
根本原因是update_time不是一个原子变化的值,作为乐观锁的字段,其值在获取或者说创建时应该保证唯一性(可以使用redis的incr,或者数据库的 update set version = version+1)。
因此可以使用一下方式解决:
(1)分布式锁。 但是这种方式性能太低,不建议使用。
(2)使用一个递增的值作为乐观锁字段。
比如 添加version int(10) 字段。
每次查询时,都得到旧的版本号,更新时:
update t_route_info set route_times = xxx,version = version +1 where route_no = xxx and version = oldVersion.