一 Curator介绍
Curator 是 Netflix 公司开源的一个 Zookeeper 客户端,目前由 Apache 进行维护。与 Zookeeper 原生客户端相比,Curator 的抽象层次更高,功能也更加丰富,是目前 Zookeeper 使用范围最广的 Java 客户端
二 CRUD使用
- 引入maven
<!--Curator 相关依赖-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
<exclusions>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.10</version>
</dependency>
<!--日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
- 创建连接
在test中新建测试类,使用@Before创建连接,@After释放连接,后面测试代码写在里面用于测试。
public class CuratorExample {
private static final String ZK_ADDRESS = "192.168.59.130:2181";
private static final int SESSION_TIMEOUT = 60 * 1000;
private static final int CONNECTION_TIMEOUT = 15 * 1000;
private static final String NAME_SPACE = "my_space";
private CuratorFramework client;
@Before
public void connect() throws InterruptedException {
client = CuratorFrameworkFactory
.builder()
.connectString(ZK_ADDRESS)
.sessionTimeoutMs(SESSION_TIMEOUT)
.connectionTimeoutMs(CONNECTION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(3*1000,10,60*1000))
.namespace(NAME_SPACE)
.build();
client.start();
client.blockUntilConnected();
}
@After
public void destroy() {
if (client != null) {
client.close();
}
}
}
ExponentialBackoffRetry:是客户端异常断开后重试策略的一种,其他策略还有。
RetryNTimes(重连N次策略)
RetryForever(永远重试策略)
BoundedExponentialBackoffRetry(有边界的基于backoff的重连策略,即,设定最大sleep时间)。
下面我们分析ExponentialBackoffRetry为例子:
//这是ExponentialBackoffRetry类中获取睡眠时间的代码:
//重试时间会根据重试次数retryCount的增加而增加,同时会和maxSleepMs最大等待时间做比较
protected long getSleepTimeMs(int retryCount, long elapsedTimeMs) {
long sleepMs = (long)(this.baseSleepTimeMs * Math.max(1, this.random.nextInt(1 << retryCount + 1)));
if (sleepMs > (long)this.maxSleepMs) {
log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, this.maxSleepMs));
sleepMs = (long)this.maxSleepMs;
}
return sleepMs;
}
- 创建节点
- 没有设置节点类型,创建模式默认为持久化节点
@Test
public void createWithPersistData() throws Exception {
client.create().forPath("/dir1", "data1".getBytes(StandardCharsets.UTF_8));
}
- CreateMode是一个枚举类型,默认类型:持久化
@Test
public void createWithEphemeralData() throws Exception {
client.create().withMode(CreateMode.EPHEMERAL).forPath("/dir2", "data2".getBytes(StandardCharsets.UTF_8));
// 在程序结束前,可以看到/dir2目录,程序结束后会自动删除
TimeUnit.SECONDS.sleep(100);
}
- 创建多级节点 creatingParentsIfNeeded() 表示如果没有父节点 则自动创建父节点,如果使用之前的创建方法,父目录不存在则会抛出异常
@Test
public void createMultiData() throws Exception {
client.create().creatingParentsIfNeeded().forPath("/dir3/node1", "data1".getBytes(StandardCharsets.UTF_8));
}
- 删除节点
- 删除叶子节点,存在子节点会抛出异常,节点不存在也会抛异常
client.delete().forPath("/dir1");
- 删除节点,并且递归删除其所有的子节点,节点不存在也会抛异常
client.delete().deletingChildrenIfNeeded().forPath("/dir1");
- 强制指定版本进行删除
@Test
public void deleteNodeByVersion() throws Exception {
Stat stat = client.checkExists().forPath("/dir1");
if(stat!=null){
client.delete().withVersion(stat.getVersion()).forPath("/dir1");
}
}
- 必须成功的删除 (防止网络抖动,重试删除)
client.delete().guaranteed().forPath("/dir1");
- 读取节点数据
- 读取一个节点的数据内容,返回值是byte[]
byte[] data = client.getData().forPath("/dir1");
- 读取一个节点的数据内容,同时返回stat
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath("/dir1");
- 更新节点数据
- 更新一个节点的数据内容,返回一个Stat实例
Stat stat = client.setData().forPath("/dir1", "data".getBytes());
- 更新一个节点的数据内容,强制指定版本进行更新
Stat stat = client.checkExists().forPath("/dir1");
if(stat!=null){
stat = client.setData().withVersion(stat.getVersion()).forPath("/dir1", "data".getBytes());
}
- 检查节点是否存在,不存在返回null
Stat stat = client.checkExists().forPath("/dir1");
- 查询节点下的子节点列表,返回子节点path集合,非递归查询
List<String> paths = client.getChildren().forPath("/");
- 事务操作,存在一个失败即全部失败
CuratorOp update = client.transactionOp().setData().forPath("/dir1", "data2".getBytes());
CuratorOp delete = client.transactionOp().delete().forPath("/dir1/dir2");
List<CuratorTransactionResult> resultList = client.transaction().forOperations(update, delete);
三 事件监听
- NodeCache
NodeCache用于监听指定ZooKeeper数据节点本身的变化,数据变化或者节点被删除都会触发回调方法。
@Test
public void nodeCache() throws Exception {
String path = "/dir1";
NodeCache cache = new NodeCache(client,path,false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = cache.getCurrentData();
if(currentData==null){
System.out.println("节点被删除");
}else{
String data = new String(currentData.getData());
System.out.println("=====> Node data update, new Data: "+data);
}
}
});
client.setData().forPath(path,"data2".getBytes());
//client.delete().forPath(path);
TimeUnit.SECONDS.sleep(5);
cache.close();
}
- PathChildrenCache
PathChildrenCache是用来监听指定节点 的子节点变化情况。
@Test
public void PathChildrenCache() throws Exception {
String path = "/dir1";
PathChildrenCache cache = new PathChildrenCache(client,path,true);
//如果目录存在子节点,则所有子节点都会触发通知,同时会触发INITIALIZED事件,建议使用
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
//较POST_INITIALIZED_EVENT,少了INITIALIZED事件
//cache.start(PathChildrenCache.StartMode.NORMAL);
//如果目录存在子节点,已存在的子节点不触发通知,仅捕获后续发生的变更事件
//cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework ient, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("子节点初始化成功");
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
System.out.println("添加子节点路径:"+event.getData().getPath());
System.out.println("子节点数据:"+new String(event.getData().getData()));
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("删除子节点:"+event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改子节点路径:"+event.getData().getPath());
System.out.println("修改子节点数据:"+new String(event.getData().getData()));
}
}
});
//client.create().forPath("/dir1/dir2","data1".getBytes());
TimeUnit.SECONDS.sleep(5);
cache.close();
}
- TreeCache
结合NodeCache和PathChildrenCahce的特性,是对整个目录进行监听。
@Test
public void TreeCache() throws Exception {
String path = "/dir1";
TreeCache cache = new TreeCache(client,path);
cache.start();
//添加错误监听器
cache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
@Override
public void unhandledError(String s, Throwable throwable) {
System.out.println("======> 错误原因:" + throwable.getMessage());
}
});
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
switch (treeCacheEvent.getType()) {
case INITIALIZED:
System.out.println("=====> INITIALIZED : 初始化");
break;
case NODE_ADDED:
System.out.println("=====> NODE_ADDED : "+ treeCacheEvent.getData().getPath() +" 数据:"+ (treeCacheEvent.getData().getData()==null?"":new String(treeCacheEvent.getData().getData())));
break;
case NODE_REMOVED:
System.out.println("=====> NODE_REMOVED : "+ treeCacheEvent.getData().getPath() +" 数据:"+ (treeCacheEvent.getData().getData()==null?"":new String(treeCacheEvent.getData().getData())));
if("/dir1/dir2".equals(treeCacheEvent.getData().getPath())){
throw new RuntimeException("=====> 测试异常监听UnhandledErrorListener");
}
break;
case NODE_UPDATED:
System.out.println("=====> NODE_UPDATED : "+ treeCacheEvent.getData().getPath() +" 数据:"+ (treeCacheEvent.getData().getData()==null?"":new String(treeCacheEvent.getData().getData())));
break;
default:
System.out.println("=====> treeCache Type:" + treeCacheEvent.getType());
break;
}
}
});
client.delete().forPath("/dir1/dir2");
client.delete().forPath("/dir1/dir3");
client.delete().forPath("/dir1/dir4");
TimeUnit.SECONDS.sleep(5);
cache.close();
}
打印信息:
=====> NODE_ADDED : /dir1/dir2 数据:
=====> NODE_ADDED : /dir1/dir3 数据:
=====> NODE_ADDED : /dir1/dir4 数据:data4
=====> INITIALIZED : 初始化
=====> NODE_REMOVED : /dir1/dir2 数据:
======> 错误原因:=====> 测试异常监听UnhandledErrorListener
=====> NODE_REMOVED : /dir1/dir3 数据:
=====> NODE_REMOVED : /dir1/dir4 数据:data4
四 分布式锁
Curator还进一步地提供了非常丰富的分布式锁特性,具体包括:
- InterProcessMutex 分布式可重入互斥锁
- InterProcessReadWriteLock 分布式可重入读写锁
- InterProcessSemaphoreMutex 分布式不可重入互斥锁
- InterProcessSemaphoreV2 分布式信号量
- InterProcessMutex分布式可重入互斥锁
InterProcessMutex是一个分布式的可重入的互斥锁,示例代码如下所示
@Test
public void interProcessMutex() throws Exception {
String lockPath = "/lockPath";
ExecutorService threadPool = Executors.newFixedThreadPool(10);
System.out.println("---------------------- 系统上线 ----------------------");
for(int i=1; i<=3; i++) {
String taskName = "任务#"+i;
Task task = new Task(taskName, client, lockPath);
threadPool.execute(task);
}
TimeUnit.SECONDS.sleep(100);
System.out.println("---------------------- 系统下线 ----------------------");
}
private static class Task implements Runnable {
private String taskName;
private InterProcessMutex lock;
public Task(String taskName, CuratorFramework zkClient, String zkLockPath) {
this.taskName = taskName;
this.lock = new InterProcessMutex(zkClient, zkLockPath);
}
@Override
public void run() {
try{
lock.acquire();
info(taskName + ": 成功获取锁 #1");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
methodA();
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放锁 #1\n");
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void methodA() {
try{
lock.acquire();
info(taskName + ": 成功获取锁 #2");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
System.out.println(taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放锁 #2");
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
打印结果如下:
---------------------- 系统上线 ----------------------
[17:26:00.714] <pool-4-thread-2> 任务#2: 成功获取锁 #1
[17:26:01.721] <pool-4-thread-2> 任务#2: 成功获取锁 #2
[17:26:02.724] <pool-4-thread-2> 任务#2: 释放锁 #2
[17:26:02.725] <pool-4-thread-2> 任务#2: 释放锁 #1
[17:26:02.736] <pool-4-thread-3> 任务#3: 成功获取锁 #1
[17:26:03.739] <pool-4-thread-3> 任务#3: 成功获取锁 #2
[17:26:04.743] <pool-4-thread-3> 任务#3: 释放锁 #2
[17:26:04.743] <pool-4-thread-3> 任务#3: 释放锁 #1
[17:26:04.749] <pool-4-thread-1> 任务#1: 成功获取锁 #1
[17:26:05.752] <pool-4-thread-1> 任务#1: 成功获取锁 #2
[17:26:06.756] <pool-4-thread-1> 任务#1: 释放锁 #2
[17:26:06.756] <pool-4-thread-1> 任务#1: 释放锁 #1
---------------------- 系统下线 ----------------------
Process finished with exit code 0
- InterProcessReadWriteLock分布式可重入读写锁
InterProcessReadWriteLock是一个分布式可重入读写锁,其中读锁为共享锁、写锁为互斥锁。示例代码如下所示:
/**
* 测试: 读锁为共享锁
*/
@Test
public void testRead() {
String lockPath = "/lockPath";
ExecutorService threadPool = Executors.newFixedThreadPool(10);
System.out.println("\n---------------------- Test 1 : Read ----------------------");
for(int i=1; i<=3; i++) {
String taskName = "读任务#"+i;
Runnable task = new ReadTask(taskName, client, lockPath);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ TimeUnit.SECONDS.sleep(100); } catch (Exception e) {}
}
/**
* 测试: 写锁为互斥锁
*/
@Test
public void testWrite() {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
String lockPath = "/lockPath";
System.out.println("\n---------------------- Test 2 : Write ----------------------");
for(int i=1; i<=3; i++) {
String taskName = "写任务#"+i;
Runnable task = new WriteTask(taskName, client, lockPath);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ TimeUnit.SECONDS.sleep(100); } catch (Exception e) {}
}
/**
* 测试: 读写互斥
*/
@Test
public void testReadWrite() {
String lockPath = "/lockPath";
ExecutorService threadPool = Executors.newFixedThreadPool(10);
System.out.println("\n---------------------- Test 3 : Read Write ----------------------");
for(int i=1; i<=8; i++) {
Runnable task = null;
Boolean isReadTask = new Random().nextBoolean();
if( isReadTask ) {
task = new ReadTask( "读任务#"+i, client, lockPath );
} else {
task = new WriteTask( "写任务#"+i, client, lockPath );
}
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ TimeUnit.SECONDS.sleep(100); } catch (Exception e) {}
}
/**
* 读任务
*/
private static class ReadTask implements Runnable {
private String taskName;
private InterProcessMutex readLock;
public ReadTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
this.taskName = taskName;
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
this.readLock = interProcessReadWriteLock.readLock();
}
@Override
public void run() {
try{
readLock.acquire();
info(taskName + ": 成功获取读锁 #1");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放读锁 #1");
try {
readLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* 写任务
*/
private static class WriteTask implements Runnable {
private String taskName;
private InterProcessMutex writeLock;
public WriteTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
this.taskName = taskName;
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
this.writeLock = interProcessReadWriteLock.writeLock();
}
@Override
public void run() {
try{
writeLock.acquire();
info(taskName + ": 成功获取写锁 #1");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放写锁 #1\n");
try {
writeLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
打印结果如下:
---------------------- Test 1 : Read ----------------------
[17:33:45.827] <pool-4-thread-1> 读任务#1: 成功获取读锁 #1
[17:33:45.827] <pool-4-thread-2> 读任务#2: 成功获取读锁 #1
[17:33:45.827] <pool-4-thread-3> 读任务#3: 成功获取读锁 #1
[17:33:46.834] <pool-4-thread-3> 读任务#3: 释放读锁 #1
[17:33:46.834] <pool-4-thread-1> 读任务#1: 释放读锁 #1
[17:33:46.834] <pool-4-thread-2> 读任务#2: 释放读锁 #1
---------------------- Test 2 : Write ----------------------
[17:37:21.157] <pool-4-thread-2> 写任务#2: 成功获取写锁 #1
[17:37:22.157] <pool-4-thread-2> 写任务#2: 释放写锁 #1
[17:37:22.166] <pool-4-thread-3> 写任务#3: 成功获取写锁 #1
[17:37:23.165] <pool-4-thread-3> 写任务#3: 释放写锁 #1
[17:37:23.171] <pool-4-thread-1> 写任务#1: 成功获取写锁 #1
[17:37:24.174] <pool-4-thread-1> 写任务#1: 释放写锁 #1
---------------------- Test 3 : Read Write ----------------------
[17:41:30.158] <pool-4-thread-3> 读任务#3: 成功获取读锁 #1
[17:41:30.158] <pool-4-thread-4> 读任务#4: 成功获取读锁 #1
[17:41:30.158] <pool-4-thread-2> 读任务#2: 成功获取读锁 #1
[17:41:31.162] <pool-4-thread-3> 读任务#3: 释放读锁 #1
[17:41:31.162] <pool-4-thread-2> 读任务#2: 释放读锁 #1
[17:41:31.162] <pool-4-thread-4> 读任务#4: 释放读锁 #1
[17:41:31.176] <pool-4-thread-5> 写任务#5: 成功获取写锁 #1
[17:41:32.176] <pool-4-thread-5> 写任务#5: 释放写锁 #1
[17:41:32.181] <pool-4-thread-6> 读任务#6: 成功获取读锁 #1
[17:41:32.182] <pool-4-thread-1> 读任务#1: 成功获取读锁 #1
[17:41:33.184] <pool-4-thread-6> 读任务#6: 释放读锁 #1
[17:41:33.184] <pool-4-thread-1> 读任务#1: 释放读锁 #1
[17:41:33.187] <pool-4-thread-8> 写任务#8: 成功获取写锁 #1
[17:41:34.191] <pool-4-thread-8> 写任务#8: 释放写锁 #1
[17:41:34.195] <pool-4-thread-7> 写任务#7: 成功获取写锁 #1
[17:41:35.198] <pool-4-thread-7> 写任务#7: 释放写锁 #1
- 可重入性
由于读锁、写锁分别是基于InterProcessMutex实现的,故这二者自然也是支持可重入的。示例代码可参考上述事例1:InterProcessMutex分布式可重入互斥锁 - 锁升级、锁降级
- 锁升级:指的是读锁升级为写锁。
当一个线程先获取到读锁再去申请写锁,显然其是不支持的。理由也很简单,读锁是可以多个服务实例同时持有的。若其中一个服务实例此锁线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他服务实例依然持有读锁; - 锁降级:指的是写锁降级为读锁,是支持锁降级的,即写锁降级为读锁。当一个服务实例的线程在获得写锁后,该线程依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的使用价值也很大,其一方面保证了安全,读锁在写锁释放前获取;另一方面保证了高效,因为读锁是共享的。
锁升级示例代码如下所示:
@Test
public void lockUp() throws Exception {
System.out.println("---------------------- Test 1 : Read -> Write ----------------------\n");
String lockPath = "/lockPath";
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, lockPath);
InterProcessMutex readLock = interProcessReadWriteLock.readLock();
InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();
try {
readLock.acquire();
info("成功获取读锁");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
writeLock.acquire();
info("成功获取写锁");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
readLock.release();
info("成功释放读锁");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
writeLock.release();
info("成功释放写锁");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
System.out.println("---------------------- 系统下线 ----------------------");
}
打印结果如下:
会一直住,无法获取写锁,所以不能锁升级。
---------------------- Test 1 : Read -> Write ----------------------
[15:26:00.022] <main> 成功获取读锁
锁降级示例代码如下所示:
@Test
public void lockDown() throws Exception {
System.out.println("---------------------- Test 2 : Write -> Read ----------------------\n");
String lockPath = "/lockPath";
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, lockPath);
InterProcessMutex readLock = interProcessReadWriteLock.readLock();
InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();
try {
writeLock.acquire();
info("成功获取写锁");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
readLock.acquire();
info("成功获取读锁");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
writeLock.release();
info("成功释放写锁");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
readLock.release();
info("成功释放读锁");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
System.out.println("---------------------- 系统下线 ----------------------");
}
打印结果如下:
---------------------- Test 2 : Write -> Read ----------------------
[15:29:36.336] <main> 成功获取写锁
[15:29:37.347] <main> 成功获取读锁
[15:29:38.355] <main> 成功释放写锁
[15:29:39.358] <main> 成功释放读锁
---------------------- 系统下线 ----------------------
- InterProcessSemaphoreMutex分布式不可重入互斥锁
InterProcessSemaphoreMutex互斥锁,示例代码如下所示:
@Test
public void interProcessSemaphoreMutexTest1() throws Exception {
System.out.println("\n---------------------- Test 1 ----------------------");
String lockPath = "/lockPath";
Runnable task = () -> {
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, lockPath);
try{
lock.acquire();
info("成功获取锁 #1");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #1\n");
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
};
for(int i=1; i<=3; i++) {
TimeUnit.SECONDS.sleep(1);
}
TimeUnit.SECONDS.sleep(10);
}
打印结果如下:
---------------------- Test 1 ----------------------
[15:40:41.164] <pool-4-thread-3> 成功获取锁 #1
[15:40:42.170] <pool-4-thread-3> 释放锁 #1
[15:40:42.177] <pool-4-thread-1> 成功获取锁 #1
[15:40:43.180] <pool-4-thread-1> 释放锁 #1
[15:40:43.186] <pool-4-thread-2> 成功获取锁 #1
[15:40:44.191] <pool-4-thread-2> 释放锁 #1
Process finished with exit code 0
InterProcessSemaphoreMutex不可重入锁,示例代码如下所示:
@Test
public void interProcessSemaphoreMutexTest2() throws Exception {
System.out.println("\n---------------------- Test 2 ----------------------");
String lockPath = "/lockPath";
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, lockPath);
try{
lock.acquire();
info("成功获取锁 #1");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
lock.acquire();
info("成功获取锁 #2");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
lock.release();
info("释放锁 #1\n");
lock.release();
info("释放锁 #1\n");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
}
打印结果如下:会一直卡住,由于不可重入,无法再次获取锁。
---------------------- Test 2 ----------------------
[15:43:07.918] <main> 成功获取锁 #1
- InterProcessSemaphoreV2分布式信号量
可以设置同时获取最大锁并发数:
@Test
public void testInterProcessSemaphoreV2() throws Exception {
String lockPath = "/lockPath";
ExecutorService threadPool = Executors.newFixedThreadPool(10);
int maxLimit = 2;
IntStream.rangeClosed(1,6)
.mapToObj( num -> new UserReq("用户#"+num, client, lockPath, maxLimit) )
.forEach( threadPool::execute );
// 主线程等待所有任务执行完毕
TimeUnit.SECONDS.sleep(10);
System.out.println("---------------------- 系统下线 ----------------------");
}
private static class UserReq implements Runnable {
private String name;
private InterProcessSemaphoreV2 interProcessSemaphoreV2;
private Integer maxLimit;
public UserReq(String name, CuratorFramework zkClient, String zkLockPath, Integer maxLimit) {
this.name = name;
this.maxLimit = maxLimit;
this.interProcessSemaphoreV2 = new InterProcessSemaphoreV2(zkClient, zkLockPath, maxLimit);
}
@Override
public void run() {
try {
// 模拟用户不定时发起请求
TimeUnit.SECONDS.sleep(1);
String msg = name + ": 发起请求";
info(msg);
// 阻塞等待,直到获取许可
Lease lease = interProcessSemaphoreV2.acquire();
info(name + ": 系统开始处理请求");
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
// 用户请求处理完毕,释放许可
interProcessSemaphoreV2.returnLease( lease );
info(name + ": 系统处理完毕");
}catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
}
}
结果如下:最多两个并发执行
[16:02:41.587] <pool-4-thread-3> 用户#3: 发起请求
[16:02:41.587] <pool-4-thread-4> 用户#4: 发起请求
[16:02:41.587] <pool-4-thread-2> 用户#2: 发起请求
[16:02:41.587] <pool-4-thread-6> 用户#6: 发起请求
[16:02:41.587] <pool-4-thread-1> 用户#1: 发起请求
[16:02:41.587] <pool-4-thread-5> 用户#5: 发起请求
[16:02:41.670] <pool-4-thread-1> 用户#1: 系统开始处理请求
[16:02:41.679] <pool-4-thread-5> 用户#5: 系统开始处理请求
[16:02:42.677] <pool-4-thread-1> 用户#1: 系统处理完毕
[16:02:42.681] <pool-4-thread-6> 用户#6: 系统开始处理请求
[16:02:42.686] <pool-4-thread-5> 用户#5: 系统处理完毕
[16:02:42.688] <pool-4-thread-2> 用户#2: 系统开始处理请求
[16:02:43.688] <pool-4-thread-6> 用户#6: 系统处理完毕
[16:02:43.690] <pool-4-thread-3> 用户#3: 系统开始处理请求
[16:02:43.694] <pool-4-thread-2> 用户#2: 系统处理完毕
[16:02:43.696] <pool-4-thread-4> 用户#4: 系统开始处理请求
[16:02:44.692] <pool-4-thread-3> 用户#3: 系统处理完毕
[16:02:44.700] <pool-4-thread-4> 用户#4: 系统处理完毕
---------------------- 系统下线 ----------------------