一、CountDownLatch基本原理
- countDownLatch最基本的原理其实就是,现在有4个客户端,分别是A、B、C、D,客户端A进行加锁后,设置三个线程来获取锁,那么,必须让接下来的三个客户端BCD都获取锁成功后,客户端A的逻辑才会继续向下走
- 如果说,指定3个客户端获取锁,获取锁的客户端数量没有到达3的话,客户端A是不会逻辑是不会向下走的,会被阻塞住
源码
代码片段一、demo
public static void main(String[] args) throws Exception {
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://192.168.0.107:7001")
.addNodeAddress("redis://192.168.0.107:7002")
.addNodeAddress("redis://192.168.0.110:7003")
.addNodeAddress("redis://192.168.0.110:7004")
.addNodeAddress("redis://192.168.0.111:7005")
.addNodeAddress("redis://192.168.0.111:7006");
final RedissonClient redisson = Redisson.create(config);
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
// 这里会设置几个客户端来获取锁成功的数量,代码片段二、
latch.trySetCount(3);
System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]设置了必须有3个线程执行countDown,进入等待中。。。");
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
public void run() {
try {
System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]在做一些操作,请耐心等待。。。。。。");
Thread.sleep(3000);
RCountDownLatch localLatch = redisson.getCountDownLatch("anyCountDownLatch”);
// 代码片段三、
localLatch.countDown();
System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]执行countDown操作");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
// 代码片段四、
latch.await();
System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]收到通知,有3个线程都执行了countDown操作,可以继续往下走");
}
代码片段二、RedissonCountDownLatch
- 参数:
KEYS[1]= “anyCountDownLatch”
ARGV[2] = 3,其实就是count参数的值
@Override
public boolean trySetCount(long count) {
// 这里设置的客户端获取锁的个数为3
return get(trySetCountAsync(count));
}
// count = 3
@Override
public RFuture<Boolean> trySetCountAsync(long count) {
return commandExecutor.evalWriteAsync(getName(),
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//1.exists anyCountDownLatch 客户端A进来不存在,
// 进入if逻辑
"if redis.call('exists', KEYS[1]) == 0 then “
//1.set anyCountDownLatch 3
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); “
//1.返回1代表成功
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()),
newCountMessage, count);
}
代码片段三、RedissonCountDownLatch
参数
KEYS[1] = “anyCountDownLatch”
@Override
public void countDown() {
get(countDownAsync());
}
@Override
public RFuture<Void> countDownAsync() {
return commandExecutor.evalWriteAsync(getName(),
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//1.decr anyCountDownLatch ,其实就是anyCountDownLatch这KEY对应的值,
// 第一次是3,减去1,变成2,
// 这样的话,后面还允许2个客户端获取锁
"local v = redis.call('decr', KEYS[1]);” +
//1.如果v,第一次执行后为2小于等于0的话,就直接删除key,所以可以看到,
// 当执行到第三个客户端的时候,这里才会成立
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage);
}
代码片段四、RedissonCountDownLatch
public void await() throws InterruptedException {
RFuture<RedissonCountDownLatchEntry> future = subscribe();
try {
commandExecutor.syncSubscription(future);
// 这里就是说,如果我们业务逻辑里设置的成功获取锁的客户端为3,如果三个客户端都已经成功获取锁,
//那么KEY就不存在了,如代码片段三中的分析,而如果获取锁的客户端没有达到3的话,这里其实就会进入到一个死循环
// 不停的等待,直到KEY的值为0,参会继续走下面的逻辑,否则就会一直阻塞在这里
while (getCount() > 0) {
// waiting for open state
RedissonCountDownLatchEntry entry = getEntry();
if (entry != null) {
entry.getLatch().await();
}
}
} finally {
unsubscribe(future);
}
}
三、demo执行结果图
- 刚开始main线程设置必须有三个线程/客户端执行countDown,也就是获取锁成功
- 接下来三个线程/客户端成功的获取了锁
- 最后三个线程获取锁执行,执行countDown逻辑后,主线程才会继续执行,否则就会一直阻塞住
四、总结
- 到此为止,Redisson的源码基本上已经分析差不多了,其实还有一些环境没有发出来,因为一篇文章不想太长,所以后面会陆陆续续的把redisson源码的其他部分发出来
- 接下来就要分析zk的分布式锁了,其实对比Redis分布式锁和zk的分布式锁的优缺点,全方面的进行对比之后,在我们的实际业务开发中,我们才会知道,到底哪个分布式锁更加适合我们,因为,没有最好,只有最合适。
- 大家一起努力,加油。