1.ZkClient
ZkClient是Github上⼀个开源的zookeeper客户端,在Zookeeper原⽣API接⼝之上进⾏了包装,是⼀个更易⽤的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能
添加依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency
1.1创建会话
⽤ZkClient可以轻松的创建会话,连接到服务端,zkClient通过对zookeeperAPI内部包装,将这个异步的会话创建过程同步化了.
package com.hust.grid.leesf.zkclient.examples;
import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
public class CreateSession {
/*
创建⼀个zkClient实例来进⾏连接
注意:zkClient通过对zookeeperAPI内部包装,将这个异步的会话创建过程同步化了
*/
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("ZooKeeper session established.");
}
}
1.2创建节点
*ZkClient提供了递归创建节点的接⼝,即其帮助开发者先完成⽗节点的创建,再创建⼦节点
*不同种类的节点有不同create方法
package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Create_Node_Sample {
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("ZooKeeper session established.");
//第二个参数,createParents的值设置为true,可以递归创建节点
zkClient.createPersistent("/lg-zkClient/lg-c1",true);
System.out.println("success create znode.");
}
}
1.3删除节点
deleteRecursive(path)
ZkClient提供了递归删除节点的接⼝,即其帮助开发者先删除所有⼦节点(存在),再删除⽗节点
package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Del_Data_Sample {
public static void main(String[] args) throws Exception {
String path = "/lg-zkClient/lg-c1";
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
zkClient.deleteRecursive(path);
System.out.println("success delete znode.");
}
}
1.4 获取子节点
package com.hust.grid.leesf.zkclient.examples;
import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
public class Get_Children_Sample {
public static void main(String[] args) throws Exception {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
List<String> children = zkClient.getChildren("/lg-zkClient");
System.out.println(children);
//注册监听事件
zkClient.subscribeChildChanges(path, new IZkChildListener() {
public void handleChildChange(String parentPath, List<String>
currentChilds) throws Exception {
System.out.println(parentPath + " 's child changed,
currentChilds:" + currentChilds);
}
});
zkClient.createPersistent("/lg-zkClient");
Thread.sleep(1000);
zkClient.createPersistent("/lg-zkClient/c1");
Thread.sleep(1000);
zkClient.delete("/lg-zkClient/c1");
Thread.sleep(1000);
zkClient.delete(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
运行结果如下
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null
结果表明:
客户端可以对⼀个不存在的节点进⾏⼦节点变更的监听。⼀旦客户端对⼀个节点注册了⼦节点列表变更监听之后,那么当该节点的⼦节点列表发⽣变更时,服务端都会通知客户端,并将最新的⼦节点列表发送给客户端;该节点本身的创建或删除也会通知到客户端。
1.5 判断节点是否存在、获取节点内容 更新 删除
void handleDataChange(String path, Object data)
s:path
o:变化之后的节点内容
当节点内容发生变化时候执行的回调方法
void handleDataDeleted(String s):节点被删除时的回调方法
zkClient.subscribeDataChanges(path, new IZkDataListener() {
/*
s:path
o:变化之后的节点内容
当节点内容发生变化时候执行的回调方法
*/
public void handleDataChange(String path, Object data) throws Exception {
System.out.println(path+"该节点内容被更新,更新后的内容"+data);
}
//节点被删除时的回调方法
public void handleDataDeleted(String s) throws Exception {
System.out.println(s+" 该节点被删除");
}
});
boolean exists = zkClient.exists(path);
//获取节点内容
Object o = zkClient.readData(path);
//更新
zkClient.writeData(path,"4567");
//删除
zkClient.delete(path);
2. Curator客户端
curator是Netflix公司开源的⼀套Zookeeper客户端框架,和ZKClient⼀样,Curator解决了很多Zookeeper客户端⾮常底层的细节开发⼯作,包括连接重连,反复注WatcherNodeExistsException异常等,是最流⾏的Zookeeper客户端之⼀。从编码⻛格上来讲,它提供了基于Fluent的编程⻛格持
添加依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
fluen风格简介
2.1 创建会话
Curator的创建会话⽅式与原⽣的API和ZkClient的创建⽅式区别很⼤。Curator创建客户端是通过
CuratorFrameworkFactory⼯⼚类来实现的。具体如下:
1.使⽤CuratorFramework这个⼯⼚类的两个静态⽅法来创建⼀个客户端
public static CuratorFramework newClient(String connectString, RetryPolicy
retryPolicy)
public static CuratorFramework newClient(String connectString, int
sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
其中参数RetryPolicy提供重试策略的接⼝,可以让⽤户实现⾃定义的重试策略,默认提供了以下实现,分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、
RetryForever(永远重试策略);两个静态方法底层一样,只是封装程度不同
2.通过调⽤CuratorFramework中的start()⽅法来启动会话
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);
client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
5000,1000,retryPolicy);
client.start();
其实进⼀步查看源代码可以得知,其实这两种⽅法内部实现⼀样,只是对外包装成不同的⽅法。它们的底层都是通过第三个⽅法builder来实现的,如果使用fluent风格,不使用提供的静态方法,则如下
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
.connectString("server1:2181,server2:2181,server3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();
参数:
namespave("base"):隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相
对/base⽬录进⾏的,这有利于实现不同的Zookeeper的业务之间的隔离
2.2 创建节点
Fluent⻛格的接⼝,创建如下
(1)创建⼀个初始内容为空的节点
Curator默认创建的是持久节点,内容为空。
client.create().forPath(path);
(2)创建⼀个包含内容的节点
Curator和ZkClient不同的是依旧采⽤Zookeeper原⽣API的⻛格,内容使⽤byte[]作为⽅法参数。
client.create().forPath(path,"我是内容".getBytes());
(3)递归创建⽗节点,并选择节点类型
在使⽤Curator 之后,通过调⽤creatingParentsIfNeeded 接⼝,Curator 就能够⾃动地递归创建所有需要的⽗节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPa
th(path);
2.3删除节点
基于Fluent⽅式来进⾏操作,不同类型的操作调⽤ 新增不同的⽅法调⽤即可。
(1)删除⼀个⼦节点
client.delete().forPath(path);
(2)删除节点并递归删除其⼦节点
client.delete().deletingChildrenIfNeeded().forPath(path);
(3)指定版本进⾏删除
如果此版本已经不存在,则删除异常,异常信息如下,org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for
client.delete().withVersion(1).forPath(path);
(4)强制保证删除⼀个节点
只要客户端会话有效,那么Curator会在后台持续进⾏删除操作,直到节点删除成功。⽐如遇到⼀些⽹络异常的情况,此guaranteed的强制删除就会很有效果。
client.delete().guaranteed().forPath(path);
2.4 获取数据
获取节点数据内容API相当简单,同时Curator提供了传⼊⼀个Stat变量的⽅式来存储服务器端返回的最新的节点状态信息
// 普通查询
client.getData().forPath(path);
// 包含状态查询
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
2.5 更新数据
更新数据,如果未传⼊version参数,那么更新当前最新版本,如果传⼊version则更新指定version,如果version已经变更,则抛出异常
// 普通更新
client.setData().forPath(path,"新内容".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path,"新内容".getBytes());
版本不⼀致异常信息:
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for