一、说明
1 .Redisson的官网文档地址:https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#83-multilock 其实也有中文,如果刚开始英文有点困难,可以尝试中英文对比着看,这对于阅读英文文档也许有一定的帮助
- redisson分布式锁这块是支持MultiLock这个机制的,可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁,一次性锁定多个资源,再去处理一些事情,然后一次性释放所有的资源对应的锁
- 在项目里使用的时候,很多时候一次性要锁定多个资源,比如说锁掉一个库存,锁掉一个订单,锁掉一个积分,一次性锁掉多个资源,多个资源都不让别人随意修改,然后你再一次性更新多个资源,释放多个锁
二、源码
代码片段一、
public static void main(String[] args) throws Exception {
Config config = new Config();
// 1. 这里的Redis集群是我本地搭建的一套集群,因为是研究源码,所以配置直接硬编码到代码里
config.useClusterServers()
.addNodeAddress("redis://192.168.0.107:7001")
.addNodeAddress("redis://192.168.0.107:7002")
.addNodeAddress("redis://192.168.0.110:7003")
.addNodeAddress("redis://192.168.0.110:7004")
.addNodeAddress("redis://192.168.0.111:7005")
.addNodeAddress("redis://192.168.0.111:7006");
RedissonClient redisson = Redisson.create(config);
RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RLock lock3 = redisson.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1,lock2,lock3);
// 代码片段二、
lock.lock();
// 代码片段六、
lock.unlock();
}
代码片段二、
RedissonMultiLock类中
public void lock(long leaseTime, TimeUnit unit) {
try {
// 1. 代码片段三、
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
代码片段三、
@Override
public void lockInterruptibly() throws InterruptedException {
// 这里的-1后面会用到,具体-1代表是什么意思,后面的代码分析,参考代码片段四、
lockInterruptibly(-1, null);
}
代码片段四、
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 1. 通过代码片段三可以知道,leaseTime为-1 unit=null
// baseWaitTime = 锁的个数(3个) * 1500 = 4500毫秒
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
// leaseTime肯定是-1,所以这里成立,不走else逻辑了,这里的代码写的就感觉很有意思,上面等于-1,下面等于-1还if判断
if (leaseTime == -1) {
// waitTime= 4500毫秒
waitTime = baseWaitTime;
unit = TimeUnit.MILLISECONDS;
} else {
waitTime = unit.toMillis(leaseTime);
if (waitTime <= 2000) {
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
waitTime = unit.convert(waitTime, TimeUnit.MILLISECONDS);
}
// 这里有个死循环逻辑,其实就是不停的去获取锁
while (true) {
// 1. 代码片段五、waitTime = 4500毫秒,leaseTime = -1
if (tryLock(waitTime, leaseTime, unit)) {
return;
}
}
}
代码片段五、
// waitTime = 4500毫秒,leaseTime = -1
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }
// 1. newLeaseTime = -1,其实这里的参数值,都会影响对程序的逻辑以及加锁释放锁
// 1.现在是真的想不通这个逻辑,先等于-1,然后在if判断,和下面的remainTime一样
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
// 当前时间
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
// 这里其实就是返回remainTime,calcLockWaitTime是给他什么参数,返回什么参数,也挺有意思的。
long lockWaitTime = calcLockWaitTime(remainTime);
// 这里会返回一个固定的值0
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
// 1. 拿到锁的迭代器
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// waitTime = 4500毫秒,leaseTime = -1 参数传递进来的,所以会走else逻辑
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
// 这里去lockWaitTime和remainTime中的最小值(lockWaitTime = 0,就是上面那个固定值,remainTime=-1),
// 所以awaitTime=-1,这个-1其实很关键,在tryLock中,-1代表了如果获取锁成功了,就会启动一个lock watchDog,不停的刷新锁的生存时间
long awaitTime = Math.min(lockWaitTime, remainTime);
// 这里就是获取锁,等待awaitTime=4500毫秒,获取锁成功,启动一个watchDog
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
if (remainTime != -1) {
// 如果获取锁成功,当前时间减去获取锁耗费的时间time
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime <= 0) {
// 如果remainTime <0 说明获取锁超时,那么就释放掉这个锁
unlockInner(acquiredLocks);
// 返回false,说明加锁失败
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
代码片段六、
// 释放锁的话,就是依次调用所有的锁的释放的逻辑,lua脚本,同步等待所有的锁释放完毕,才会返回
@Override
public void unlock() {
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(locks.size());
for (RLock lock : locks) {
// 代码片段七、
futures.add(lock.unlockAsync());
}
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
}
代码片段七、
这里的释放锁的底层lua脚本,和加锁很类似,就不做具体的分析了,一眼看上去,其实还是很简单的
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
总结
其实Redisson中的MultiLock的加锁与释放锁相对来说还是比较简单的,这也归根于Redisson的源码写的比较优雅又关系
最后释放锁的Lua 脚本就不一行一行的分析注释了,只要之前跟着之前的文章,这些lua脚本相对来说还是比较简单的