目录
概述
基于watch实现监听
DEMO
基于cache实现监听
Path Cache介绍
Node Cache介绍
Tree Cache介绍
DEMO
1概述
在笔记一中已经对Curator与原生客户端的监听方式进行了介绍,本文主要介绍Curator对zk监听的实现。在maven中引入recipes包,Curator封装Zookeeper的典型场景使用都放在了recipes包中。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
2 基于watch实现监听
利用Watcher来对节点进行监听操作,但此监听操作只能监听一次,与原生API并无太大差异。如有典型业务场景需要使用可考虑,但一般情况不推荐使用。下面是具体的使用demo。
public class CuratorWatchEvent {
public static CuratorFramework build(){
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("172.30.241.205:2181")
.namespace(ZKConstant.ZK_NAMESPACE)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
return curatorFramework;
}
public static void main(String[] args) throws Exception{
String path = "/watchtest";
CuratorFramework client = CuratorWatchEvent.build();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path);
build().getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听器watchedEvent:" + watchedEvent);
}
}).forPath(path);
client.setData().forPath(path,"new content".getBytes());
// 第二次变更节点数据
client.setData().forPath(path,"second content".getBytes());
client.close();
}
}
执行结果:
监听器watchedEvent:WatchedEvent state:SyncConnected type:NodeDataChanged path:/watchtest
基于cache实现监听
cache是一种缓存机制,Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。
curator支持的cache种类有3种
- Path Cache
- Node Cache
- Tree Cache
Path Cache
Path Cache用来观察ZNode的子节点并缓存状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。
Path Cache是通过PathChildrenCache类来实现的,监听器注册是通过PathChildrenCacheListener。
Node Cache
Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。
Node Cache是通过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。
Tree Cache
可以看做是上两种的合体,Tree Cache观察的是ZNode及子节点。
DEMO
public class CacheListenerDemo {
public static CuratorFramework build(){
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("172.30.241.205:2181")
.namespace(ZKConstant.ZK_NAMESPACE)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
return curatorFramework;
}
public static void main(String[] args) throws Exception{
try {
String testPath="pathChildrenCacheTest";
//创建连接
CuratorFramework client= CacheListenerDemo.build();
//如果testPath存在,删除路径
Stat stat = client.checkExists().forPath("/"+testPath);
if(stat != null)
{
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath);
}
//创建testPath
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath,testPath.getBytes());
//创建PathChildrenCache
//参数:true代表缓存数据到本地
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/" + testPath,true);
//BUILD_INITIAL_CACHE 代表使用同步的方式进行缓存初始化。
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
pathChildrenCache.getListenable().addListener((cf, event) -> {
PathChildrenCacheEvent.Type eventType = event.getType();
switch (eventType) {
case CONNECTION_RECONNECTED:
pathChildrenCache.rebuild();
break;
case CONNECTION_SUSPENDED:
break;
case CONNECTION_LOST:
System.out.println("Connection lost");
break;
case CHILD_ADDED:
System.out.println("Child added");
break;
case CHILD_UPDATED:
System.out.println("Child updated");
break;
case CHILD_REMOVED:
System.out.println("Child removed");
break;
default:
}
});
//创建子节点1
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath+"/1",testPath.getBytes());
Thread.sleep(1000);
//创建子节点1
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath+"/2",testPath.getBytes());
Thread.sleep(1000);
//删除子节点1
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath+"/1");
Thread.sleep(1000);
//删除子节点2
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath+"/2");
Thread.sleep(1000);
pathChildrenCache.close();
} catch (Exception e) {
e.printStackTrace();
// TODO: handle exception
}
}
}