ZK应用场景
监听+回调—NodeCache
// 节点监控测试
public class NodeWatcherTest {
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.namespace("watcher")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
String path = "/nodecache";
zkFluentClient.start();
// 创建节点
zkFluentClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
// 创建监控watcher
final NodeCache nodeCache = new NodeCache(zkFluentClient, path);
nodeCache.start();// 开启监控
// 注册对应的listener
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
// 这里只是打印出来
System.out.println("Node Data updated,new Data:" + new String(nodeCache.getCurrentData().getData()));
}
});
// 做个实际的更新处理--set
zkFluentClient.setData().forPath(path,"update my data".getBytes());
TimeUnit.SECONDS.sleep(10);
// 清除对应的数据
zkFluentClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
TimeUnit.MICROSECONDS.sleep(Integer.MAX_VALUE);
}
}
这里关键是NodeCache,理解这个东西。
new NodeCache第三个参数,默认为false。如果为true,那么NodeCache在第一次启动的时候就会立刻从zk上读取对应节点的数据内容,并保存在Cache中。
NodeCache不仅可以用于监听数据节点的内容变更,也可以监听指定节点是否存在。如果原节点不存在,那么Cache就会在节点被创建后触发listener。但是如果数据节点被删除,就无法触发listener。
子节点监听--PathChildrenCache
注意,无法监听二级子节点。
子节点变化有三种事件:CHILD_ADD\CHILD_UPDATED\CHILD_REMOVED
import java.util.concurrent.TimeUnit;
public class PathChildrenCacheTest {
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.namespace("child-watcher")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
zkFluentClient.start();
final String path = "/child3";
// 指定需要监控的父目录
final PathChildrenCache pathChildrenCache = new PathChildrenCache(zkFluentClient, path, true);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
// 注册listener
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework,
PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED" + pathChildrenCacheEvent.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED" + pathChildrenCacheEvent.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED" + pathChildrenCacheEvent.getData().getPath());
break;
default:
break;
}
}
});
// 创建父节点,如果存在就跳过
//zkFluentClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
// TimeUnit.SECONDS.sleep(1);
zkFluentClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
zkFluentClient.setData().forPath(path + "/c1","update data".getBytes()); // update 没看到执行回调,但是数据内容已经更新了
zkFluentClient.delete().guaranteed().forPath(path + "/c1");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
这里会关注子节点的变化,但是update内容貌似并没有触发listener的执行,好神奇。