添加依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
创建会话
方式一
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
5000,
3000,
retryPolicy);
client.start();
方式二
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();
创建节点
// 创建一个节点,初始内容为空
client.create().forPath("path");
// 创建一个节点,附带初始内容
client.create().forPath("path", "init".getBytes(StandardCharsets.UTF_8));
// 创建一个节点,节点类型是临时节点
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
// 创建一个节点,节点类型是临时节点,并自动递归创建父节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("path");
删除节点
// 删除一个节点,注意该接口只能删除子节点
client.delete().forPath("path");
// 删除一个节点,并递归删除其所有子节点
client.delete().deletingChildrenIfNeeded().forPath("path");
// 删除一个节点,强制指定版本进行删除
client.delete().withVersion(Integer.MAX_VALUE).forPath("path");
// 删除一个节点,只要客户端会话有效,curator会在后台持续的进行删除操作,直到节点删除成功
client.delete().guaranteed().forPath("path");
读取节点内容
Stat stat = new Stat();
// 读取节点内容
client.getData().forPath("path");
// 读取节点内容,返回服务端最新的节点状态信息
client.getData().storingStatIn(stat).forPath("path");
// 删除一个节点,强制指定版本进行删除
client.delete().withVersion(Integer.MAX_VALUE).forPath("path");
// 删除一个节点,只要客户端会话有效,curator会在后台持续的进行删除操作,直到节点删除成功
client.delete().guaranteed().forPath("path");
更新节点内容
// 更新节点内容
client.setData().forPath("path");
// 设置节点内容,强制指定版本进行更新
client.setData().withVersion(Integer.MAX_VALUE).forPath("path");
异步接口
static String path = "/zk-book";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("base")
.build();
static CountDownLatch latch = new CountDownLatch(2);
static ExecutorService es = Executors.newFixedThreadPool(3);
@Test
public void asyncTest() throws Exception {
client.start();
System.out.println("Main thread: " + Thread.currentThread().getName());
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("event: code=" + curatorEvent.getResultCode() + ",type=" + curatorEvent.getType());
System.out.println("thread of processResult=" + Thread.currentThread().getName());
latch.countDown();
}
})
.forPath(path, "init".getBytes(StandardCharsets.UTF_8));
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("event: code=" + curatorEvent.getResultCode() + ",type=" + curatorEvent.getType());
System.out.println("thread of processResult=" + Thread.currentThread().getName());
latch.countDown();
}
}, es)
.forPath(path, "init".getBytes(StandardCharsets.UTF_8));
latch.await();
es.shutdown();
}