Redisson简介
Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
https://redisson.org/
https://redisson.org/Redisson.pdf
https://github.com/redisson/redisson/
Redisson实现的分布式锁
Redisson获取锁的方式
RedissonLock
加锁
- 使用redisson加锁的代码示例
public void getlock() {
Config config = new Config();
config.useSingleServer()
.setTimeout(1000000)
.setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient = Redisson.create(config);
RLock testLock = redissonClient.getLock("test_lock");
// 加锁
testLock.lock();
try {
Thread.sleep(100000L);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
testLock.unlock();
}
}
- org.redisson.Redisson#getLock
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
- RedissonLock获取锁的源码部分
org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 获取当前线程Id
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
// 获取锁失败
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
getEntry(threadId).getLatch().acquire();
} else {
getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
}
org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
org.redisson.RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
- 加锁过程使用的lua脚本(其中KEYS[1]为锁在redis中的key值,ARGV[1]为锁的租期,对应redis中hash字段的value值,ARGV[2]为锁在redis中的hash对象的字段名称)
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
-
redis中锁的结构
释放锁
org.redisson.RedissonLock#unlock
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
org.redisson.RedissonLock#unlockAsync(long)
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
if (e != null) {
// 取消分布式的watchDog续期
cancelExpirationRenewal(threadId);
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
- 释放锁执行的lua脚本( KEYS[1]:redis中hash对象的key值;KEYS[2]:释放锁的channelName;ARGV[1]:LockPubSub.UNLOCK_MESSAGE;ARGV[2]:锁的过期时间;ARGV[3]:redis中hash对象的Field内容 )
先查询redis中是否存在这个锁
如果这个锁不存在,则直接return;
如果锁存在,则将锁的占用-1
如果锁的引用计数为0,则证明没有线程/进程占用锁,删除锁,并通知客户端
如果锁的引用计数大于0,则证明还有线程/进程在占用锁,则重新设置锁的过期时间
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
锁续期
如果同时有线程A和线程B在作业。
线程A先获取到锁,开始执行业务逻辑,但是线程A的业务逻辑因为种种原因导致在锁的超时时间内没有完成。如果没有锁续期机制,则会直接丢失锁
这个时候,线程B获取到锁,线程B执行业务的过程中,线程A执行完毕,会尝试释放锁。这个时候可能会将线程B持有的锁释放
为了解决这种问题,redisson引入了锁续期机制
- org.redisson.RedissonLock#renewExpirationAsync
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
- 锁续期执行的lua脚本
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
RedLock
上述加锁、锁续期、释放锁的过程应该可以满足
redis集群稳定
的大部分场景的需求。
但是还有一种场景:获取锁之后,redis集群的master挂了,slaver变为新的master。由于redis的主从同步是异步的,无法保证slave节点的数据跟master的数据是完全一致的。
这个时候,如果某个线程加锁成功,锁的信息保存在master上,还未来得及同步到slave上。这个时候master挂了,slave变为新的master。就会出现锁丢失的问题。等到老的master的恢复之后,会存在锁被重复获取的问题。
- 为了解决上述问题,我们可以选择
RedLock
的方案.Redisson正好就为我们实现的RedLock的整个过程。