1、ZkClient简介
ZkClient是由Datameer的工程师开发的开源客户端,对Zookeeper的原生API进行了包装,实现了超时重连、Watcher反复注册等功能;目前已经应用到了很多项目中,比如Dubbo、Kafka、Helix;
Github:https://github.com/sgroschupf/zkclient
Maven依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version></version>
</dependency>
或者
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version></version>
</dependency>
2、ZkClient组件
IZKConnection:是一个ZkClient与Zookeeper之间的一个适配器;在代码里直接使用的是ZKClient,实质上还是委托了zookeeper来处理了。
在ZKClient中,根据事件类型,分为
- 节点事件(数据事件),对应的事件处理器是IZKDataListener;
- 子节点事件,对应的事件处理器是IZKChildListener;
- Session事件,对应的事件处理器是IZKStatusListener;
ZkEventThread:是专门用来处理事件的线程
3、API介绍
- 启动ZKClient:在创建ZKClient对象时,就完成了到ZooKeeper服务器连接的建立
1、启动时,制定好connection string,连接超时时间,序列化工具等
2、创建并启动eventThread,用于接收事件,并调度事件监听器Listener的执行
3、连接到Zookeeper服务器,同时将ZKClient自身作为默认的Watcher
- 为节点注册Watcher
Zookeeper 原始API的三个方法:getData,getChildren、exists,ZKClient都提供了相应的代理方法,比如exists,
hasListeners是看有没有与该数据节点绑定的listener
所以,默认情况下,都会自动的为指定的path注册watcher,并且是默认的watcher(ZKClient),那么怎样才能让hasListeners值为true呢,也就是怎么才能为path绑定Listener呢?
ZKClient提供了订阅功能,一个新建的会话,只需要在取得响应的数据节点后,调用subscribeXXX就可以订阅上相应的事件了。
- Zookeeper的CURD(节点的增删查改)
Zookeeper中提供的变更操作有:节点的创建、删除,节点数据的修改
1、创建操作,节点分为4种,所以ZKClient分别为他们提供了相应的代理
2、删除节点操作
3、修改节点数据
updateDataSerialized:修改已系列化的数据;执行过程是,先读取数据,然后DataUpdater对数据修改,最后调用writeData将修改后的数据发送给服务端;
writeDataReturnStat:写数据并返回数据的状态;
4、客户端处理变更流程
ZKClient是默认的Watcher(ZKClient实现了Watcher接口),并且在为各个数据节点注册的Watcher都是这个默认的Watcher,那么该如何将各种事件通知给相应的Listener呢:
1、判断变更类型,变更类型分为state变更、ChildNode变更、NodeData变更;
2、取出与path关联的Listeners,并为每一个Listener创建一个ZKEvent,将ZkEvent,将ZkEvent交给ZkEventThread处理;
3、ZkEventThread线程,拿到ZkEvent后,只需要调用ZkEvent的run方法进行处理就可以了,所以,具体的如何调用Listener,还要依赖于ZkEvent的run()实现
5、序列化处理
Zookeeper中,会涉及到序列化、反序列化的操作有两种:getData/setData;在ZkClient中,分别用readData/WriteData来替代了。
- ReadData:先调用zookeeper的getData,然后使用ZKSerializer进行反序列化工作
- WriteData:先使用ZKSerializer将对象序列化后,再调用zookeeper的setData
6、注册监听
在ZkClient中客户端可以通过注册相关的事件监听来实现对Zookeeper服务端事件的订阅。
7、demo
package com.xxx.api.zkclient;
import com.xxx.ZookeeperUtil;
import com.xxx.api.natives.crud.User;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class ZkClientCrud<T> {
ZkClient zkClient ;
final static Logger logger = LoggerFactory.getLogger(ZkClientCrud.class);
public ZkClientCrud(ZkSerializer zkSerializer) {
logger.info("链接zk开始");
// zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout);
zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout,ZookeeperUtil.sessionTimeout,zkSerializer);
}
public void createEphemeral(String path,Object data){
zkClient.createEphemeral(path,data);
}
/***
* 支持创建递归方式
* @param path
* @param createParents
*/
public void createPersistent(String path,boolean createParents){
zkClient.createPersistent(path,createParents);
}
/***
* 创建节点 跟上data数据
* @param path
* @param data
*/
public void createPersistent(String path,Object data){
zkClient.createPersistent(path,data);
}
/***
* 子节点
* @param path
* @return
*/
public List<String> getChildren(String path){
return zkClient.getChildren(path);
}
public T readData(String path){
return zkClient.readData(path);
}
public void writeData(String path,Object data){
zkClient.writeData(path,data);
}
//递归删除
public void deleteRecursive(String path){
zkClient.deleteRecursive(path);
}
}
package com.xxx.api.zkclient;
import com.xxx.api.natives.crud.User;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class ZkClientCrudTest {
final static Logger logger = LoggerFactory.getLogger(ZkClientCrudTest.class);
public static void main(String[] args) {
ZkClientCrud<User> zkClientCrud=new ZkClientCrud<User>(new SerializableSerializer());
String path="/root";
zkClientCrud.deleteRecursive(path);
zkClientCrud.createPersistent(path,"hi");
/* zkClientCrud.createPersistent(path+"/a/b/c",true);//递归创建 但是不能设在value
//zkClientCrud.createPersistent(path,"hi");
logger.info(zkClientCrud.readData(path));
//更新
zkClientCrud.writeData(path,"hello");
logger.info(zkClientCrud.readData(path));
logger.info(String.valueOf(zkClientCrud.getChildren(path)));
//子节点
List<String> list=zkClientCrud.getChildren(path);
for(String child:list){
logger.info("子节点:"+child);
}*/
User user=new User();
user.setUserid(1);
user.setUserName("张三");
zkClientCrud.writeData(path,user);
System.out.println(zkClientCrud.readData(path).getUserName());;
}
}
package com.xxx.api.zkclient;
import com.xxx.ZookeeperUtil;
import org.I0Itec.zkclient.*;
import org.apache.zookeeper.Watcher;
import java.util.List;
public class ZkClientWatcher {
ZkClient zkClient;
public ZkClientWatcher() {
zkClient= new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout);
}
public void createPersistent(String path,Object data){
zkClient.createPersistent(path,data);
}
public void writeData(String path,Object object){
zkClient.writeData(path,object);
}
public void delete(String path){
zkClient.delete(path);
}
public boolean exists(String path){
return zkClient.exists(path);
}
public void deleteRecursive(String path){
zkClient.deleteRecursive(path);
}
//对父节点添加监听数据变化。
public void subscribe(String path){
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.printf("变更的节点为:%s,数据:%s\r\n", dataPath,data );
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.printf("删除的节点为:%s\r\n", dataPath );
}
});
}
//对父节点添加监听子节点变化。
public void subscribe2(String path){
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("父节点: " + parentPath+",子节点:"+currentChilds+"\r\n");
}
});
}
//客户端状态
public void subscribe3(String path) {
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
if(state== Watcher.Event.KeeperState.SyncConnected){
//当我重新启动后start,监听触发
System.out.println("连接成功");
}else if(state== Watcher.Event.KeeperState.Disconnected){
System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
}else
System.out.println("其他状态"+state);
}
@Override
public void handleNewSession() throws Exception {
System.out.println("重建session");
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
}
});
}
/* @Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
}*/
}
package com.xxx.api.zkclient;
import java.util.concurrent.TimeUnit;
public class ZkClientWatcherTest {
public static void main(String[] args) throws InterruptedException {
ZkClientWatcher zkClientWatche=new ZkClientWatcher();
String path="/root";
zkClientWatche.deleteRecursive(path);
zkClientWatche.createPersistent(path,"hello");
zkClientWatche.subscribe(path);
zkClientWatche.subscribe2(path);
// zkClientWatche.subscribe3(path);//需要启服务
// Thread.sleep(Integer.MAX_VALUE);
zkClientWatche.createPersistent(path+"/root2","word");
TimeUnit.SECONDS.sleep(1);
zkClientWatche.writeData(path,"hi");
TimeUnit.SECONDS.sleep(1);
//zkClientWatche.delete(path);//如果目录下有内容 不能删除 会报 Directory not empty for /root的异常
zkClientWatche.deleteRecursive(path);
TimeUnit.SECONDS.sleep(1); //这个main线程就结束
}
}
package com.xxx;
public class ZookeeperUtil {
/** zookeeper服务器地址 */
public static final String connectString = "192.168.0.101:2181,192.168.0.102:2181,192.168.0.104:2181";
/** 定义session失效时间 */
public static final int sessionTimeout = 5000;
public static final String path = "/root";
}