本篇文章会介绍以下几点:
- 节点的基本操作
- 统一错误监听器
- 不可重入共享锁 & 可重入共享锁【例子 & 核心源码剖析】
- 可重入读写共享锁
- 信号量锁
- 多对象共享锁
简介
Curator基于原生的Zookeeper Api封装提供了更加丰富的功能,如:Leader选举、分布式锁等。
实际开发中,很少场景会直接使用Zookeeper原生Api开发的,这是因为Curator比原生的Api更加易用、功能更强大,另外对于监听回调机制也做了封装(Zookeeper原生 Api 回调监听一次后,后续就不会再回调,需要重新设置回调机制,实现上比较麻烦)
有意思的是,Curator的英文解释是博物馆馆长/动物园园长,和 Zookeeper 这个动物园管理员身份相近,但职责和功能更强大,这个命名十分符合这种关系,不得不说,歪果仁真会玩 :)
Curator的官方地址是:https://curator.apache.org/index.html
Curator Maven 依赖
先引入相关的依赖,接下来开始熟悉相关的Api操作吧~
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<!-- 屏蔽日志输出,便于本地学习 -->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.6</version>
</dependency>
节点的基本操作
/**
* @Author: MuggleLee
* @Date: 2022/2/9 下午12:30
*/
public class CuratorUtils {
private static CuratorFramework client = null;
public static CuratorFramework getClient() {
if (client != null) {
return client;
}
client = CuratorFrameworkFactory.builder()
.connectString("zookeeper地址")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
//连接超时时间,默认15秒
.connectionTimeoutMs(15 * 1000)
//会话超时时间,默认60秒
.sessionTimeoutMs(60 * 1000)
.namespace("curator-study")
.build();
client.start();
return client;
}
/**
* 新建节点
*/
public static void create(CuratorFramework client, CreateMode createMode, String path, String content) throws Exception {
client.create().withMode(createMode).forPath(path, content.getBytes());
}
/**
* 查看节点
*/
public static String getData(CuratorFramework client, String path) throws Exception {
byte[] bytes = client.getData().forPath(path);
return new String(bytes);
}
/**
* 更新节点
*/
public static void setData(CuratorFramework client, String path, String content) throws Exception {
client.setData().forPath(path, content.getBytes());
}
/**
* 删除节点
*/
public static void delete(CuratorFramework client, String path) throws Exception {
client.delete().forPath(path);
}
public static void main(String[] args) throws Exception {
String path = "/curator-demo";
CuratorFramework client = CuratorUtils.getClient();
create(client, CreateMode.PERSISTENT, path, "curator-test");
String data = getData(client, path);
System.out.println(data);
setData(client, path, "new content");
String newData = getData(client, path);
System.out.println(newData);
delete(client, path);
}
}
通过上面的例子,就能够发现,实现节点的crud是如此简单。不过这只是最基础简单的需求,如果项目上有其他需求,比如新建TTL节点等等,看下Curator的Api很快就能上手~
分布式锁
为了保证分布式环境下的原子性,这时候就轮到分布式锁上场啦!
业界中,分布式锁最常用的无非就是 Curator 和 Redisson ,从性能上来说,正常情况下 Redisson 会优于 Curator,因为 Curator 需要创建临时节点,客户端运气不好的话,只是请求到 Follower 节点,那么 Follower 节点还要请求转发到 Leader 节点并执行事务,肯定比 Redisson 这种纯内存操作的慢;但从可靠性和有效性来说,Curator 会比 Redisson 好,因为据说 Redisson 底层使用 RedLock 算法保证原子性,但在极端情况下却无法保证,另外加锁节点宕机后,可能最长等待30秒后才能释放锁,而通过 Curator 分布式加锁创建的节点都是临时节点,当客户端下线后,临时节点也会消失,时效性好。
上面只是简单的比较 Curator 和 Redisson,想对分布式锁有更深入的读者,可以参考这篇 分布式锁看这篇就够了
接下来,先从官方文档看下 Curator 分布式锁是怎么玩的
由官方文档描述可知,Curator分布式锁有5种类型:
- Shared Reentrant Lock:共享可重入锁
- Shared Lock :共享锁
- Shared Reentrant Read Write Lock:共享可重入读写锁
- Shared Semaphore:共享信号量
- Multi Shared Lock:多对象共享锁
这几种类型的锁都有一个共同点:Sharded。官方的一段描述:Fully distributed locks that are globally synchronous(全局同步的分布式锁),这也是意味着全部的客户端都可见,可以知道锁当前是否有被占用。
那么这几种类型的锁,有什么区别,在什么场景下使用合适呢?
其实从锁名称就可以猜到,Sharded Reentrant Lock 和 Sharded Lock 区别在与是否可重入
;当存在读多写少
的场景,使用 Shared Reentrant Read Write Lock 会更加合适;当需要限制获取某个资源的数量
,可以使用 Shared Semaphore 设置信号量;如果需要同时对多个对象同时加锁
,比如发红包的时候,除了要对红包这个对象加锁,还要给账号钱包这个对象加锁,这样才能保证在抢红包的同时,账号钱包的钱也不会因为其他并发线程修改金额导致数据不一致了,这时候就可以使用 Multi Sharded Lock。
ps.不太了解可重入锁概念的同学,可以简单理解为同一个客户端可重复的加锁;如果对信号量的使用不太了解的话,可以简单理解类似令牌的东西,不能超过设定的数量线程获取到锁,作用是保护共享资源。
可重入锁和信号量锁可分别看下我过往写过的文章:
多线程之ReentrantLock源码剖析
多线程之并发类CountDownLatch、CyclicBarrier和Semaphor的使用
统一错误监听器
查看官方 Curator 分布式锁的时候,发现每个类型的分布式锁介绍,都有一处相同的说明
Error Handling
It is strongly recommended that you add aConnectionStateListener
and watch for SUSPENDED and LOST state changes. If a SUSPENDED state is reported you cannot be certain that you still hold the lock unless you subsequently receive a RECONNECTED state. If a LOST state is reported it is certain that you no longer hold the lock.
大概的意思就是,开发者可以实现一个统一错误处理的监听器,监听器用于监听 SUSPENDED
和 LOST
状态,当发生 SUSPENDED 或 LOST 会告知我们相应的信息。
监听器的实现并不难,实现 ConnectionStateListener
接口就行,示例代码如下:
public class CuratorConnectionStateListener implements ConnectionStateListener {
// 节点路径
private String zkPath;
// 节点内容
private String content;
public CuratorConnectionStateListener(String zkPath, String content) {
this.zkPath = zkPath;
this.content = content;
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.LOST) {
System.out.println("====== 连接丢失 ======");
while(true){
try {
System.err.println("====== 尝试重新连接 ======");
if(client.getZookeeperClient().blockUntilConnectedOrTimedOut()){
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkPath, content.getBytes("UTF-8"));
break;
}
} catch (InterruptedException e) {
break;
} catch (Exception e){
// 可以根据业务,在这里设置告警、故障恢复等设置
e.printStackTrace();
}
}
} else if (newState == ConnectionState.CONNECTED) {
System.out.println("====== 新建连接 ======");
} else if (newState == ConnectionState.RECONNECTED) {
System.out.println("====== 重新连接 ======");
}
}
}
不可重入共享锁 & 可重入共享锁
不可重入共享锁核心类:InterProcessSemaphoreMutex
可重入共享锁核心类:InterProcessMutex
/**
* 验证目的:
* 1.同一个线程是否可重复获取锁
* 2.不同线程获取锁是否互斥
*/
public static void main(String[] args) throws Exception {
String lockPath = "/sharded-lock";
CuratorFramework client = CuratorUtils.getClient();
// 添加连接状态的监听器
CuratorConnectionStateListener connectionStateListener = new CuratorConnectionStateListener(lockPath, "不可重入的共享锁");
client.getConnectionStateListenable().addListener(connectionStateListener);
// A场景:使用不可重入共享锁
InterProcessSemaphoreMutex thread1Locks = new InterProcessSemaphoreMutex(client, lockPath);
InterProcessSemaphoreMutex thread2Locks = new InterProcessSemaphoreMutex(client, lockPath);
// B场景:使用可重入共享锁
// InterProcessMutex thread1Locks = new InterProcessMutex(client, lockPath);
// InterProcessMutex thread2Locks = new InterProcessMutex(client, lockPath);
new Thread(() -> {
try {
if (thread1Locks.acquire(100, TimeUnit.MILLISECONDS)) {
System.out.println(Thread.currentThread().getName() + ":第一次加锁成功");
if (thread1Locks.acquire(100, TimeUnit.MILLISECONDS)) {
System.out.println(Thread.currentThread().getName() + ":重复加锁成功");
} else {
System.out.println(Thread.currentThread().getName() + ":重复加锁失败");
}
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + "执行业务逻辑,花费2秒");
} else {
System.out.println(Thread.currentThread().getName() + ":第一次加锁失败");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
thread1Locks.release();
System.out.println(Thread.currentThread().getName() + ":释放锁成功!");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程1").start();
// 休眠的目的是让线程1先执行
Thread.sleep(500);
new Thread(() -> {
try {
if (thread2Locks.acquire(100, TimeUnit.MILLISECONDS)) {
System.out.println(Thread.currentThread().getName() + ":加锁成功");
} else {
System.out.println(Thread.currentThread().getName() + ":加锁失败");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (thread2Locks.isAcquiredInThisProcess()) {
thread2Locks.release();
System.out.println(Thread.currentThread().getName() + ":释放锁成功!");
} else {
System.out.println(Thread.currentThread().getName() + ":释放锁失败!原因是当前锁对象并没有占用锁");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程2").start();
}
输出结果:
线程1:第一次加锁成功
线程1:重复加锁失败
线程2:加锁失败
线程2:释放锁失败!原因是当前锁对象并没有占用锁
线程1执行业务逻辑,花费2秒
线程1:释放锁成功!
从输出结果可以看出,同一个线程不允许重复加锁,不同的线程会存在竞争关系,相互互斥。
接下来,关注一下细节部分。使用共享锁的核心类是InterProcessSemaphoreMutex
,从命名上来看,我第一次看官方文档是一脸懵逼的,为什么有 Semaphore 这一个词?难不成底层是用 JUC 的 Semaphore 并发锁保证锁互斥的?
结论先行:实际上并不是使用 JUC 的 Semaphore,不过底层实现是相似的思想,通过类似信号量的概念,限制加锁的次数。
咱们先看下,当执行 new InterProcessSemaphoreMutex(client, path) 的时候,做了什么事情。为了验证上面的猜想,省略部分源码。
public class InterProcessSemaphoreMutex implements InterProcessLock {
// 省略其他源码
public InterProcessSemaphoreMutex(CuratorFramework client, String path) {
// 省略其他源码
this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);
}
}
再重点看下InterProcessSemaphoreV2
对象创建的流程
public class InterProcessSemaphoreV2 {
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) {
this(client, path, maxLeases, null);
}
private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {
// 注意这两行代码,锁对象是通过 InterProcessMutex 类创建的,而 maxLeases 是通过初始化时传 1 进来
lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
this.maxLeases = (count != null) ? count.getCount() : maxLeases;
// 省略其他源码
}
}
从 InterProcessSemaphoreV2 对象初始化过程可知,InterProcessSemaphoreV2 实际上组合使用了 InterProcessMutex 可重入共享锁,通过 maxLeases
参数限制只能成功加锁一次,使用起来就类似信号量,限制加锁次数。所以从共享锁的命名来看,使用 Semaphore 就感觉合理了...
底层的加锁逻辑,实际上都是调用 InterProcessMutex 这个类的方法,这里先卖个关子,先不看加锁的逻辑,而是看下 release() 方法是怎样释放锁的。
public void release() throws Exception {
// 省略其他源码
Lease lease = this.lease;
this.lease = null;
lease.close();
}
lease 的英文解释是租约、租契,咱们可以理解为加锁的租契,加锁成功。释放锁的源码中,将lease设置为null,就相当于丢弃了这个租契,那么再次获取锁的时候发现 lease 并没有租契,就相当于没有线程占用,就可以尝试竞争加锁了。
至于 InterProcessMutex 内的加锁逻辑,先看完 InterProcessMutex 的例子再具体看下源码
只需要将上面例子中的A场景代码注释调,然后打开B场景的代码,其实就是使用可重入共享锁,先看下执行后的输出结果为:
线程1:第一次加锁成功
线程1:重复加锁成功
线程2:加锁失败
线程2:释放锁失败!原因是当前锁对象并没有占用锁
线程1执行业务逻辑,花费2秒
线程1:释放锁成功!
结论先行:使用 InterProcessMutex 可以让同一个线程重复加锁,但不同线程获取锁是互斥。这个也是符合前面 InterProcessMutex 类的描述!
剖析可重入锁底层加锁和释放锁源码
ps. 本文为了篇幅和着重核心代码,所以会省略部分源码。强烈建议读者可以根据上述例子debug源码,这样才能从被动吸收变成主动吸收知识,这样对于底层逻辑的印象和理解才会更深入~
在 InterProcessMutex 内会为每一个第一次加锁成功的线程封装为一个 LockData 对象,并将当前 LockData 对象加入 ConcurrentMap 中
// 并发集合,key是加锁成功的线程对象,vlaue则是lockData对象
private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData = Maps.newConcurrentMap();
/**
* 加锁的对象。owningThread代表加锁的线程对象、lockPath代表在ZK中加锁的路径、lockCount代表加锁的次数(为可重入铺垫)
*/
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
接下来再看一下,调用 acquire 方法实际上做了什么
public boolean acquire(long time, TimeUnit unit) throws Exception {
return internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
Thread currentThread = Thread.currentThread();
// 从集合中获取当前线程的lockData对象
InterProcessMutex.LockData lockData = threadData.get(currentThread);
// 如果集合已存在当前线程的lockData对象,证明当前线程已加锁成功且还没释放锁
if (lockData != null) {
// re-entering,可重入,直接返回true代表重复加锁成功
lockData.lockCount.incrementAndGet();
return true;
}
// 走到这里,就证明当前线程还没成功加锁。那么就尝试加锁,如果加锁失败返回null
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
// 加锁成功,返回的路径不是null;先初始化lockData对象,然后插入到threadData集合中
if (lockPath != null) {
InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
// 走到这里,就证明lockPath为null,加锁失败返回false
return false;
}
结合注释,可以知道可重入锁实际上是判断当前线程是否存在 LockData 对象,存在的话就自增 LockData 对象内的 lockCount 参数;不存在就走 attemptLock
方法尝试加锁,加锁成功就返回 true 告知客户端加锁成功。现在,咱们逐渐接近真相了,加锁最终逻辑就是在attemptLock
方法内!
// 省略部分源码,请结合源码综合查看
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
String ourPath = null;
boolean hasTheLock = false;
// 给当前尝试加锁的客户端创建临时的顺序节点,并返回节点的路径
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 尝试加锁
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
if (hasTheLock) {
return ourPath;
}
return null;
}
// 省略部分源码,请结合源码综合查看
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
List<String> children = getSortedChildren();
// 当前顺序节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// 尝试获取锁
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
// 从顺序节点中获取上一个节点
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized (this) {
// 给上一个节点添加监听器
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
// 调用wait方法等待
}
}
}
return haveTheLock;
}
再进一步看下 driver.getsTheLock
的流程(StandardLockInternalsDriver 类)
// 省略部分源码
public class StandardLockInternalsDriver implements LockInternalsDriver {
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
// 当前节点在这个子目录下的顺序
int ourIndex = children.indexOf(sequenceNodeName);
// maxLeases是初始化 InterProcessMutex 的时候传的,值固定为1。
// 什么情况下 getsTheLock 为 true?
// 只有当 ourIndex 为0才可能是true,也就是在当前目录下,临时顺序节点的索引是0才算加锁成功
// 换言之,只有排在目录的第一位才能加锁成功;当释放锁的时候会删除第一位临时顺序节点,这样的话才可以让下一位成功加锁
boolean getsTheLock = ourIndex < maxLeases;
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
}
到此,加锁的主流程源码都已剖析了。最后再看下释放锁的主流程:
// 省略部分源码
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = threadData.get(currentThread);
try {
// 释放锁
internals.releaseLock(lockData.lockPath);
} finally {
threadData.remove(currentThread);
}
}
// 省略部分源码
final void releaseLock(String lockPath) throws Exception {
// 删掉路径上的临时节点
deleteOurPath(lockPath);
}
释放锁的流程相对加锁就简单多了,重点就是删除临时节点,当临时节点删除后就会触发回调,通知下一个节点加锁。
通过前面一大轮的核心源码剖析,再回头想想整个加锁和释放锁核心逻辑,小结一下:
创建 InterProcessMutex对象时,固定 maxLeases 为 1,表明只有一个线程加锁成功
为每一个尝试加锁的线程创建临时顺序节点
如果当前顺序节点处于该目录下的第一位,则加锁成功;否则就调用wait方法等待,且给上一个临时顺序节点添加监听器(上一个节点释放后回调)
当释放锁的时候会删除临时顺序节点,并回调通知下一个临时顺序节点,这样的话,下一个临时顺序节点就能感知自己可以尝试加锁
上面剖析了核心流程,实际上很少直接调用 acquire 方法请求加锁,因为线程请求 acquire 方法就会阻塞线程,在并发量大的情况下就会导致服务器内大量线程被占用,最终会耗尽资源导致服务异常。一般的业务场景都会设置请求加锁的过期时间,这样在短时间内无法加锁成功就会走加锁失败的逻辑,就不会在一直等待。感兴趣的读者可以细看acquire(long time, TimeUnit unit)
的流程,核心流程和上面说的一样,只不过是加上一些时间判断,如果超时就会删除临时顺序节点而已。
可重入读写共享锁
/**
* 读写锁简单例子
*/
public static void main(String[] args) throws Exception {
String lockPath = "/read-write-sharded-lock";
CuratorFramework client = CuratorUtils.getClient();
// 添加连接状态的监听器
CuratorConnectionStateListener connectionStateListener = new CuratorConnectionStateListener(lockPath, "读写锁");
client.getConnectionStateListenable().addListener(connectionStateListener);
// 可重入读写共享锁
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, lockPath);
InterProcessMutex readLock = readWriteLock.readLock();
InterProcessMutex writeLock = readWriteLock.writeLock();
new Thread(() -> {
try {
readLock.acquire();
System.out.println(Thread.currentThread().getName() + "获取读锁成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (readLock.isOwnedByCurrentThread()) {
readLock.release();
System.out.println(Thread.currentThread().getName() + "释放读锁成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程1").start();
new Thread(() -> {
try {
writeLock.acquire();
System.out.println(Thread.currentThread().getName() + "获取写锁成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (writeLock.isOwnedByCurrentThread()) {
writeLock.release();
System.out.println(Thread.currentThread().getName() + "释放写锁成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程2").start();
}
输出结果:
线程1获取读锁成功
线程1释放读锁成功
线程2获取写锁成功
线程2释放写锁成功
上面只是读写锁的简单例子,但读写锁真实的使用场景会稍微复杂点,比如下面四个问题,需要从源码中找到答案!
不同线程获取读锁是否互斥?答:不互斥
不同线程获取写锁是否互斥?答:互斥
同一线程,可不可以先获取读锁、再获取写锁?答:不可以。如果同一线程先加读锁成功,后再请求加写锁,会阻塞加写锁请求
同一线程,可不可以先获取写锁、再获取读锁?答:可以。如果同一线程先加写锁成功,后再请求读锁,并不会阻塞读锁的加锁请求
如果对上述结论有疑问的读者,可以带着疑问去阅读源码。接下来结合下面的部分源码剖析以上四种结论在源码内怎么实现的!
可重入读写共享锁底层加锁和释放锁源码
InterProcessReadWriteLock 内创建了一个继承 InterProcessMutex 的 InternalInterProcessMutex 内部类,然后分别创建了readMutex
和writeMutex
两个对象。各位有没有发现什么?读写锁加锁逻辑还是通过 InterProcessMutex 呀!所以只要理解上面 InterProcessMute 的流程,理解读写锁的核心逻辑就很简单啦~
private final InterProcessMutex readMutex;
private final InterProcessMutex writeMutex;
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
// 初始化写锁
writeMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(
client,
basePath,
WRITE_LOCK_NAME,// 设置节点名字
lockData,
1, // 写锁的 maxLeases 设置为1
new InterProcessReadWriteLock.SortingLockInternalsDriver() {
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
}
);
// 初始化读锁
readMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(
client,
basePath,
READ_LOCK_NAME,// 设置节点名字
lockData,
Integer.MAX_VALUE, // 读锁的 maxLeases 设置为无限大
new InterProcessReadWriteLock.SortingLockInternalsDriver() {
// 当执行getsTheLock加锁的时候实际执行到这里
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
return readLockPredicate(children, sequenceNodeName);
}
}
);
}
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
// 如果当前线程拥有写锁,则当前线程请求读锁也成功。留意下面初始化 PredicateResults 对象的 getsTheLock 参数为true,就代表加读锁成功
if (writeMutex.isOwnedByCurrentThread()) {
return new PredicateResults(null, true);
}
// 省略部分源码
}
// 注意:这里继承 InterProcessMutex
private static class InternalInterProcessMutex extends InterProcessMutex {
// 省略源码
}
以下说明的前提是,先成功获取锁,但都没有执行到释放锁的期间,有其他线程并发的请求加锁操作。如果先获取锁再释放锁,其他线程肯定可以加锁成功呀!
不同线程获取读锁不互斥。实际上是因为读锁的 maxLeases 设置为 Integer.MAX_VALUE,当请求获取加读锁的时候,执行 StandardLockInternalsDriver 类的 getsTheLock 方法,根据 boolean getsTheLock = ourIndex < maxLeases 判断是否加锁成功,ourIndex 指的是当前临时顺序节点在目录下的索引位置,肯定是小于 maxLeases,所以返回 true,代表加读锁成功。
不同线程获取写锁互斥。细心的读者肯定留意到创建写锁的时候,maxLeases 设置为 1,同样执行到 boolean getsTheLock = ourIndex < maxLeases 这一行的时候,除非当前临时顺序节点处于该目录下的第一位,否则都会返回 false 代表加锁失败。这也说明了只有一个且最先获取写锁的线程能成功加锁,其他的加写锁都会阻塞。
同一线程,不可以先获取读锁、再获取写锁。在时间顺序上来说,先创建读锁的临时顺序节点,再到写锁的临时顺序节点。当执行加写锁的操作时候,执行 boolean getsTheLock = ourIndex < maxLeases 发现 ourIndex 为 1,而写锁的 maxLeases 也是 1,所以返回 false 加写锁失败。
同一线程,可以先获取写锁、再获取读锁。在时间顺序上来说,先创建写锁的临时顺序节点,再创建读锁的临时顺序节点。加写锁成功后,再到执行加读锁的操作,因为读锁的 maxLeases 的值无限大嘛,所以 boolean getsTheLock = ourIndex < maxLeases 会返回 true 加读锁成功。
信号量锁
/**
* maxLease代表可成功加锁的数量
*/
public static void main(String[] args) {
String lockPath = "/semaphore-sharded-lock";
int maxLeases = 1;
CuratorFramework client = CuratorUtils.getClient();
// 添加连接状态的监听器
CuratorConnectionStateListener connectionStateListener = new CuratorConnectionStateListener(lockPath, "读写锁");
client.getConnectionStateListenable().addListener(connectionStateListener);
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, maxLeases);
new Thread(() -> {
Lease lease = null;
try {
lease = semaphore.acquire(100, TimeUnit.MILLISECONDS);
if (null != lease) {
System.out.println(Thread.currentThread().getName() + "获取共享锁成功");
} else {
System.out.println(Thread.currentThread().getName() + "获取共享锁失败");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != lease) {
semaphore.returnLease(lease);
System.out.println(Thread.currentThread().getName() + "释放共享锁成功");
}
}
}, "线程1").start();
new Thread(() -> {
Lease lease = null;
try {
lease = semaphore.acquire(100, TimeUnit.MILLISECONDS);
if (null != lease) {
System.out.println(Thread.currentThread().getName() + "获取共享锁成功");
} else {
System.out.println(Thread.currentThread().getName() + "获取共享锁失败");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != lease) {
semaphore.returnLease(lease);
System.out.println(Thread.currentThread().getName() + "释放共享锁成功");
}
}
}, "线程2").start();
}
输出结果:
线程1获取共享锁失败
线程2获取共享锁成功
线程2释放共享锁成功
看一下初始化 InterProcessSemaphoreV2 的源码,我们还是可以发现底层依然使用的是 InterProcessMutex 类加锁!
// 省略部分源码
private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {
lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
this.maxLeases = (count != null) ? count.getCount() : maxLeases;
}
剩下加锁释放锁的源码就不用多说了,只要理解上面 maxLeases 在 InterProcessMutex 的妙用,就可以知道在 InterProcessSemaphoreV2 内是通过 maxLeases 控制获取锁的数量。不过信号量锁在获取锁和释放锁的方式上和其它共享锁稍微有点不一样,释放锁的时候是调用 returnLease 方法,参数是从加锁成功后返回的 Lease 对象,不过其释放锁的底层原理和上面的 release 方法大同小异,就不再赘述了。
多对象共享锁
public static void main(String[] args) throws Exception {
String lockPathA = "/multi-sharded-lock-A";
String lockPathB = "/multi-sharded-lock-B";
String lockPathC = "/multi-sharded-lock-C";
CuratorFramework client = CuratorUtils.getClient();
// 单对象共享锁
InterProcessMutex processMutex = new InterProcessMutex(client, lockPathC);
// 多对象共享锁
InterProcessMultiLock multiLock = new InterProcessMultiLock(client, Arrays.asList(lockPathA, lockPathB));
new Thread(() -> {
try {
multiLock.acquire();
System.out.println(Thread.currentThread().getName() + "获取锁成功");
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
multiLock.release();
System.out.println(Thread.currentThread().getName() + "释放锁成功");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程1").start();
// 为了让线程1优先执行
Thread.sleep(1000);
new Thread(() -> {
try {
if (processMutex.acquire(100, TimeUnit.MILLISECONDS)) {
System.out.println(Thread.currentThread().getName() + "获取锁成功");
} else {
System.out.println(Thread.currentThread().getName() + "获取锁失败");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (processMutex.isOwnedByCurrentThread()) {
processMutex.release();
System.out.println(Thread.currentThread().getName() + "释放锁成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "线程2").start();
}
输出结果:
线程1获取锁成功
线程2获取锁成功
线程2释放锁成功
线程1释放锁成功
只要将 InterProcessMutex 加锁的对象改为 lockPathA 或 lockPathB 就可以发现,线程2加锁失败。这也能印证使用InterProcessMultiLock
多对象共享锁能够限制对象只能有一个线程占有。接下来简单看下底层的原理,其实底层依旧用到的是 InterProcessMutex 共享锁,加锁和释放锁逻辑上面都剖析过,就不再赘述。咱们只需要关注初始化 InterProcessMultiLock 的时候做了什么事情。
// 初始化 InterProcessMultiLock
public InterProcessMultiLock(CuratorFramework client, List<String> paths) {
this(makeLocks(client, paths));
}
// 分别对多个对象加上 InterProcessMutex 共享锁
private static List<InterProcessLock> makeLocks(CuratorFramework client, List<String> paths) {
ImmutableList.Builder<InterProcessLock> builder = ImmutableList.builder();
for (String path : paths) {
InterProcessLock lock = new InterProcessMutex(client, path);
builder.add(lock);
}
return builder.build();
}
// locks用于记录加锁的对象,之后的加锁和释放锁都会根据这个对象循环加锁、释放锁
public InterProcessMultiLock(List<InterProcessLock> locks) {
this.locks = ImmutableList.copyOf(locks);
}
简单来说,就是将多个对象分别用 InterProcessMutex 加锁,锁住资源;然后在加锁和释放锁的时候,循环这些对象加锁或释放锁就可以了!
总结:
这篇文章重点介绍了 Curator 的几种分布式锁使用和核心源码。不难发现,其实 Curator 只是在 ZK 的基础上扩展了更多功能,其底层其实就是通过临时顺序节点来控制加锁的顺序和锁的释放。灵活运用临时顺序节点的特性,加上 Curator 内设置 maxLeases 等熟悉,演变出可重入锁、读写锁、信号量锁等类型的分布式锁。另外,本文重点介绍了 InterProcessMutex
的核心源码,只要掌握这个类,其它的分布式锁也能快速了解其底层原理。在实际工作中,可以根据业务自定义类来封装 Curator 等核心类,用于扩展适合自身业务的场景。
如果觉得文章不错的话,麻烦点个赞哈,你的鼓励就是我的动力!对于文章有哪里不清楚或者有误的地方,欢迎在评论区留言~
参考资料:
Curator 官网:https://curator.apache.org/curator-recipes/index.html