本章讲解一下基于redis实现的分布式锁
基于redis的分布式锁
1、基本实现
借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。
多个客户端同时获取锁(setnx)
获取成功,执行业务逻辑,执行完成释放锁(del)
-
其他客户端等待重试
public void deduct() { // 加锁setnx Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111"); // 重试:递归调用 if (!lock){ try { Thread.sleep(50); this.deduct(); } catch (InterruptedException e) { e.printStackTrace(); } } else { try { // 1. 查询库存信息 String stock = redisTemplate.opsForValue().get("stock").toString(); // 2. 判断库存是否充足 if (stock != null && stock.length() != 0) { Integer st = Integer.valueOf(stock); if (st > 0) { // 3.扣减库存 redisTemplate.opsForValue().set("stock", String.valueOf(--st)); } } } finally { // 解锁 this.redisTemplate.delete("lock"); } } }
其中,加锁也可以使用循环,防止递归调用栈溢出
// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "111")){
try {
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
2、防死锁
问题:setnx刚刚获取了锁后,此时服务器宕机,导致del释放锁无法执行,进而导致锁无法释放(死锁)
解决:需要给锁设置过期时间,超过时间如果没有手动释放,则自动释放
设置过期时间两种方式:
通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)需要使用lua脚本来保证原子性
-
使用set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)
while(Boolean.FALSE.equals(this.stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS))) { try { //此处使我们对竞争压力变得小(类似于自旋),如果去掉睡眠时间,那么会导致大量的线程去竞争锁,从而导致cpu占用率过高,性能降低 Thread.sleep(20); } catch (InterruptedException e) { throw new RuntimeException(e); } }
3、防误删
锁的有效时间是10秒 第一个请求因为某些原因执行了15秒那么在第一个请求执行的第10秒后,锁过期自动释放了,被b线程获取了锁,此时b执行到第5秒a线程执行完毕了,把锁释放了,此时释放的锁事b线程的锁不是自己的锁了,就会导致出现线程安全问题,导致锁失效
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁
问题:缺少原子性
如果在删除锁之前,锁已经过期了,那么就会误删其他线程的锁,所以此处需要保证原子性,没有一个命令可以同时做到判断 + 删除,所以需要使用lua脚本
//此处需要通过lua脚本判断是否是自己的锁,再删除锁,可以防止误删(判断和释放锁保证了原子性)
String script = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
this.stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Collections.singletonList("lock"), uuid);
4、可重入
由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行。
解决 :hash数据模型+lua脚本
Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数,然后利用 lua 脚本判断逻辑(hincrby也可以初始化key相当于setnx)
//如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1。
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) ==
1)
then
redis.call('hincrby', KEYS[1], ARGV[1], 1);
redis.call('expire', KEYS[1], ARGV[2]);
return 1;
else
return 0;
end
解锁脚本
-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 不是自己的锁,在尝试解其他线程的锁,解锁失败
-- 如果为 >0 代表 可重入次数被减 1
-- 如果为 =0 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return nil;
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then
return 0;
else
redis.call('del', KEYS[1]);
return 1;
end;
具体代码如下
/**
* @ClassName DistributedLockFactory
* @Description 分布式锁工厂
* @Author lvh
* @Date 2022/11/30 21:29
* @Version 1.0
*/
@Component
public class DistributedLockFactory {
@Autowired
private StringRedisTemplate redisTemplate;
private String uuid;
public DistributedLockFactory() {
this.uuid = UUID.randomUUID().toString();
}
public DistributedRedisLock getRedisLock(String lockName) {
return new DistributedRedisLock(lockName,uuid, redisTemplate);
}
}
DistributedRedisLock代码如下
/**
* @ClassName distributedRedisLock
* @Description redis分布式锁
* @Author lvh
* @Date 2022/11/30 19:03
* @Version 1.0
*/
public class DistributedRedisLock implements Lock {
private String lockName;
private String uuid;
private long expire = 30;
private StringRedisTemplate redisTemplate;
public DistributedRedisLock(String lockName, String uuid, StringRedisTemplate redisTemplate) {
this.lockName = lockName;
this.redisTemplate = redisTemplate;
this.uuid = uuid+":"+Thread.currentThread().getId();
}
@Override
public void lock() {
this.tryLock();
}
@Override
public void lockInterruptibly() {
}
@Override
public boolean tryLock() {
try {
return this.tryLock(-1L, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 加锁方法
*
* @param time the maximum time to wait for the lock
* @param unit the time unit of the {@code time} argument
* @retur 获取锁成功返回true,否则返回false
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time != -1) {
this.expire = unit.toSeconds(time);
}
//加锁的lua脚本(如果key不存在或者/key存在并且uuid和当前线程的uuid一致,则执行then 否则执行else)
String script = "if redis.call('exists',KEYS[1],ARGV[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire','lock',ARGV[2]) return 1 else return 0 end";
//如果获取锁执行失败返回false取反为true则循环获取锁
while (Boolean.FALSE.equals(this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, String.valueOf(expire)))) {
Thread.sleep(100);
}
//加锁成功,返回之前,开启定时任务自动续期
this.renewExpire();
return true;
}
/**
* 解锁方法
*/
@Override
public void unlock() {
//解锁的lua脚本
String script = "if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then redis.call('del',KEYS[1]) return 1 else return 0 end";
//此处的Long.class是因为要对方返回的nil和0和1兼容 nil对应long的null 其余的对应各个值
Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Collections.singletonList(lockName), uuid);
if (flag == null) {
throw new IllegalMonitorStateException("You do not own this lock");
}
}
@Override
public Condition newCondition() {
return null;
}
private String getId() {
return uuid + ":" + Thread.currentThread().getId();
}
/**
* 续期方法
*/
private void renewExpire() {
String script = "if redis.call('hexists',KEYS[1],ARGV[1])==1 " +
"then " +
"return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
Boolean flag = redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, String.valueOf(expire));
if (Boolean.TRUE.equals(flag)) {
renewExpire();
}
}
}, this.expire * 1000 / 3, this.expire * 1000 / 3);
}
}
程序中使用
public void deduct() {
DistributedRedisLock redisLock=this.distributedLockClient.getRedisLock("lock");
redisLock.lock();
try {
// 1. 查询库存信息
String stock = redisTemplate.opsForValue().get("stock").toString();
// 2. 判断库存是否充足
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
// 3.扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
redisLock.unlock();
}
}
自动续期 每间隔三分之一的时间执行一次续期
private void renewExpire(){
String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
"then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {
renewExpire();
}
}
}, this.expire * 1000 / 3);
}
手写redis分布式锁小结
锁操作:
加锁:
setnx:独占排他 死锁、不可重入、原子性
set k v ex 30 nx:独占排他、死锁 不可重入
-
hash + lua脚本:可重入锁
- 判断锁是否被占用(exists),如果没有被占用则直接获取锁(hset/hincrby)并设置过期时间(expire)
- 如果锁被占用,则判断是否当前线程占用的(hexists),如果是则重入(hincrby)并重置过期时间(expire)
- 否则获取锁失败,将来代码中重试
-
Timer定时器 + lua脚本:实现锁的自动续期
判断锁是否自己的锁(hexists == 1),如果是自己的锁则执行expire重置过期时间
解锁
- del:导致误删
- 先判断再删除同时保证原子性:lua脚本
- hash + lua脚本:可重入
- 判断当前线程的锁是否存在,不存在则返回nil,将来抛出异常
- 存在则直接减1(hincrby -1),判断减1后的值是否为0,为0则释放锁(del),并返回1
- 不为0,则返回0
重试:递归 循环
二、红锁算法
redis集群状态下的问题:
客户端A从master获取到锁
在master将锁同步到slave之前,master宕掉了。
slave节点被晋级为master节点
客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。
安全失效!
解决集群下锁失效,参照redis官方网站针对redlock文档:https://redis.io/topics/distlock
在算法的分布式版本中,我们假设有N个Redis服务器。这些节点是完全独立的,因此我们不使用复制或任何其他隐式协调系统。将N设置为5是一个合理的值,因此需要在不同的计算机或虚拟机上运行5个Redis主服务器,确保它们以独立的方式发生故障。
为了获取锁,客户端执行以下操作:
客户端以毫秒为单位获取当前时间的时间戳,作为起始时间。
客户端尝试在所有N个实例中顺序使用相同的键名、相同的随机值来获取锁定。每个实例尝试获取锁都需要时间,客户端应该设置一个远小于总锁定时间的超时时间。例如,如果自动释放时间为10秒,则尝试获取锁的超时时间可能在5到50毫秒之间。这样可以防止客户端长时间与处于故障状态的Redis节点进行通信:如果某个实例不可用,尽快尝试与下一个实例进行通信。
客户端获取当前时间 减去在步骤1中获得的起始时间,来计算获取锁所花费的时间。当且仅当客户端能够在大多数实例(至少3个)中获取锁时,并且获取锁所花费的总时间小于锁有效时间,则认为已获取锁。
如果获取了锁,则将锁有效时间减去 获取锁所花费的时间,如步骤3中所计算。
如果客户端由于某种原因(无法锁定N / 2 + 1个实例或有效时间为负)而未能获得该锁,它将尝试解锁所有实例(即使没有锁定成功的实例)。
每台计算机都有一个本地时钟,我们通常可以依靠不同的计算机来产生很小的时钟漂移。只有在拥有锁的客户端将在锁有效时间内(如步骤3中获得的)减去一段时间(仅几毫秒)的情况下终止工作,才能保证这一点。以补偿进程之间的时钟漂移
当客户端无法获取锁时,它应该在随机延迟后重试,以避免同时获取同一资源的多个客户端之间不同步(这可能会导致脑裂的情况:没人胜)。同样,客户端在大多数Redis实例中尝试获取锁的速度越快,出现裂脑情况(以及需要重试)的窗口就越小,因此理想情况下,客户端应尝试将SET命令发送到N个实例同时使用多路复用。
值得强调的是,对于未能获得大多数锁的客户端,尽快释放(部分)获得的锁有多么重要,这样就不必等待锁定期满才能再次获得锁(但是,如果发生了网络分区,并且客户端不再能够与Redis实例进行通信,则在等待密钥到期时需要付出可用性损失)。
2、redisson中的分布式锁
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
官方文档地址:https://github.com/redisson/redisson/wiki
2.1、可重入锁(Reentrant Lock)
基于Redis的Redisson分布式可重入锁RLock
Java对象实现了java.util.concurrent.locks.Lock
接口。
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,需要增加过期时间,但业务逻辑未执行完毕,就过期也会有问题所以,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout
来另行指定。
RLock
对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException
错误。
另外Redisson还通过加锁的方法提供了leaseTime
的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
源码追踪
RLock lock = redisson.getLock("myLock");
lock.lock();
进入lock方法内部
public void lock() {
try {
//进行方法 (租约时间,时间单位,线程是否可中断)
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
继续进入
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//获取当前线程的id
long threadId = Thread.currentThread().getId();
// 尝试获取锁,如果没取到锁,则返回锁的剩余超时时间
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
//如果ttl为null证明加锁成功
根据返回的ttl是否为空判断是否获取锁成功,为空则判断获取锁成功return,否则
1、subscribe(threadId);订阅消息,渠道为prefixName(“redisson_lock__channel”, this.getName());
2、先尝试加锁,失败则会根据ttl时间做休眠,让出CPU,等待订阅到释放锁则重新执行,也叫自旋,如果加锁成功则跳出
3、加锁成功后,unsubscribe(future, threadId);取消订阅
if (ttl != null) {
//// 订阅分布式锁, 解锁时进行通知
CompletableFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = (RedissonLockEntry)this.commandExecutor.getInterrupted(future);
} else {
entry = (RedissonLockEntry)this.commandExecutor.get(future);
}
try {
// 进入死循环,反复去调用tryAcquire尝试获取锁,直到获取到锁return
while(true) {
ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var14) {
if (interruptibly) {
throw var14;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
} finally {
//直接取消订阅
this.unsubscribe(entry, threadId);
}
}
}
代码还是挺长的,不过流程也就两步,要么线程拿到锁返回成功;要么没拿到锁并且等待时间还没过就继续循环拿锁,同时监听锁是否被释放。
加锁核心方法
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
if (leaseTime > 0L) {
// 如果有设置锁的等待时长的话,就直接调用tryLockInnerAsync方法获取锁
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
//开启一个监听器,如果发现拿到锁了,就开启定时任务不断去刷新该锁的过期时长
CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
// 如果ttlRemaining为空,也就是tryLockInnerAsync方法中的lua执行结果返回空,证明获取锁成功
if (ttlRemaining == null) {
if (leaseTime > 0L) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//如果没有设置锁的持有时间(leaseTime),则启动看门狗,定时给锁续期,防止业务逻辑未执行完成锁就过期了
this.scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper(f);
}
此处执行lua加锁脚本,逻辑是先判断锁是否存在,如果不存在则加锁返回nil,如果存在也是当前的线程的锁则进行重入+1,否则不是当前线程的锁返回pttl过期时间
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
代码的流程比较简单,大概就是开启一个定时任务,每隔internalLockLeaseTime / 3的时间(这个时间是10秒)就去检测锁是否还被当前线程持有,是的话就重新设置过期时长internalLockLeaseTime,也就是30秒的时间。
而这些定时任务会存储在一个ConcurrentHashMap对象expirationRenewalMap中,存储的key就为“线程ID:key名称”,如果发现expirationRenewalMap中不存在对应当前线程key的话,定时任务就不会跑,这也是后面解锁中的一步重要操作。
上面这段代码就是Redisson中所谓的”看门狗“程序,用一个异步线程来定时检测并执行的,以防手动解锁之前就过期了。
解锁
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
public RFuture<Void> unlockAsync(long threadId) {
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
//1、判断锁是否存在,不存在的话用publish命令发布释放锁的消息,订阅者收到后就能做下一步的拿锁处理;
// 2、锁存在但不是当前线程持有,返回空置nil;
// 3、当前线程持有锁,用hincrby命令将锁的可重入次数-1,然后判断重入次数是否大于0,是的话就重新刷新锁的过期时长,返回0,否则就删除锁,并发布释放锁的消息,返回1;
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
当线程完全释放锁后,就会调用cancelExpirationRenewal()方法取消"看门狗"的续时线程
void cancelExpirationRenewal() {
// expirationRenewalMap移除对应的key,就不会执行当前线程对应的"看门狗"程序了
Timeout task = expirationRenewalMap.remove(getEntryName());
if (task != null) {
task.cancel();
}
总结
1、redisson利用redis的lua保证原子操作性。
2、使用定时任务维持锁的超时
3、间歇性自旋等待锁的释放