本篇我们使用mysql实现一个分布式锁。
环境:mysql8,navicat,maven,springboot2.3.11,mybatis-plus
分布式锁的功能
1,分布式锁使用者位于不同的机器中,锁获取成功之后,才可以对共享资源进行操作
2,锁具有重入的功能:即一个使用者可以多次获取某个锁
3,获取锁有超时的功能:即在指定的时间内去尝试获取锁,超过了超时时间,如果还未获取成功,则返回获取失败
4,能够自动容错,比如:A机器获取锁lock1之后,在释放锁lock1之前,A机器挂了,导致锁lock1未释放,结果会lock1一直被A机器占有着,遇到这种情况时,分布式锁要能够自动解决,可以这么做:持有锁的时候可以加个持有超时时间,超过了这个时间还未释放的,其他机器将有机会获取锁
预备技能:乐观锁
通常我们修改表中一条数据过程如下:
t1:select获取记录R1
t2:对R1进行编辑
t3:update R1
我们来看一下上面的过程存在的问题:
如果A、B两个线程同时执行到t1,他们俩看到的R1的数据一样,然后都对R1进行编辑,然后去执行t3,最终2个线程都会更新成功,后面一个线程会把前面一个线程update的结果给覆盖掉,这就是并发修改数据存在的问题。
我们可以在表中新增一个版本号,每次更新数据时候将版本号作为条件,并且每次更新时候版本号+1,过程优化一下,如下:
t1:打开事务start transaction
t2:select获取记录R1,声明变量v=R1.version
t3:对R1进行编辑
t4:执行更新操作
update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
t5:t4中的update会返回影响的行数,我们将其记录在count中,然后根据count来判断提交还是回滚
if(count==1){
//提交事务
commit;
}else{
//回滚事务
rollback;
}
上面重点在于步骤t4,当多个线程同时执行到t1,他们看到的R1是一样的,但是当他们执行到t4的时候,数据库会对update的这行记录加锁,确保并发情况下排队执行,所以只有第一个的update会返回1,其他的update结果会返回0,然后后面会判断count是否为1,进而对事务进行提交或者回滚。可以通过count的值知道修改数据是否成功了。
上面这种方式就乐观锁。我们可以通过乐观锁的方式确保数据并发修改过程中的正确性。
使用mysql实现分布式锁
我们创建一个分布式锁表,如下
DROP TABLE IF EXISTS t_lock;
create table t_lock(
lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '锁唯一标志',
request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用来标识请求对象的',
lock_count INT NOT NULL DEFAULT 0 COMMENT '当前上锁次数',
timeout BIGINT NOT NULL DEFAULT 0 COMMENT '锁超时时间',
version INT NOT NULL DEFAULT 0 COMMENT '版本号,每次更新+1'
)COMMENT '锁信息表';
java代码如下
mapper接口
package com.shiguiwu.springmybatis.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import org.springframework.stereotype.Repository;
/**
* @description: 锁mapper
* @author: stone
* @date: Created by 2021/5/30 11:12
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.mapper
*/
@Repository
public interface LockMapper extends BaseMapper<LockModel> {
}
锁对象model
package com.shiguiwu.springmybatis.lock.model;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.Version;
import lombok.Data;
/**
* @description: 锁模型
* @author: stone
* @date: Created by 2021/9/10 11:13
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.lock.model
*/
@Data
@TableName("t_lock")
public class LockModel {
/**
* 锁的唯一值
*/
@TableId
private String lockKey;
/**
* 请求id,同一个线程里请求id一样
*/
private String requestId;
//锁次数
private Integer lockCount;
//锁超时
private Long timeout;
//乐观锁版本
@Version
private Integer version;
}
锁接口
package com.shiguiwu.springmybatis.lock;
/**
* @description: 锁接口
* @author: stone
* @date: Created by 2021/9/10 11:40
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.lock
*/
public interface ILock<T> {
/**
* 获取分布式锁,支持重入
* @param lockKey 锁可以
* @param lockTimeout 持有锁的有效时间,防止死锁
* @param getTimeout 获取锁超时时间,
* @return 是否锁成功
*/
public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception;
/**
* 解锁
* @param lockKey 锁key
*
*/
public void unlock(String lockKey);
/**
* 重置锁对象
* @param t 锁对象
* @return 返回锁记录
*/
public int restLock(T t);
}
锁的实现代码如下
package com.shiguiwu.springmybatis.lock;
import cn.hutool.core.util.StrUtil;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import com.shiguiwu.springmybatis.mapper.LockMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @description: mysql实现分布式锁
* @author: stone
* @date: Created by 2021/9/10 11:09
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis.lock
*/
@Component
@Slf4j
public class MysqlLock implements ILock<LockModel>{
static ThreadLocal<String> requestIds = new ThreadLocal<>();
@Autowired
private LockMapper lockMapper;
public String getRequestId() {
String requestId = requestIds.get();
if (StrUtil.isBlank(requestId)) {
requestId = UUID.randomUUID().toString();
requestIds.set(requestId);
}
log.info("获取到的requestId===> {}", requestId);
return requestId;
}
/**
* 获取锁
* @param lockKey 锁可以
* @param lockTimeout 持有锁的有效时间,防止死锁
* @param getTimeout 获取锁超时时间,
* @return
*/
@Override
public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception {
log.info(" lock start =======================> {}",lockKey);
//从local中获取 请求id
String requestId = this.getRequestId();
//获取锁的结果
boolean lockResult = false;
//开始时间
long startTime = System.currentTimeMillis();
while (true) {
LockModel lockModel = lockMapper.selectById(lockKey);
if (Objects.nonNull(lockModel)) {
//获取锁对象的请求id
String reqId = lockModel.getRequestId();
//如果是空,表示改锁未被占有
if (StrUtil.isBlank(reqId)) {
//马上占有它
//设置请求id
lockModel.setRequestId(requestId);
//设置锁次数
lockModel.setLockCount(1);
//设置超时时间,防止死锁
lockModel.setTimeout(System.currentTimeMillis() + lockTimeout);
if (lockMapper.updateById(lockModel) == 1) {
lockResult = true;
break;
}
}
//如果request_id和表中request_id一样表示锁被当前线程持有者,此时需要加重入锁
else if (requestId.equals(reqId)) {
//可重入锁
lockModel.setTimeout(System.currentTimeMillis() + lockTimeout);
//设置获取初次
lockModel.setLockCount(lockModel.getLockCount() + 1);
if (lockMapper.updateById(lockModel) == 1) {
lockResult = true;
break;
}
}
//不为空,也不相等,说明是其他线程占有
else {
//锁不是自己的,并且已经超时了,则重置锁,继续重试
if (lockModel.getTimeout() < System.currentTimeMillis()) {
//未超时,继续重试
this.restLock(lockModel);
}
//如果未超时,休眠100毫秒,继续重试
else {
if (startTime + getTimeout > System.currentTimeMillis()) {
TimeUnit.MILLISECONDS.sleep(100);
}
else {
//防止长时间阻塞
break;
}
}
}
}
//如果是空,就插入一个锁,重新尝试获取锁
else {
lockModel = new LockModel();
//设置锁key
lockModel.setLockKey(lockKey);
lockMapper.insert(lockModel);
}
}
log.info(" lock end =======================> {}",lockKey);
return lockResult;
}
/**
* 释放锁
* @param lockKey 锁key
*/
@Override
public void unlock(String lockKey) {
LockModel lockModel = lockMapper.selectById(lockKey);
//获取当前线程的请求id
String reqId = this.getRequestId();
//获取锁次数
int count = 0;
//当前线程requestId和库中request_id一致 && lock_count>0,表示可以释放锁
if (Objects.nonNull(lockModel)
&& reqId.equals(lockModel.getRequestId())
&& (count = lockModel.getLockCount()) > 0) {
if (count == 1) {
//重置锁
this.restLock(lockModel);
}
//重入锁的问题,锁的次数减一
else {
lockModel.setLockCount(lockModel.getLockCount() - 1);
//更新次数
lockMapper.updateById(lockModel);
}
}
}
/**
* 重置锁
* @param lockModel 锁对象
* @return 更新条数
*/
@Override
public int restLock(LockModel lockModel) {
lockModel.setLockCount(0);
lockModel.setRequestId("");
lockModel.setTimeout(0L);
return lockMapper.updateById(lockModel);
}
}
上面代码中实现了文章开头列的分布式锁的所有功能,大家可以认真研究下获取锁的方法:lock,释放锁的方法:unlock。
测试用例
package com.shiguiwu.springmybatis;
import com.shiguiwu.springmybatis.lock.ILock;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @description: 锁测试
* @author: stone
* @date: Created by 2021/9/10 15:32
* @version: 1.0.0
* @pakeage: com.shiguiwu.springmybatis
*/
@SpringBootTest
@Slf4j
public class LockApplicationTests {
@Autowired
private ILock<LockModel> mysqlLock;
////测试重复获取和重复释放
@Test
public void testRepeat() throws Exception {
for (int i = 0; i < 10; i++) {
mysqlLock.lock("key1", 10000L, 1000);
}
for (int i = 0; i < 10; i++) {
mysqlLock.unlock("key1");
}
}
// //获取之后不释放,超时之后被thread1获取
@Test
public void testTimeout() throws Exception {
String lockKey = "key2";
mysqlLock.lock(lockKey, 5000L, 1000);
Thread thread1 = new Thread(() -> {
try {
mysqlLock.lock(lockKey, 5000L, 7000);
} catch (Exception e) {
e.printStackTrace();
} finally {
mysqlLock.unlock(lockKey);
}
}, "thread1");
thread1.start();
thread1.join();
}
}
test1方法测试了重入锁的效果。
test2测试了主线程获取锁之后一直未释放,持有锁超时之后被thread1获取到了
留给大家一个问题
上面分布式锁还需要考虑一个问题:比如A机会获取了key1的锁,并设置持有锁的超时时间为10秒,但是获取锁之后,执行了一段业务操作,业务操作耗时超过10秒了,此时机器B去获取锁时可以获取成功的,此时会导致A、B两个机器都获取锁成功了,都在执行业务操作,这种情况应该怎么处理?大家可以思考一下然后留言,我们一起讨论一下。