使用 MySQL 5.7 做测试,数据库引擎为 InnoDB,数据库隔离级别为可重复读(REPEATABLE-READ),读读共享,读写互斥。在这个隔离级别下,在多事务并发的情况下,还是会出现数据更新的冲突问题。
先分析一下更新冲突的问题是如何产生的。
假设我们有一张销量表 goods_sale
,表结构如下:
字段 | 数据类型 | 说明 |
---|---|---|
goods_sale_id | varchar(32) | 销量 id |
goods_id | varchar(32) | 商品 id |
count | int(11) | 销量 |
比如在某一时刻事务 A 和事务 B,在同时操作表 goods_sale
的 goods_id = 20191017344713049535651840506935 的数据,当前销量为 100。
goods_sale_id | goods_id | count |
---|---|---|
20191017344778600995856384326638 | 20191017344713049535651840506935 | 100 |
两个事务的内容一样,都是先读取的数据,count +100 后更新。
我们这里只讨论乐观锁的实现,为了便于描述,假设项目已经集成 Spring 框架,使用 MyBatis 做 ORM,Service 类的所有方法都使用了事务,事务传播级别使用 PROPAGATION_REQUIRED
,在事务失败会自动回滚。
Service 为 GoodsSaleService
,更新数量的方法为 addCount()
。
@Service
@Transaction
pubic class GoodsSaleService {
@Autowire
private GoodsSaleDao dao;
public void addCount(String goodsId, Integer count) {
GoodsSale goodsSale = dao.selectByGoodsId(goodsId);
if (goodsSale == null) {
throw new Execption("数据不存在");
}
int count = goodsSale.getCount() + count;
goodsSale.setCount(count);
int count = dao.updateCount(goodsSale);
if (count == 0) {
throw new Exception("添加数量失败");
}
}
}
使用的 Dao 为 GoodsSaleDao
,有两个方法
public interface GoodsSaleDao {
GoodsSale selectByGoodsId(@Param("goodsId") String goodsId);
int updateCount(@Param("record") GoodsSale goodsSale);
}
mapper 文件对应的 sql 操作为:
<!-- 查询 -->
<select id="selectByGoodsId" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from goods_sale
where goods_id = #{goodsId}
</select>
<!-- 更新 -->
<update id="updateCount">
update
goods_sale
set count = #{record.count},
where goods_sale_id = #{record.goodsSaleId}
</update>
好了,假设现在有两个线程同时调用了 GoodsSaleService#addCount
,操作同一行数据,会有什么问题?
假设这两个线程对应的事务分为事务 A 和事务 B。用一张流程图来说明问题:
更新冲突了!两次 addCount(100)
,结果应该是 300,结果还是 200。
该如何处理这个问题,有一个简单粗暴的方法,既然这里多线程访问会有线程安全问题,那就上锁,方法加入 synchronized
进行互斥。
public synchronized void addCount(String goodsId, Integer count) {
...
}
这个方案确实也可以解决问题,但是这种简单互斥的做法,锁的粒度太高,事务排队执行,并发度低,性能低。但如果是分布式应用,还得考虑应用分布式锁,性能就更低了。
考虑到这些更新冲突发生的概率其实并不高。这里讨论另一种解决方案,使用乐观锁来实现。原理就是基于 CAS
,比较并交换数据,如果发现被更新过了,直接更新失败。然后加入自旋(自循环)接着更新,直到成功。乐观就在于我们相信冲突发生概率低,如果发生了,就用一种廉价的机制迅速发现,快速失败。
我们来讨论如何实现它。数据库表 GoodsSale
新增一行 data_version
来记录数据更新的版本号。新的表结构如下:
字段 | 数据类型 | 说明 |
---|---|---|
goods_sale_id | varchar(32) | 销量 id |
goods_id | varchar(32) | 商品 id |
count | int(11) | 销量 |
data_version | int(11) | 版本号 |
GoodsSaleDao#updateCount
对应的 mapper 的 SQL 语句进行调整,数据更新的时候同时进行 data_version = data_version + 1
,执行这个 sql 时候已经对数据上行锁了,所以这个 data_version 加 1 的操作为原子操作。
<!-- 乐观锁更新 -->
<update id="updateCount">
update
goods_sale
set count = #{record.count}, data_version = data_version + 1
where goods_sale_id = #{record.goodsSaleId}
and data_version = #{record.dataVersion}
</update>
Dao 调整之后,事务 A 和事务 B 的变化如下:
有了发现冲突快速失败的方案,要想让更新成功,可以在 GoodsSaleService
中加入自旋,重新开始事务业务逻辑的执行,直到没有发生冲突,更新成功。自旋的实现有两种,一种是使用循环,一种是使用递归。
循环实现:
public void addCount(String goodsId, Integer count) {
while(true) {
GoodsSale goodsSale = dao.selectByGoodsId(goodsId);
if (goodsSale == null) {
throw new Execption("数据不存在");
}
int count = goodsSale.getCount() + count;
goodsSale.setCount(count);
int count = dao.updateCount(goodsSale);
if (count > 0) {
return;
}
}
}
递归实现:
public void addCount(String goodsId, Integer count) {
GoodsSale goodsSale = dao.selectByGoodsId(goodsId);
if (goodsSale == null) {
throw new Execption("数据不存在");
}
int count = goodsSale.getCount() + count;
goodsSale.setCount(count);
int count = dao.updateCount(goodsSale);
if (count == 0) {
addCount(goodsId, count)
}
}
通过乐观锁+自旋的方式,解决数据更新的线程安全问题,而且锁粒度比互斥锁低,并发性能好。