ZooKeeper开源客户端Curator的使用

Curator简介

Curator是Netllix公司开源的一套Zookeeper客户端框架,Curator解决了许多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,目前已经成为了Apache的顶级项目,是全世界范围内使用最广泛的Zookeeper客户端之一。
除了封装一些开发人员不需要特别关注的底层细节之外,Curator还在Zookeeper客户端原生API的基础上进行了包装,提供了一套易用性和可读性更强的Fluent风格的客户端API框架。
除此之外,Curator还提供了Zookeeper各种应用场景(Recipe,如共享服务,Master选举机制和分布式计数器等)。

API简单使用

使用Curator的API,首先需要引入Curator的Maven依赖:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.13.0</version>
</dependency>

使用Curator创建会话

public class CreateSession {

    public static void main(String[] args) throws Exception {
        //创建重试策略
        // 参数1:baseSleepTimeMs 初始sleep时间
        // 参数2:maxRetries 最大重试次数
        // 参数3:maxSleepMs 最大sleep时间(这里没有使用到该参数)
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // 参数1:connectString
        // 参数2:sessionTimeoutMs
        // 参数3:connectionTimeoutMs
        // 参数4:retryPolicy
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181", 5000, 300, retryPolicy);

        client.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

使用Fluent风格的API接口来创建会话

public class FluentCreateSession {

    public static void main(String[] args) throws Exception {

        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(300)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 定义独立命名空间为/base,该客户端对于Zookeeper上的数据节点进行操作都是基于/base节点进行的,从而实现不同业务的隔离
                .build();

        client.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

创建节点

public class CreateNode {

    private static String path = "/zk-book/c1";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        client.create()
                .creatingParentsIfNeeded()  //自动递归创建父节点,父节点都是持久节点
                .withMode(CreateMode.EPHEMERAL)  //节点属性:临时节点,默认为持久节点
                .forPath(path, "init".getBytes());  //创建一个节点,附带初始内容
    }
}

删除节点

public class DeleteNode {

    private static String path = "/zk-book";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "init".getBytes());

        // 读取刚创建的节点数据
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);

        client.delete()
                //.guaranteed() // 强制删除,只要客户端会话优效,在后台反复重试,直到节点删除成功
                .deletingChildrenIfNeeded()
                .withVersion(stat.getVersion()) //删除指定版本的节点
                .forPath(path);
    }
}

读取数据

public class GetNodeData {

    private static String path = "/zk-book/c1";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "init".getBytes());

        // 通过一个Stat变量存储服务端返回的最新的节点状态信息
        Stat stat = new Stat();
        // 读取一个节点的数据内容,并且获取到该节点的stat
        byte[] bytes = client.getData().storingStatIn(stat).forPath(path);
        System.out.println("节点数据:" + new String(bytes));
    }
}

更新数据

public class SetNodeData {

    private static String path = "/zk-book/c1";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "init".getBytes());

        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);

        // 第一次更新成功
        System.out.println("Success set node for: " + path + " new version: " +
                client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());

        // 第二次更新失败
        try {
            client.setData().withVersion(stat.getVersion()).forPath(path);
        } catch (Exception e) {
            System.out.println("Failed set node due to " + e.getMessage());
        }
    }
}

异步接口

public class AsyncCreateNode {

    private static String path = "/zk-book/asynccreatenode";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    private static CountDownLatch semaphore = new CountDownLatch(2);

    private static ExecutorService executorService = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {
        client.start();

        System.out.println("Main thread: " + Thread.currentThread().getName());

        // 使用自定义的线程池处理回调结果
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                // event[code: 0, type: CREATE]
                System.out.println("event[code: " + curatorEvent.getResultCode() + ", type: " + curatorEvent.getType() + "]");
                // Thread of processResult: pool-3-thread-1
                System.out.println("Thread of processResult: " + Thread.currentThread().getName());

                semaphore.countDown();
            }
        }, executorService).forPath(path, "init".getBytes());

        // 使用ZooKeeper默认的EventThread处理
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                // event[code: -110, type: CREATE]
                System.out.println("event[code: " + curatorEvent.getResultCode() + ", type: " + curatorEvent.getType() + "]");
                // Thread of processResult: main-EventThread
                System.out.println("Thread of processResult: " + Thread.currentThread().getName()); // main-EventThread

                semaphore.countDown();
            }
        }).forPath(path, "init".getBytes());

        semaphore.await();
        executorService.shutdown();
    }
}

典型使用场景

要使用Curator典型场景需要引入Maven依赖:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.13.0</version>
</dependency>
事件监听

ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐。Curator引入了Cache来实现对ZooKeeper服务端事件的监听。Cache是Curator中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地视图和远程ZooKeeper视图的对比过程。同时Curator能自动为开发人员处理反复事件监听,从而大大简化了原生API开发的繁琐过程。Cache分为两类监听类型:节点监听和子节点监听。

节点监听NodeCache示例:

/**
 * 用于监听指定ZooKeeper数据节点本身的变化
 */
public class NodeCacheSample {

    private static String path = "/zk-book/nodecache";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args)  throws Exception {
        client.start();
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "init".getBytes());

        // NodeCache: 用于监听指定ZooKeeper数据节点本身的变化
        final NodeCache nodeCache = new NodeCache(client, path, false);

        // true:在第一次启动的时候就会从ZooKeepe上读取对应节点的数据内容
        nodeCache.start(true);

        // 事件处理的回调函数
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("Node data updated, new data: " + new String(nodeCache.getCurrentData().getData()));
            }
        });

        client.setData().forPath(path, "u".getBytes());
        Thread.sleep(1000);
        // 节点被删除,无法触发NodeCacheListener
        client.delete().deletingChildrenIfNeeded().forPath(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

子节点监听PathChildrenCache示例:

/**
* 用于监听指定ZooKeeper数据节点的子节点的变化
* Curator无法监听二级子节点
*/
public class PathChildrenCacheSample {

   private static String path = "/zk-book";

   private static CuratorFramework client = CuratorFrameworkFactory.builder()
           .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
           .sessionTimeoutMs(5000)
           .connectionTimeoutMs(300)
           .retryPolicy(new ExponentialBackoffRetry(1000, 3))
           .build();

   public static void main(String[] args) throws Exception {
       client.start();

       // 对/zk-book节点进行子节点变更事件的监听
       PathChildrenCache cache = new PathChildrenCache(client, path, true);

       cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

       cache.getListenable().addListener(new PathChildrenCacheListener() {
           @Override
           public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
               switch (event.getType()) {
                   case CHILD_ADDED:   // 子节点创建事件
                       System.out.println("CHILD_ADDED: " + event.getData().getPath());
                       break;
                   case CHILD_UPDATED: // 子节点更新事件
                       System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                       break;
                   case CHILD_REMOVED: // 子节点删除节点
                       System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                       break;
                   default:
                       break;
               }
           }
       });

       client.create().withMode(CreateMode.PERSISTENT).forPath(path);
       Thread.sleep(1000);
       client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
       Thread.sleep(1000);
       client.delete().forPath(path + "/c1");
       Thread.sleep(1000);
       client.delete().forPath(path);
       Thread.sleep(Integer.MAX_VALUE);
   }
}
Master选举

大体思路:选择一个根节点,例如/master_select,多台机器同时向该节点创建一个子节点/master_select/lock,利用ZooKeeper特性,最终只有一台机器创建成功,成功的那台机器就成为Master。
Curator也是基于这个思路,只不过它将节点创建、事件监听和自动选举过程进行了封装,开发人员只需要调用简单的API即可实现Master选举。

public class MasterSelect {

    // Master选举的根节点,本次Master选举都是在该节点下进行的
    private static String master_path = "/curator_recipes_master_path";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws InterruptedException {
        client.start();

        LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
            // 获取Master权力的时候回调该函数
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println("成为Master角色");
                Thread.sleep(3000);
                System.out.println("完成Master操作,释放Master权力");
            }
        });

        selector.autoRequeue();
        selector.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}
分布式锁

一个典型的时间戳生成的并发问题:

/**
 * 没有使用分布式锁,生成时间戳,会有重复的生成
 */
public class NoLock {

    public static void main(String[] args) {

        final CountDownLatch down = new CountDownLatch(1);

        // 开启10个线程生成时间戳订单号
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        down.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println("生成的订单号是: " + orderNo);
                }
            }).start();
        }

        down.countDown();
    }
}

使用Curator实现分布式锁功能

/**
 * 使用Curator实现分布式锁功能,生成的时间戳不会重复
 * InterProcessMutex
 * lock.acquire()
 * lock.release()
 */
public class DistributedLock {

    private static String lock_path = "/curator_recipes_lock_path";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) {
        client.start();
        final InterProcessMutex lock = new InterProcessMutex(client, lock_path);    // InterProcessMutex:互斥锁
        final CountDownLatch down = new CountDownLatch(1);

        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        down.await();
                        lock.acquire(); // 获取分布式锁
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println("生成的订单号是: " + orderNo);
                    try {
                        lock.release(); // 释放分布式锁
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        down.countDown();
    }
}
分布式计数器
/**
 * 使用Curator分布式计数器
 * 典型应用场景:统计系统的在线人数
 * 思路:指定一个ZooKeeper数据节点作为计数器,多个应用实例在分布式锁的控制下,通过更新该数据节点的内容来实现计数功能
 */
public class DistributedCounter {

    private static String counter_path = "/curator_recipes_counter_path";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, counter_path, new RetryNTimes(3, 1000));

        AtomicValue<Integer> rc = atomicInteger.add(8);
        System.out.println("Result: " + rc.succeeded());
        System.out.println("最新值: " + rc.postValue());
        System.out.println("原始值: " + rc.preValue());
    }
}
分布式Barrier

方式一:主线程触发释放Barrier: DistributedBarrier

/**
 * 使用Curator实现分布式的CyclicBarrier
 * 主线程触发释放Barrier: DistributedBarrier
 */
public class MainDistributedBarrier {

    private static String barrier_path = "/curator_recipes_barrier_path";

    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        // 初始化分布式barrier
        DistributedBarrier barrier = new DistributedBarrier(client, barrier_path);

        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "号barrier设置");

                        // 设置分布式barrier
                        barrier.setBarrier();
                        // 等待主线程barrier释放
                        barrier.waitOnBarrier();

                        System.err.println("启动...");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();

        }
        Thread.sleep(2000);
        // 主线程释放barrier,触发所有等待该Barrier的线程同时进行各自的业务逻辑
        barrier.removeBarrier();
    }
}

方式二:线程自发触发释放Barrier: DistributedDoubleBarrier

/**
 * 使用Curator实现分布式的CyclicBarrier
 * 线程自发触发释放Barrier: DistributedDoubleBarrier
 * 指定Barrier的成员数阈值,控制同时进入和退出
 */
public class SelfDistributedBarrier {

    private static String barrier_path = "/curator_recipes_barrier2_path";

    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) {
        client.start();

        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 指定进入Barrier的成员数阈值5
                        DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, barrier_path, 5);

                        Thread.sleep(Math.round(Math.random() * 3000));
                        System.out.println(Thread.currentThread().getName() + "号进入barrier");

                        // 处于准备进入状态,一旦达到阈值5,同时触发进入
                        barrier.enter();
                        System.out.println("启动...");
                        Thread.sleep(Math.round(Math.random() * 3000));

                        // 处于准备退出状态,一旦达到阈值5,同时触发退出
                        barrier.leave();
                        System.out.println("退出...");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}
工具

ZKPaths

/**
 * 提供了一些简单的API来构建ZNode路径、递归创建和删除节点
 */
public class ZKPathsUtil {

    private static String path = "/zk-book/zkpathsutil";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        ZooKeeper zooKeeper = client.getZookeeperClient().getZooKeeper();

        System.out.println(ZKPaths.fixForNamespace(path, "/sub"));
        System.out.println(ZKPaths.makePath(path, "sub"));
        System.out.println(ZKPaths.getNodeFromPath(path + "/sub1"));

        PathAndNode pn = ZKPaths.getPathAndNode(path + "/sub1");
        System.out.println(pn.getPath());
        System.out.println(pn.getNode());

        String dir1 = path + "/child1";
        String dir2 = path + "/child2";
        ZKPaths.mkdirs(zooKeeper, dir1);
        ZKPaths.mkdirs(zooKeeper, dir2);
        System.out.println(ZKPaths.getSortedChildren(zooKeeper, path));

        ZKPaths.deleteChildren(zooKeeper, path, true);
    }
}

EnsurePath
EnsurePath采取了静默的节点创建方式,其内部实现就是试图创建指定节点,如果节点已经存在,那么就不进行任何操作,也不对外抛出异常,否则正常创建节点。

/**
 * EnsurePath提供了一种能够确保数据节点存在的机制
 */
public class EnsurePathUtil {
    private static String path = "/zk-book/EnsurePathUtil";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        client.usingNamespace("zk-book");

        EnsurePath ensurePath = new EnsurePath(path);
        ensurePath.ensure(client.getZookeeperClient());
        ensurePath.ensure(client.getZookeeperClient());

        EnsurePath ensurePath2 = client.newNamespaceAwareEnsurePath("/EnsurePathUtil");
        ensurePath2.ensure(client.getZookeeperClient());
    }
}

TestServer
为了方便开发人员进行ZooKeeper的开发与测试Curator提供了一种建议启动ZooKeeper服务和集群的方法—TestingServer、TestingServer。首先需要引入Maven依赖:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.13.0</version>
</dependency>
/**
 * 在启动一个标准的ZK服务器,用于单元测试
 */
public class TestServer {

    private static String path = "/zookeeper";

    public static void main(String[] args) throws Exception {
        // 初始化ZK测试服务器
        TestingServer testingServer = new TestingServer(2181, new File("F:\\OneDrive\\study\\zookeeper\\testingserver\\data"));

        // 连接ZK测试服务器
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(testingServer.getConnectString())
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();

        client.start();

        System.out.println(client.getChildren().forPath(path));

        testingServer.close();
    }
}

TestCluster

/**
* 在本地模拟一个ZK集群,用于单元测试
*/
public class TestCluster {

   public static void main(String[] args) throws Exception {
       // 搭建一个三台ZK服务器的集群
       TestingCluster cluster = new TestingCluster(3);
       // 运行集群
       cluster.start();

       Thread.sleep(2000);

       TestingZooKeeperServer leader = null;

       for (TestingZooKeeperServer zs : cluster.getServers()) {
           System.out.print(zs.getInstanceSpec().getServerId() + "-");
           System.out.print(zs.getQuorumPeer().getServerState() + "-");
           System.out.println(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());

           if (zs.getQuorumPeer().getServerState().equals("leading")) {
               leader = zs;
           }
       }
       // Kill掉leader节点,重新进行master选举
       leader.kill();

       Thread.sleep(30000);

       System.out.println("--After leader kill:");
       for (TestingZooKeeperServer zs : cluster.getServers()) {
           System.out.print(zs.getInstanceSpec().getServerId() + "-");
           System.out.print(zs.getQuorumPeer().getServerState() + "-");
           System.out.println(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());
       }
       // 关闭集群
       cluster.stop();
   }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352