4 Zookeeper Curator客户端使用

一 Curator介绍

Curator 是 Netflix 公司开源的一个 Zookeeper 客户端,目前由 Apache 进行维护。与 Zookeeper 原生客户端相比,Curator 的抽象层次更高,功能也更加丰富,是目前 Zookeeper 使用范围最广的 Java 客户端

二 CRUD使用

  1. 引入maven
 <!--Curator 相关依赖-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.1.0</version>
    <exclusions>
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.10</version>
</dependency>

<!--日志-->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.21</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.21</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.10</version>
    <scope>test</scope>
</dependency>
  1. 创建连接
    在test中新建测试类,使用@Before创建连接,@After释放连接,后面测试代码写在里面用于测试。
public class CuratorExample {

    private static final String ZK_ADDRESS = "192.168.59.130:2181";
    private static final int SESSION_TIMEOUT = 60 * 1000;
    private static final int CONNECTION_TIMEOUT = 15 * 1000;
    private static final String NAME_SPACE = "my_space";

    private CuratorFramework client;

    @Before
    public void connect() throws InterruptedException {
        client = CuratorFrameworkFactory
                .builder()
                .connectString(ZK_ADDRESS)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(3*1000,10,60*1000))
                .namespace(NAME_SPACE)
                .build();
        client.start();
        client.blockUntilConnected();
    }

    @After
    public void destroy() {
        if (client != null) {
            client.close();
        }
    }
}

ExponentialBackoffRetry:是客户端异常断开后重试策略的一种,其他策略还有。
RetryNTimes(重连N次策略)
RetryForever(永远重试策略)
BoundedExponentialBackoffRetry(有边界的基于backoff的重连策略,即,设定最大sleep时间)。

下面我们分析ExponentialBackoffRetry为例子:

//这是ExponentialBackoffRetry类中获取睡眠时间的代码:
//重试时间会根据重试次数retryCount的增加而增加,同时会和maxSleepMs最大等待时间做比较
protected long getSleepTimeMs(int retryCount, long elapsedTimeMs) {
    long sleepMs = (long)(this.baseSleepTimeMs * Math.max(1, this.random.nextInt(1 << retryCount + 1)));
    if (sleepMs > (long)this.maxSleepMs) {
        log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, this.maxSleepMs));
        sleepMs = (long)this.maxSleepMs;
    }
    return sleepMs;
}
  1. 创建节点
  • 没有设置节点类型,创建模式默认为持久化节点
@Test
public void createWithPersistData() throws Exception {
    client.create().forPath("/dir1", "data1".getBytes(StandardCharsets.UTF_8));
}
  • CreateMode是一个枚举类型,默认类型:持久化
@Test
public void createWithEphemeralData() throws Exception {
    client.create().withMode(CreateMode.EPHEMERAL).forPath("/dir2", "data2".getBytes(StandardCharsets.UTF_8));
    // 在程序结束前,可以看到/dir2目录,程序结束后会自动删除
    TimeUnit.SECONDS.sleep(100);
}
  • 创建多级节点 creatingParentsIfNeeded() 表示如果没有父节点 则自动创建父节点,如果使用之前的创建方法,父目录不存在则会抛出异常
@Test
public void createMultiData() throws Exception {
    client.create().creatingParentsIfNeeded().forPath("/dir3/node1", "data1".getBytes(StandardCharsets.UTF_8));
}
  1. 删除节点
  • 删除叶子节点,存在子节点会抛出异常,节点不存在也会抛异常
client.delete().forPath("/dir1");
  • 删除节点,并且递归删除其所有的子节点,节点不存在也会抛异常
client.delete().deletingChildrenIfNeeded().forPath("/dir1");
  • 强制指定版本进行删除
@Test
public void deleteNodeByVersion() throws Exception {
    Stat stat = client.checkExists().forPath("/dir1");
    if(stat!=null){
        client.delete().withVersion(stat.getVersion()).forPath("/dir1");
    }
}
  • 必须成功的删除 (防止网络抖动,重试删除)
client.delete().guaranteed().forPath("/dir1");
  1. 读取节点数据
  • 读取一个节点的数据内容,返回值是byte[]
byte[] data = client.getData().forPath("/dir1");
  • 读取一个节点的数据内容,同时返回stat
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath("/dir1");
  1. 更新节点数据
  • 更新一个节点的数据内容,返回一个Stat实例
Stat stat = client.setData().forPath("/dir1", "data".getBytes());
  • 更新一个节点的数据内容,强制指定版本进行更新
Stat stat = client.checkExists().forPath("/dir1");
if(stat!=null){
    stat = client.setData().withVersion(stat.getVersion()).forPath("/dir1", "data".getBytes());
}
  1. 检查节点是否存在,不存在返回null
Stat stat = client.checkExists().forPath("/dir1");
  1. 查询节点下的子节点列表,返回子节点path集合,非递归查询
List<String> paths = client.getChildren().forPath("/");
  1. 事务操作,存在一个失败即全部失败
CuratorOp update = client.transactionOp().setData().forPath("/dir1", "data2".getBytes());
CuratorOp delete = client.transactionOp().delete().forPath("/dir1/dir2");
List<CuratorTransactionResult> resultList = client.transaction().forOperations(update, delete);

三 事件监听

  1. NodeCache
    NodeCache用于监听指定ZooKeeper数据节点本身的变化,数据变化或者节点被删除都会触发回调方法。
@Test
public void nodeCache() throws  Exception {
    String path = "/dir1";
    NodeCache cache = new NodeCache(client,path,false);
    cache.start(true);
    cache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            ChildData currentData = cache.getCurrentData();
            if(currentData==null){
                System.out.println("节点被删除");
            }else{
                String data = new String(currentData.getData());
                System.out.println("=====> Node data update, new Data: "+data);
            }
        }
    });

    client.setData().forPath(path,"data2".getBytes());
    //client.delete().forPath(path);
    TimeUnit.SECONDS.sleep(5);
    cache.close();
}
  1. PathChildrenCache
    PathChildrenCache是用来监听指定节点 的子节点变化情况。
@Test
public void PathChildrenCache() throws Exception {
    String path = "/dir1";

    PathChildrenCache cache = new PathChildrenCache(client,path,true);
    //如果目录存在子节点,则所有子节点都会触发通知,同时会触发INITIALIZED事件,建议使用
    cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    //较POST_INITIALIZED_EVENT,少了INITIALIZED事件
    //cache.start(PathChildrenCache.StartMode.NORMAL);
    //如果目录存在子节点,已存在的子节点不触发通知,仅捕获后续发生的变更事件
    //cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    
    cache.getListenable().addListener(new PathChildrenCacheListener() {
        public void childEvent(CuratorFramework ient, PathChildrenCacheEvent event) throws Exception {
            if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
                System.out.println("子节点初始化成功");
            }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
                System.out.println("添加子节点路径:"+event.getData().getPath());
                System.out.println("子节点数据:"+new String(event.getData().getData()));
            }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
                System.out.println("删除子节点:"+event.getData().getPath());
            }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                System.out.println("修改子节点路径:"+event.getData().getPath());
                System.out.println("修改子节点数据:"+new String(event.getData().getData()));
            }
        }
    });

    //client.create().forPath("/dir1/dir2","data1".getBytes());
    TimeUnit.SECONDS.sleep(5);
    cache.close();
}
  1. TreeCache
    结合NodeCache和PathChildrenCahce的特性,是对整个目录进行监听。
@Test
public void TreeCache() throws Exception {
    String path = "/dir1";
    TreeCache cache = new TreeCache(client,path);
    cache.start();
    //添加错误监听器
    cache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
        @Override
        public void unhandledError(String s, Throwable throwable) {
            System.out.println("======>  错误原因:" + throwable.getMessage());
        }
    });

    cache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
            switch (treeCacheEvent.getType()) {
                case INITIALIZED:
                    System.out.println("=====> INITIALIZED :  初始化");
                    break;
                case NODE_ADDED:
                    System.out.println("=====> NODE_ADDED : "+ treeCacheEvent.getData().getPath() +"  数据:"+ (treeCacheEvent.getData().getData()==null?"":new String(treeCacheEvent.getData().getData())));
                    break;
                case NODE_REMOVED:
                    System.out.println("=====> NODE_REMOVED : "+ treeCacheEvent.getData().getPath() +"  数据:"+ (treeCacheEvent.getData().getData()==null?"":new String(treeCacheEvent.getData().getData())));
                    if("/dir1/dir2".equals(treeCacheEvent.getData().getPath())){
                        throw new RuntimeException("=====> 测试异常监听UnhandledErrorListener");
                    }
                    break;
                case NODE_UPDATED:
                    System.out.println("=====> NODE_UPDATED : "+ treeCacheEvent.getData().getPath() +"  数据:"+ (treeCacheEvent.getData().getData()==null?"":new String(treeCacheEvent.getData().getData())));
                    break;
                default:
                    System.out.println("=====> treeCache Type:" + treeCacheEvent.getType());
                    break;
            }
        }
    });

    client.delete().forPath("/dir1/dir2");
    client.delete().forPath("/dir1/dir3");
    client.delete().forPath("/dir1/dir4");

    TimeUnit.SECONDS.sleep(5);
    cache.close();
}

打印信息:

=====> NODE_ADDED : /dir1/dir2  数据:
=====> NODE_ADDED : /dir1/dir3  数据:
=====> NODE_ADDED : /dir1/dir4  数据:data4
=====> INITIALIZED :  初始化
=====> NODE_REMOVED : /dir1/dir2  数据:
======>  错误原因:=====> 测试异常监听UnhandledErrorListener
=====> NODE_REMOVED : /dir1/dir3  数据:
=====> NODE_REMOVED : /dir1/dir4  数据:data4

四 分布式锁

Curator还进一步地提供了非常丰富的分布式锁特性,具体包括:

  • InterProcessMutex 分布式可重入互斥锁
  • InterProcessReadWriteLock 分布式可重入读写锁
  • InterProcessSemaphoreMutex 分布式不可重入互斥锁
  • InterProcessSemaphoreV2 分布式信号量
  1. InterProcessMutex分布式可重入互斥锁
    InterProcessMutex是一个分布式的可重入的互斥锁,示例代码如下所示
@Test
public void interProcessMutex() throws Exception {
    String lockPath = "/lockPath";

    ExecutorService threadPool = Executors.newFixedThreadPool(10);
    System.out.println("---------------------- 系统上线 ----------------------");
    for(int i=1; i<=3; i++) {
        String taskName = "任务#"+i;
        Task task = new Task(taskName, client, lockPath);
        threadPool.execute(task);
    }

    TimeUnit.SECONDS.sleep(100);
    System.out.println("---------------------- 系统下线 ----------------------");
}

private static class Task implements Runnable {
    private String taskName;

    private InterProcessMutex lock;

    public Task(String taskName, CuratorFramework zkClient, String zkLockPath) {
        this.taskName = taskName;
        this.lock = new InterProcessMutex(zkClient, zkLockPath);
    }

    @Override
    public void run() {
        try{
            lock.acquire();
            info(taskName + ": 成功获取锁 #1");
            // 模拟业务耗时
            TimeUnit.SECONDS.sleep(1);
            methodA();
        } catch (Exception e) {
            System.out.println( taskName + ": Happen Exception: " + e.getMessage());
        } finally {
            info(taskName + ": 释放锁 #1\n");
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void methodA() {
        try{
            lock.acquire();
            info(taskName + ": 成功获取锁 #2");

            // 模拟业务耗时
            TimeUnit.SECONDS.sleep(1);
        } catch (Exception e) {
            System.out.println(taskName + ": Happen Exception: " + e.getMessage());
        } finally {
            info(taskName + ": 释放锁 #2");
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

private static void info(String msg) {
    String time = formatter.format(LocalTime.now());
    String thread = Thread.currentThread().getName();
    String log = "["+time+"] "+ " <"+ thread +"> " + msg;
    System.out.println(log);
}

打印结果如下:

---------------------- 系统上线 ----------------------
[17:26:00.714]  <pool-4-thread-2> 任务#2: 成功获取锁 #1
[17:26:01.721]  <pool-4-thread-2> 任务#2: 成功获取锁 #2
[17:26:02.724]  <pool-4-thread-2> 任务#2: 释放锁 #2
[17:26:02.725]  <pool-4-thread-2> 任务#2: 释放锁 #1

[17:26:02.736]  <pool-4-thread-3> 任务#3: 成功获取锁 #1
[17:26:03.739]  <pool-4-thread-3> 任务#3: 成功获取锁 #2
[17:26:04.743]  <pool-4-thread-3> 任务#3: 释放锁 #2
[17:26:04.743]  <pool-4-thread-3> 任务#3: 释放锁 #1

[17:26:04.749]  <pool-4-thread-1> 任务#1: 成功获取锁 #1
[17:26:05.752]  <pool-4-thread-1> 任务#1: 成功获取锁 #2
[17:26:06.756]  <pool-4-thread-1> 任务#1: 释放锁 #2
[17:26:06.756]  <pool-4-thread-1> 任务#1: 释放锁 #1

---------------------- 系统下线 ----------------------

Process finished with exit code 0
  1. InterProcessReadWriteLock分布式可重入读写锁
    InterProcessReadWriteLock是一个分布式可重入读写锁,其中读锁为共享锁、写锁为互斥锁。示例代码如下所示:
/**
 * 测试: 读锁为共享锁
 */
@Test
public void testRead() {
    String lockPath = "/lockPath";
    ExecutorService threadPool = Executors.newFixedThreadPool(10);
    System.out.println("\n---------------------- Test 1 : Read ----------------------");
    for(int i=1; i<=3; i++) {
        String taskName = "读任务#"+i;
        Runnable task = new ReadTask(taskName, client, lockPath);
        threadPool.execute( task );
    }
    // 主线程等待所有任务执行完毕
    try{ TimeUnit.SECONDS.sleep(100); } catch (Exception e) {}
}

/**
 * 测试: 写锁为互斥锁
 */
@Test
public void testWrite() {
    ExecutorService threadPool = Executors.newFixedThreadPool(10);
    String lockPath = "/lockPath";
    System.out.println("\n---------------------- Test 2 : Write ----------------------");
    for(int i=1; i<=3; i++) {
        String taskName = "写任务#"+i;
        Runnable task = new WriteTask(taskName, client, lockPath);
        threadPool.execute( task );
    }
    // 主线程等待所有任务执行完毕
    try{ TimeUnit.SECONDS.sleep(100);  } catch (Exception e) {}
}

/**
 * 测试: 读写互斥
 */
@Test
public void testReadWrite() {
    String lockPath = "/lockPath";
    ExecutorService threadPool = Executors.newFixedThreadPool(10);

    System.out.println("\n---------------------- Test 3 : Read Write ----------------------");
    for(int i=1; i<=8; i++) {
        Runnable task = null;

        Boolean isReadTask = new Random().nextBoolean();
        if( isReadTask ) {
            task = new ReadTask( "读任务#"+i, client, lockPath );
        } else {
            task = new WriteTask( "写任务#"+i, client, lockPath );
        }
        threadPool.execute( task );
    }
    // 主线程等待所有任务执行完毕
    try{ TimeUnit.SECONDS.sleep(100); } catch (Exception e) {}
}

/**
 * 读任务
 */
private static class ReadTask implements Runnable {
    private String taskName;

    private InterProcessMutex readLock;

    public ReadTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
        this.taskName = taskName;
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
        this.readLock = interProcessReadWriteLock.readLock();
    }

    @Override
    public void run() {
        try{
            readLock.acquire();
            info(taskName + ": 成功获取读锁 #1");
            // 模拟业务耗时
            TimeUnit.SECONDS.sleep(1);
        } catch (Exception e) {
            System.out.println( taskName + ": Happen Exception: " + e.getMessage());
        } finally {
            info(taskName + ": 释放读锁 #1");
            try {
                readLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 写任务
 */
private static class WriteTask implements Runnable {
    private String taskName;

    private InterProcessMutex writeLock;

    public WriteTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
        this.taskName = taskName;
        InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
        this.writeLock = interProcessReadWriteLock.writeLock();
    }

    @Override
    public void run() {
        try{
            writeLock.acquire();
            info(taskName + ": 成功获取写锁 #1");
            // 模拟业务耗时
            TimeUnit.SECONDS.sleep(1);
        } catch (Exception e) {
            System.out.println( taskName + ": Happen Exception: " + e.getMessage());
        } finally {
            info(taskName + ": 释放写锁 #1\n");
            try {
                writeLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

private static void info(String msg) {
    String time = formatter.format(LocalTime.now());
    String thread = Thread.currentThread().getName();
    String log = "["+time+"] "+ " <"+ thread +"> " + msg;
    System.out.println(log);
}

打印结果如下:

---------------------- Test 1 : Read ----------------------
[17:33:45.827]  <pool-4-thread-1> 读任务#1: 成功获取读锁 #1
[17:33:45.827]  <pool-4-thread-2> 读任务#2: 成功获取读锁 #1
[17:33:45.827]  <pool-4-thread-3> 读任务#3: 成功获取读锁 #1
[17:33:46.834]  <pool-4-thread-3> 读任务#3: 释放读锁 #1
[17:33:46.834]  <pool-4-thread-1> 读任务#1: 释放读锁 #1
[17:33:46.834]  <pool-4-thread-2> 读任务#2: 释放读锁 #1

---------------------- Test 2 : Write ----------------------
[17:37:21.157]  <pool-4-thread-2> 写任务#2: 成功获取写锁 #1
[17:37:22.157]  <pool-4-thread-2> 写任务#2: 释放写锁 #1

[17:37:22.166]  <pool-4-thread-3> 写任务#3: 成功获取写锁 #1
[17:37:23.165]  <pool-4-thread-3> 写任务#3: 释放写锁 #1

[17:37:23.171]  <pool-4-thread-1> 写任务#1: 成功获取写锁 #1
[17:37:24.174]  <pool-4-thread-1> 写任务#1: 释放写锁 #1

---------------------- Test 3 : Read Write ----------------------
[17:41:30.158]  <pool-4-thread-3> 读任务#3: 成功获取读锁 #1
[17:41:30.158]  <pool-4-thread-4> 读任务#4: 成功获取读锁 #1
[17:41:30.158]  <pool-4-thread-2> 读任务#2: 成功获取读锁 #1
[17:41:31.162]  <pool-4-thread-3> 读任务#3: 释放读锁 #1
[17:41:31.162]  <pool-4-thread-2> 读任务#2: 释放读锁 #1
[17:41:31.162]  <pool-4-thread-4> 读任务#4: 释放读锁 #1
[17:41:31.176]  <pool-4-thread-5> 写任务#5: 成功获取写锁 #1
[17:41:32.176]  <pool-4-thread-5> 写任务#5: 释放写锁 #1

[17:41:32.181]  <pool-4-thread-6> 读任务#6: 成功获取读锁 #1
[17:41:32.182]  <pool-4-thread-1> 读任务#1: 成功获取读锁 #1
[17:41:33.184]  <pool-4-thread-6> 读任务#6: 释放读锁 #1
[17:41:33.184]  <pool-4-thread-1> 读任务#1: 释放读锁 #1
[17:41:33.187]  <pool-4-thread-8> 写任务#8: 成功获取写锁 #1
[17:41:34.191]  <pool-4-thread-8> 写任务#8: 释放写锁 #1

[17:41:34.195]  <pool-4-thread-7> 写任务#7: 成功获取写锁 #1
[17:41:35.198]  <pool-4-thread-7> 写任务#7: 释放写锁 #1
  1. 可重入性
    由于读锁、写锁分别是基于InterProcessMutex实现的,故这二者自然也是支持可重入的。示例代码可参考上述事例1:InterProcessMutex分布式可重入互斥锁
  2. 锁升级、锁降级
  • 锁升级:指的是读锁升级为写锁。
    当一个线程先获取到读锁再去申请写锁,显然其是不支持的。理由也很简单,读锁是可以多个服务实例同时持有的。若其中一个服务实例此锁线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他服务实例依然持有读锁;
  • 锁降级:指的是写锁降级为读锁,是支持锁降级的,即写锁降级为读锁。当一个服务实例的线程在获得写锁后,该线程依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的使用价值也很大,其一方面保证了安全,读锁在写锁释放前获取;另一方面保证了高效,因为读锁是共享的。
    锁升级示例代码如下所示:
@Test
public void lockUp() throws Exception {
    System.out.println("---------------------- Test 1 : Read -> Write ----------------------\n");
    String lockPath = "/lockPath";

    InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, lockPath);
    InterProcessMutex readLock = interProcessReadWriteLock.readLock();
    InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();
    try {
        readLock.acquire();
        info("成功获取读锁");
        // 模拟业务耗时
        TimeUnit.SECONDS.sleep(1);

        writeLock.acquire();
        info("成功获取写锁");
        // 模拟业务耗时
        TimeUnit.SECONDS.sleep(1);

        readLock.release();
        info("成功释放读锁");
        // 模拟业务耗时
        TimeUnit.SECONDS.sleep(1);

        writeLock.release();
        info("成功释放写锁");
    } catch (Exception e) {
        System.out.println("Happen Exception: " + e.getMessage());
    }
    System.out.println("---------------------- 系统下线 ----------------------");
}

打印结果如下:
会一直住,无法获取写锁,所以不能锁升级。

---------------------- Test 1 : Read -> Write ----------------------

[15:26:00.022]  <main> 成功获取读锁

锁降级示例代码如下所示:

@Test
public void lockDown() throws Exception {
    System.out.println("---------------------- Test 2 : Write -> Read ----------------------\n");
    String lockPath = "/lockPath";
    InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, lockPath);
    InterProcessMutex readLock = interProcessReadWriteLock.readLock();
    InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();

    try {
        writeLock.acquire();
        info("成功获取写锁");
        // 模拟业务耗时
        TimeUnit.SECONDS.sleep(1);

        readLock.acquire();
        info("成功获取读锁");
        // 模拟业务耗时
        TimeUnit.SECONDS.sleep(1);

        writeLock.release();
        info("成功释放写锁");
        // 模拟业务耗时
        TimeUnit.SECONDS.sleep(1);

        readLock.release();
        info("成功释放读锁");
    } catch (Exception e) {
        System.out.println("Happen Exception: " + e.getMessage());
    }

    System.out.println("---------------------- 系统下线 ----------------------");
}

打印结果如下:

---------------------- Test 2 : Write -> Read ----------------------

[15:29:36.336]  <main> 成功获取写锁
[15:29:37.347]  <main> 成功获取读锁
[15:29:38.355]  <main> 成功释放写锁
[15:29:39.358]  <main> 成功释放读锁
---------------------- 系统下线 ----------------------
  1. InterProcessSemaphoreMutex分布式不可重入互斥锁
    InterProcessSemaphoreMutex互斥锁,示例代码如下所示:
@Test
public void interProcessSemaphoreMutexTest1() throws Exception {
    System.out.println("\n---------------------- Test 1 ----------------------");
    String lockPath = "/lockPath";
    Runnable task = () -> {
        InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, lockPath);

        try{
            lock.acquire();
            info("成功获取锁 #1");
            // 模拟业务耗时
            TimeUnit.SECONDS.sleep(1);
        } catch (Exception e) {
            System.out.println("Happen Exception: " + e.getMessage());
        } finally {
            info("释放锁 #1\n");
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };

    for(int i=1; i<=3; i++) {
        TimeUnit.SECONDS.sleep(1);
    }
    TimeUnit.SECONDS.sleep(10);
}

打印结果如下:

---------------------- Test 1 ----------------------
[15:40:41.164]  <pool-4-thread-3> 成功获取锁 #1
[15:40:42.170]  <pool-4-thread-3> 释放锁 #1

[15:40:42.177]  <pool-4-thread-1> 成功获取锁 #1
[15:40:43.180]  <pool-4-thread-1> 释放锁 #1

[15:40:43.186]  <pool-4-thread-2> 成功获取锁 #1
[15:40:44.191]  <pool-4-thread-2> 释放锁 #1

Process finished with exit code 0

InterProcessSemaphoreMutex不可重入锁,示例代码如下所示:

@Test
public void interProcessSemaphoreMutexTest2() throws Exception {
    System.out.println("\n---------------------- Test 2 ----------------------");
    String lockPath = "/lockPath";
    InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, lockPath);

    try{
        lock.acquire();
        info("成功获取锁 #1");
        // 模拟业务耗时
        TimeUnit.SECONDS.sleep(1);

        lock.acquire();
        info("成功获取锁 #2");
        // 模拟业务耗时
        TimeUnit.SECONDS.sleep(1);

        lock.release();
        info("释放锁 #1\n");

        lock.release();
        info("释放锁 #1\n");
    } catch (Exception e) {
        System.out.println("Happen Exception: " + e.getMessage());
    }
}

打印结果如下:会一直卡住,由于不可重入,无法再次获取锁。

---------------------- Test 2 ----------------------
[15:43:07.918]  <main> 成功获取锁 #1
  1. InterProcessSemaphoreV2分布式信号量
    可以设置同时获取最大锁并发数:
@Test
public void testInterProcessSemaphoreV2() throws Exception {
    String lockPath = "/lockPath";
    ExecutorService threadPool = Executors.newFixedThreadPool(10);
    int maxLimit = 2;

    IntStream.rangeClosed(1,6)
            .mapToObj( num -> new UserReq("用户#"+num, client, lockPath, maxLimit) )
            .forEach( threadPool::execute );

    // 主线程等待所有任务执行完毕
    TimeUnit.SECONDS.sleep(10);
    System.out.println("---------------------- 系统下线 ----------------------");
}

private static class UserReq implements Runnable {

    private String name;

    private InterProcessSemaphoreV2 interProcessSemaphoreV2;

    private Integer maxLimit;

    public UserReq(String name, CuratorFramework zkClient, String zkLockPath, Integer maxLimit) {
        this.name = name;
        this.maxLimit = maxLimit;
        this.interProcessSemaphoreV2 = new InterProcessSemaphoreV2(zkClient, zkLockPath, maxLimit);
    }

    @Override
    public void run() {
        try {
            // 模拟用户不定时发起请求
            TimeUnit.SECONDS.sleep(1);
            String msg = name + ": 发起请求";
            info(msg);

            // 阻塞等待,直到获取许可
            Lease lease = interProcessSemaphoreV2.acquire();

            info(name + ": 系统开始处理请求");
            // 模拟业务耗时
            TimeUnit.SECONDS.sleep(1);

            // 用户请求处理完毕,释放许可
            interProcessSemaphoreV2.returnLease( lease );
            info(name + ": 系统处理完毕");
        }catch (Exception e) {
            System.out.println("Happen Exception: " + e.getMessage());
        }
    }
}

结果如下:最多两个并发执行

[16:02:41.587]  <pool-4-thread-3> 用户#3: 发起请求
[16:02:41.587]  <pool-4-thread-4> 用户#4: 发起请求
[16:02:41.587]  <pool-4-thread-2> 用户#2: 发起请求
[16:02:41.587]  <pool-4-thread-6> 用户#6: 发起请求
[16:02:41.587]  <pool-4-thread-1> 用户#1: 发起请求
[16:02:41.587]  <pool-4-thread-5> 用户#5: 发起请求
[16:02:41.670]  <pool-4-thread-1> 用户#1: 系统开始处理请求
[16:02:41.679]  <pool-4-thread-5> 用户#5: 系统开始处理请求
[16:02:42.677]  <pool-4-thread-1> 用户#1: 系统处理完毕
[16:02:42.681]  <pool-4-thread-6> 用户#6: 系统开始处理请求
[16:02:42.686]  <pool-4-thread-5> 用户#5: 系统处理完毕
[16:02:42.688]  <pool-4-thread-2> 用户#2: 系统开始处理请求
[16:02:43.688]  <pool-4-thread-6> 用户#6: 系统处理完毕
[16:02:43.690]  <pool-4-thread-3> 用户#3: 系统开始处理请求
[16:02:43.694]  <pool-4-thread-2> 用户#2: 系统处理完毕
[16:02:43.696]  <pool-4-thread-4> 用户#4: 系统开始处理请求
[16:02:44.692]  <pool-4-thread-3> 用户#3: 系统处理完毕
[16:02:44.700]  <pool-4-thread-4> 用户#4: 系统处理完毕
---------------------- 系统下线 ----------------------
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容