ZooKeeper实现了 一 个 层 次 命 名 空 间 的 数 据 模 型 , 也 可 以 认 为 它 就 是 一 个 小 型 的 、 精 简 的文件系统。它的每个节点称为znode, znode除了本身能够包含一部 分 数 据 之 外 ,还 能 够 拥 有 子节点,当节点上的数据发生变化,或者其子节点发生变化时,基于watcher机制,会发出相应
的通知给订阅其状态变化的客户端。
首先,实例化 一 个 Z ooKeeper对象,指定其三个参数。url为Zoo Keeper服务器的地址。
sessionTimeOut为会话的超时时间,Zoo Keeper的会话超时时间的长度由客户端来确定,但是
ZooKeeper的Server端会有两个配置,minSessionTimeout和 maxSess1onTimeout minSessionTimeout的值默认为2 倍 tickTim e , maxSessionTimeout的值默认为20 倍 tickTime ,
,几位都为ms。tickTime也是服务端的 一 个 配 置 项 ,是 S erv er 内 部 控 制 时 间 逻 辑 的 最 小 时 间 单 位 ,如果客户瑙发来的session Timeout超过minSessionTimeout、maxSessionTimeout这个范围,Server会自动取minSessionTimeout或maxSessionTimeout作为sessionTimeout, 然后为这个Client新建
个session对象。最后 一 个 参 数 为 默 认 的 watcher 。如 果 包 含 boo lean watch的读方法中传入true,则将默认watcher注册为所关ii:小件的watcher, 如果传入false, 则不注册任何watcher, 这里暂且定为空。
ZooKeeper zooKeeper=new ZooKeeper(url,sessionTimeOut,null) ;
1 创建节点
通过 ZooKeeper 的A PI新增一 个znode 节点,节点在被创建时,需要指定节点的路径 (此处为/root) 包含的字节数据 (这里指定的是 "root data" 这个字符串)、访问权限 (如果不想设置权限,则指定为 Ids.OPEN _ACL _UNSAFE), 以及创建的节点类型.
创建节点示例:
//创建/root节点,其包含的数据为吐00七data", 访问权限为开放, 所有人均可以访问,//创建模式为持久化节点。
watcher的实现示例:
需要注意的是,ZooKeeper的watcher是一次性的,也就是说,每次在处理完状态变化事件之后,需要重新注册watcher,这一点很让人抓狂。这个特性也使得在处理事件和重新加上watcher这段时间发生的节点状态变化将无法被感知。
ZooKeeper常常发生下面两种系统异常:
org.apache.ZooKeeper.KeeperException.ConnectionLossException, 客户端与其中的一台服务器socket连接出现异常,连接丢失:
org.apache.ZooKeeper.KeeperException.SessionExpiredException, 客户端的session已经超过sessionTimeout, 未进行任何操作。
ConnectionLossException异常可以通过重试进行处理,客户端会根据初始化ZooKeeper时传递的服务列表,自动尝试下一个服务端节点,而在这段时间内,服务端节点变更的事件就丢失。
SessionExpiredException异常不能通过重试进行解决,需要应用重新创建一 个新的客户端(new Zoo keeper()), 这时所有的watcher和EPHEMERAL节点都将失效。
zkClient
在生产环境中常常会遇到session expire这类异常,需要在异常发生后进行重新连接,重新建立session。直接使用ZooKeeper的API来实现,可能会比较烦琐。又因为ZooKeeper的watcher是一 次性的,如果要基于wather实现发布/订阅模式,需要做额外的一 些编码,以实现每次watcher失效后重新注册,将一 次性订阅包装成持久订阅。并且,ZooKeeper的 API接口中,节点数据默认为二进制byte数组,如果想直接保存对象类型的数据,需要将对象转换为二进制类型,也
就是还需要进行相关的序列化工作。这里介绍一 个ZooKeeper的第三方客户端工具包zkClient,可以比较好地解决上述问题。
zkClient解决了watcher的一 次性注册问题,将znode 的事件重新定义为子节点的变化、数据的变化、连接及状态的变化三类,由zkClient统一 将 watcher的W atchedEvent转换到以上=种情况中去处理,watcher执行后重新读取数据的同时,再注册相同的watcher。zkClient在发生session expire异常时会自动创建新的ZooKeeper实例进行重连。当然,这时原来所有的watcher和EPHEMERAL节点都将失效,可以在zkClient定义的连接状态变化的接口lzkStateListener里面的handleNewSession方法中进行相应的处理。同时,zkClient还提供了ZkSerializer接口,可进行序列化和反序列化的相关操作,简化了znode的数据存储。
ZkClient zkClient = new ZkClient(serverList);
一些简单的示例:
//创建节点
zkClient.createPersistent(PATH);
//创建子节点
zkClient.create(PATH + "/child", "child znode", CreateMode.EPHEMERAL); //荻得子节点
Lis七<String> children = zkClient.getChildren(PATH);
zkClient将ZooKeeper的watcher机制转换为一种更加容易理解的订阅形式,并且这种关系是可以保持的,而非一次性的。也就是说,当watcher使用完后,zkClient会自动再增加一个相同的watcher。节点有三种状态可供订阅,一类是子节点的变化,一类是数据的变化,还有一类是状态的变化。
订阅子节点状态变化的示例代码:
//订阅子节点的变化
zkClient.subscribeChildChanges(PATH, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath,List<String> currentChilds) throws Exception{
}
订阅节点数据变化的示例代码:
//订阅数据的变化
zkClient.subscribeDataChanges(PATH,new IZkDataListener() {
@Override
public void handleDataChange(StringdataPath, Objectdata) throws Exception {
}