Redisson分布式锁提供了WatchDog功能,如果你使用了分布式锁且没有设置超时时间Ression会为你设置一个默认的超时时间,且在你没有主动释放锁之前会不断续期。这样既可以保证在持锁期间的代码不会被其他线程执行,也可以防止死锁的发生。
不过最近在做项目的时候发现我的Redisson断线重连后WatchDog居然失效了。跟了一下Redisson的代码发现了原因,在这里分享一下。
问题重现
String name = "REDIS_LOCK"
try{
if(!redissonClient.getLock(name).tryLock()){
return;
}
doSomething();
}catch(Exception e){
RLock rLock = redissonClient.getLock(name);
if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
rLock.unlock();
}
}
项目中用的是tryLock()
,线程会不断地尝试拿到锁,拿到锁之后线程就会开始执行业务代码。当一个线程拿到锁之后不主动释放,WatchDog
就会生效,不断地为这个锁续时。这个时候我们让网络断开一段时间,Redisson
就会报以下这个错,这个时候因为连不上redis了WatchDog
会在默认的时间内失效,锁也会被释放。
2020-11-06 14:56:53.682 [redisson-timer-4-1] ERROR org.redisson.RedissonLock - Can't update lock REDIS_LOCK expiration
org.redisson.client.RedisResponseTimeoutException: Redis server response timeout (3000 ms) occured after 3 retry attempts. Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. Command: null, params: null, channel: [id: 0x1e676dd8, L:/192.168.20.49:58477 - R:/192.168.2.21:6379]
at org.redisson.command.RedisExecutor$3.run(RedisExecutor.java:333)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
当我们网络正常后程序再执行上面的代码,某个线程持有的REDIS_LOCK
这个锁并不会像往常一样一直持有,过了30秒之后就会自动失效,也就是说WatchDog
不再为你续时了。反复测试几次都是这样的结果,这个可能是Redisson
的一个bug,目前用的是最新的redisson 3.13.6 版本,可能未来的新版本不会有这个问题。
分析原因
下载redisson
源码打开RedissonLock
这个类,找到我们用的tryLock
方法
@Override
public boolean tryLock() {
return get(tryLockAsync());
}
发现trylock()
和lock()
最终实现的方法是tryAcquireOnceAsync()
这个方法,我们看一下这个方法的逻辑
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//判断有没有设置超时时间(-1表示没有设置)
if (leaseTime != -1) {
//异步执行redis加锁脚本
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
//异步执行redis加锁脚本,且根据异步结果判断是否加锁成功
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),//这里获取watchdog的配置时间来作为锁的超时时间
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
//redis脚本执行成功就会执行watchdog的需时任务
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
当没有设置锁的超时时间且加锁成功的时候就会执行scheduleExpirationRenewal(threadId)
这个方法。
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//重新续时逻辑
renewExpiration();
}
}
WatchDog
重新续时逻辑
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
//报错了timer就不会再执行了
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
可以看到renewExpiration()
方法核心是一个timer定时任务, 每次执行完延迟watchdog
配置时间/3的时间再执行一次。也就是说watchdog默认配置是30000毫秒,这里就是就是每十秒执行一次。但要注意是这个定时任务并不会一直执行下去。
if (e != null) {
//报错了timer就不会再执行了
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
当上一次redis续时脚本发生异常的时候就不再执行了,也就是我们在文章开头看到的那个错误ERROR org.redisson.RedissonLock - Can't update lock REDIS_LOCK expiration
。这个设计也是合理的,可以防止资源浪费,那么程序重新trylock()
成功的时候应该为重新启动这个定时任务才对。但其实不是,scheduleExpirationRenewal
方法是有判断的
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
//当ExpirationEntry在EXPIRATION_RENEWAL_MAP里存在就只会添加线程ID,不会重新执行续时逻辑
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//重新续时逻辑
renewExpiration();
}
可以看到核心判断是getEntryName()
这个方法,作为key存放在EXPIRATION_RENEWAL_MAP
中,如果getEntryName()
一直不变renewExpiration()
就永远不会再执行。问题应该就出在这里。
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
protected String getEntryName() {
return entryName;
}
可以看到this.entryName = id + ":" + name;
,其中id
是RedissonClient
创建生成的一个UUID,name就是我们使用锁的名字。我们一般会把RedissonClient
的单例对象注入到spring
容器里,id在springboot
启动后就不会再变了。我们每使用一个分布式锁都会起一个固定的name。也就是说在锁名称不变的情况下entryName
也不会变,redisson
在重新加锁的时候判断entryName
已经存在就不会再续时了。
总结一下:不管是trylock()
还是lock()
方法,同一个锁redisson会设置一个watchdog给它续时,并把续时信息缓存起来,正常情况下执行unlock()
会清除这个缓存。但当客户端与redis断开连接后报"Can't update lock " + getName() + " expiration"
错之后watchdog就会失效,断线重连后再执行trylock()
或者lock()
方法后会因为这个锁的缓存不再执行watchdog的续时逻辑。
解决办法
1.增加watchdog超时时长
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson(RedissonProperties properties) throws IOException {
ObjectMapper mapper = new ObjectMapper();
String jsonString = mapper.writeValueAsString(properties);
Config config = Config.fromJSON(jsonString);
config.setLockWatchdogTimeout(150000);
return Redisson.create(config);
}
watchdog默认超时时间是30000毫秒,它的执行逻辑是30000/3毫秒执行一次续时,也就是说断线后在1-10000毫秒期间重连成功watchdog下次执行后就不会再报错。我们可以把默认的30000毫秒改成150000毫秒,可以提供断线重连的容错几率。但这样并不能完全解决这个问题。
2.修改redisson源码
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName()); //添加异常删除缓存逻辑
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
修改RedissonLock
类里的renewExpiration()
方法,在 if (e != null) {}
判断里加上EXPIRATION_RENEWAL_MAP.remove(getEntryName())
清除缓存逻辑,这样断线重连后就不会因为缓存问题不再执行renewExpiration()
这个方法了。
以上的代码已经提交PR到了Redisson最新的版本,使用最新的Redisson 3.14.0将不会有这个问题。