目前主流有三种zookeeper客户端去操作zookeeper,即zookeeper shell、zookeeper原生zookeeper API、apache curator API。 curator作为一个apache顶级项目,是目前为止最好用的操作zookeeper的(第三方)客户端,其在原生zookeeper API基础上经过封装改造而来,具有易用友好的fluent风格。
相比于原生zookeeper API,curator的易用友好特点表现在:
- curator客户端可以递归创建删除节点(原生zookeeper API创建节点时需要代码显式地逐级创建父节点,删除节点时需要代码显式地逐级删除子节点);
- curator客户端具有自动重连session机制;
- curator的watcher可以触发多次(原生zookeeper API只能触发一次,触发后若要继续监听需要再次设定watcher);
- 其他。
关于zookeeper原生zookeeper API操作zookeeper节点可以参考博客:
https://www.jianshu.com/p/99bd28aad4e6
curator框架包含多个依赖jar,开发人员可以根据自身的设计需求来选择,各依赖jar之间的关系如下:
从上图可知,curator-recipe(菜单)依赖包含了大多数的应用场景,本文就以curator-recipe依赖包作为例子。
pom.xml中添加依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
完整Java 程序:
/**
* curator操作zookeeper的工具类
**/
package com.example.curatorzkaccess;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CuratorTest {
Logger logger = LoggerFactory.getLogger(CuratorTest.class);
// 初始化curator client
public CuratorFramework initCuratorClient(String connectString, String namespace){
// namespace是指根节点,重试策略:重试3次每次间隔10s
// 创建的节点路径:/namespace/given-path
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(connectString)
.namespace(namespace)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(6000)
.retryPolicy(new RetryNTimes(3, 10))
.build();
return curatorFramework;
}
// 创建节点
public void createNode(CuratorFramework curatorClient, String path, String jsonStr) throws Exception {
// 必须先保证节点不存在
if (curatorClient.checkExists().forPath(path) == null){
// 递归创建节点
curatorClient.create().creatingParentsIfNeeded().forPath(path, jsonStr.getBytes());
}
else {
logger.warn(String.format("node already existed:[%s]", path));
}
}
// 读取节点数据
public byte[] getNodeData(CuratorFramework curatorClient, String path) throws Exception {
byte[] data = null;
if (curatorClient.checkExists().forPath(path) != null){
data = curatorClient.getData().forPath(path);
}
else {
logger.warn(String.format("node does not exists:[%s]", path));
}
return data;
}
// 修改节点数据
public void setNodeData(CuratorFramework curatorClient, String path, String jsonStr) throws Exception {
// 必须先保证节点存在
if(curatorClient.checkExists().forPath(path) != null){
curatorClient.setData().forPath(path, jsonStr.getBytes());
}
else{
logger.warn(String.format("node does not exists:[%s]", path));
}
}
// 删除节点
public void deleteNode(CuratorFramework curatorClient, String path) throws Exception {
// 必须先保证节点存在
if(curatorClient.checkExists().forPath(path) != null) {
// 递归删除节点
curatorClient.delete().deletingChildrenIfNeeded().forPath(path);
}
else {
logger.warn(String.format("node does not exists:[%s]", path));
}
}
}
/**
* 主类
**/
package com.example.curatorzkaccess;
import org.apache.curator.framework.CuratorFramework;
public class CuratorMain {
public static void main(String[] args) throws Exception {
String connectString = "localhost:2182";
String namespace = "datalake";
String path = "/metadb/tenant1/meta/instance1/schema1/table1";
String path2 = "/metadb/tenant1/meta/instance1/schema1/table2";
String jsonString = "{name:test}";
CuratorTest curatorTest = new CuratorTest();
CuratorFramework curatorClient = curatorTest.initCuratorClient(connectString, namespace);
// 启动curator client访问zookeeper server的session
curatorClient.start();
curatorTest.createNode(curatorClient, path, jsonString);
curatorTest.createNode(curatorClient, path2, jsonString);
curatorTest.deleteNode(curatorClient, path);
curatorTest.setNodeData(curatorClient, path2, jsonString);
System.out.println(new String(curatorTest.getNodeData(curatorClient, path2), "UTF-8"));
// 关闭session
curatorClient.close();
}
}
执行结果:
在zookeeper shell中查询节点:
[zk: localhost:2181(CONNECTED) 1] get /datalake/metadb/tenant1/meta/instance1/schema1/table2
{name:test}