转:http://throwable.coding.me/2018/12/16/zookeeper-curator-usage
前提
因为最近项目需要使用Zookeeper这个中间件,提前了解一下它的客户端Curator的使用。
简介
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。
引子和趣闻:
Zookeeper名字的由来是比较有趣的,下面的片段摘抄自《从PAXOS到ZOOKEEPER分布式一致性原理与实践》一书:
Zookeeper最早起源于雅虎的研究院的一个研究小组。在当时,研究人员发现,在雅虎内部很多大型的系统需要依赖一个类似的系统进行分布式协调,但是这些系统往往存在分布式单点问题。所以雅虎的开发人员就试图开发一个通用的无单点问题的分布式协调框架。在立项初期,考虑到很多项目都是用动物的名字来命名的(例如著名的Pig项目),雅虎的工程师希望给这个项目也取一个动物的名字。时任研究院的首席科学家Raghu Ramakrishnan开玩笑说:再这样下去,我们这儿就变成动物园了。此话一出,大家纷纷表示就叫动物园管理员吧——因为各个以动物命名的分布式组件放在一起,雅虎的整个分布式系统看上去就像一个大型的动物园了,而Zookeeper正好用来进行分布式环境的协调——于是,Zookeeper的名字由此诞生了。
Curator无疑是Zookeeper客户端中的瑞士军刀,它译作”馆长”或者’’管理者’’,不知道是不是开发小组有意而为之,笔者猜测有可能这样命名的原因是说明Curator就是Zookeeper的馆长(脑洞有点大:Curator就是动物园的园长)。
Curator包含了几个包:
- curator-framework:对zookeeper的底层api的一些封装。
- curator-client:提供一些客户端的操作,例如重试策略等。
- curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败):
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
Curator的基本Api
创建会话
1.使用静态工程方法创建客户端
一个例子如下:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
connectionInfo,
5000,
3000,
retryPolicy);
newClient静态工厂方法包含四个主要参数:
参数名 | 说明 |
---|---|
connectionString | 服务器列表,格式host1:port1,host2:port2,… |
retryPolicy | 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口 |
sessionTimeoutMs | 会话超时时间,单位毫秒,默认60000ms |
connectionTimeoutMs | 连接创建超时时间,单位毫秒,默认60000ms |
2.使用Fluent风格的Api创建会话
核心参数变为流式设置,一个列子如下:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
3.创建包含隔离命名空间的会话
为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。例如(下面的例子)当客户端指定了独立命名空间为“/base”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的。通过设置Chroot可以将客户端应用与Zookeeper服务端的一课子树相对应,在多个应用共用一个Zookeeper集群的场景下,这对于实现不同应用之间的相互隔离十分有意义。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
启动客户端
当创建会话成功,得到client的实例然后可以直接调用其start( )方法:
client.start();
数据节点操作
创建数据节点
Zookeeper的节点创建模式:
- PERSISTENT:持久化
- PERSISTENT_SEQUENTIAL:持久化并且带序列号
- EPHEMERAL:临时
- EPHEMERAL_SEQUENTIAL:临时并且带序列号
创建一个节点,初始内容为空
client.create().forPath("path");
注意:如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空
创建一个节点,附带初始化内容
client.create().forPath("path","init".getBytes());
创建一个节点,指定创建模式(临时节点),内容为空
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
创建一个节点,指定创建模式(临时节点),附带初始化内容
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes())
创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("path","init".getBytes());
这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点。
删除数据节点
删除一个节点
client.delete().forPath("path");
注意,此方法只能删除叶子节点,否则会抛出异常。
删除一个节点,并且递归删除其所有的子节点
client.delete().deletingChildrenIfNeeded().forPath("path");
删除一个节点,强制指定版本进行删除
client.delete().withVersion(10086).forPath("path");
删除一个节点,强制保证删除
client.delete().guaranteed().forPath("path");
guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。
注意:上面的多个流式接口是可以自由组合的,例如:
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");
读取数据节点数据
读取一个节点的数据内容
client.getData().forPath("path");
注意,此方法返的返回值是byte[ ];
读取一个节点的数据内容,同时获取到该节点的stat
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");
更新数据节点数据
更新一个节点的数据内容
client.setData().forPath("path","data".getBytes());
注意:该接口会返回一个Stat实例
更新一个节点的数据内容,强制指定版本进行更新
client.setData().withVersion(10086).forPath("path","data".getBytes());
检查节点是否存在
client.checkExists().forPath("path");
注意:该方法返回一个Stat实例,用于检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath()
指定要操作的ZNode
获取某个节点的所有子节点路径
client.getChildren().forPath("path");
注意:该方法的返回值为List<string style="box-sizing: border-box;">,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode</string>
事务
CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。一个例子如下:
client.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();
异步接口
上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
CuratorEventType
事件类型 | 对应CuratorFramework实例的方法 |
---|---|
CREATE | #create() |
DELETE | #delete() |
EXISTS | #checkExists() |
GET_DATA | #getData() |
SET_DATA | #setData() |
CHILDREN | #getChildren() |
SYNC | #sync(String,Object) |
GET_ACL | #getACL() |
SET_ACL | #setACL() |
WATCHED | #Watcher(Watcher) |
CLOSING | #close() |
响应码(#getResultCode())
响应码 | 意义 |
---|---|
0 | OK,即调用成功 |
-4 | ConnectionLoss,即客户端与服务端断开连接 |
-110 | NodeExists,即节点已经存在 |
-112 | SessionExpired,即会话过期 |
一个异步创建节点的例子如下:
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor)
.forPath("path");
注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。
Curator食谱(高级特性)
提醒:首先你必须添加curator-recipes依赖,下文仅仅对recipes一些特性的使用进行解释和举例,不打算进行源码级别的探讨
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
重要提醒:强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,curator-recipes下的所有Api将会失效或者过期,尽管后面所有的例子都没有使用到ConnectionStateListener。
Path Cache
Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。
实际使用时会涉及到四个类:
- PathChildrenCache
- PathChildrenCacheEvent
- PathChildrenCacheListener
- ChildData
通过下面的构造函数创建Path Cache:
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
想使用cache,必须调用它的start
方法,使用完后调用close
方法。 可以设置StartMode来实现启动的模式,
StartMode有下面几种:
- NORMAL:正常初始化。
- BUILD_INITIAL_CACHE:在调用
start()
之前会调用rebuild()
。 - POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件
public void addListener(PathChildrenCacheListener listener)
可以增加listener监听缓存的变化。
getCurrentData()
方法返回一个List<ChildData>
对象,可以遍历所有的子节点。
设置/更新、移除其实是使用client (CuratorFramework)来操作, 不通过PathChildrenCache操作:
public class PathCacheDemo {
private static final String PATH = "/example/pathCache";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
cache.start();
PathChildrenCacheListener cacheListener = (client1, event) -> {
System.out.println("事件类型:" + event.getType());
if (null != event.getData()) {
System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
}
};
cache.getListenable().addListener(cacheListener);
client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
Thread.sleep(10);
client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
Thread.sleep(10);
client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
Thread.sleep(10);
for (ChildData data : cache.getCurrentData()) {
System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
}
client.delete().forPath("/example/pathCache/test01");
Thread.sleep(10);
client.delete().forPath("/example/pathCache/test02");
Thread.sleep(1000 * 5);
cache.close();
client.close();
System.out.println("OK!");
}
}
注意:如果new PathChildrenCache(client, PATH, true)中的参数cacheData值设置为false,则示例中的event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。
注意:示例中的Thread.sleep(10)可以注释掉,但是注释后事件监听的触发次数会不全,这可能与PathCache的实现原理有关,不能太过频繁的触发事件!
Node Cache
Node Cache与Path Cache类似,Node Cache只是监听某一个特定的节点。它涉及到下面的三个类:
-
NodeCache
- Node Cache实现类 -
NodeCacheListener
- 节点监听器 -
ChildData
- 节点数据
注意:使用cache,依然要调用它的start()
方法,使用完后调用close()
方法。
getCurrentData()将得到节点当前的状态,通过它的状态可以得到当前的值。
public class NodeCacheDemo {
private static final String PATH = "/example/cache";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
client.create().creatingParentsIfNeeded().forPath(PATH);
final NodeCache cache = new NodeCache(client, PATH);
NodeCacheListener listener = () -> {
ChildData data = cache.getCurrentData();
if (null != data) {
System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
} else {
System.out.println("节点被删除!");
}
};
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(1000 * 2);
cache.close();
client.close();
System.out.println("OK!");
}
}
注意:示例中的Thread.sleep(10)可以注释,但是注释后事件监听的触发次数会不全,这可能与NodeCache的实现原理有关,不能太过频繁的触发事件!
注意:NodeCache只能监听一个节点的状态变化。
Tree Cache
Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合,主要涉及到下面四个类:
- TreeCache - Tree Cache实现类
- TreeCacheListener - 监听器类
- TreeCacheEvent - 触发的事件类
- ChildData - 节点数据
public class TreeCacheDemo {
private static final String PATH = "/example/cache";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
client.create().creatingParentsIfNeeded().forPath(PATH);
TreeCache cache = new TreeCache(client, PATH);
TreeCacheListener listener = (client1, event) ->
System.out.println("事件类型:" + event.getType() +
" | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(1000 * 2);
cache.close();
client.close();
System.out.println("OK!");
}
}
注意:在此示例中没有使用Thread.sleep(10),但是事件触发次数也是正常的。
注意:TreeCache在初始化(调用start()
方法)的时候会回调TreeCacheListener
实例一个事TreeCacheEvent,而回调的TreeCacheEvent对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()
很有可能导致空指针异常,这里应该主动处理并避免这种情况。