- 客户端操作zookeeper学习完后,我们要使用java控制zookeeper。我们先用apache提供的zookeeper功能来操作zookeeper服务器。
- 我们创建的每个zookeeper都是一个客户端链接
先看代码,然后解释
1.创建一个maven项目,导入Apache的zookeeper依赖
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.37</version>
</dependency>
</dependencies>
2.编写客户端连接
package com.zookeeper.test;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class ZookeeperConnection {
private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
private static CountDownLatch countDownLatch = new CountDownLatch(1);
//连接过程:not connection connecting connected close
public static void main(String[] args) throws Exception {
// testConnecting();
testConnected();
}
public static void testConnecting() throws Exception{
ZooKeeper zk = new ZooKeeper(ZK_CONNECTION, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
System.out.println(zk.getState());
}
public static void testConnected() throws Exception{
ZooKeeper zk = new ZooKeeper(ZK_CONNECTION, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
countDownLatch.countDown();
System.out.println(watchedEvent.getState());
}
}
});
countDownLatch.await();
System.out.println(zk.getState());
}
}
zookeeper连接过程
- testConnected方法输出的是connected
在使用countDounLatch的时候我们等待连接完成才输出 - testConnecting方法输出的是connecting
没有使用线程等待,直接输出。出现这种情况是因为连接是异步的,主线程执行完毕,连接还在进行中。
3.节点的增删改查
public static void main(String[] args) throws Exception {
// testConnecting();
ZooKeeper zk = testConnected();
//创建节点:节点路径,节点携带的值,节点的权限,创建持久化节点
zk.create("/node","abc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//获得节点内容:
Stat stat = new Stat();
byte[] data = zk.getData("/node",false,stat);
System.out.println(new String(data));
//修改节点
zk.setData("/node","cba".getBytes(),-1);
Stat stat1 = new Stat();
byte[] data1 = zk.getData("/node",false,stat1);
System.out.println(new String(data1));
//删除节点
zk.delete("/node",-1);
}
4.zookeeper的watch功能(重点)
watch类似订阅发布模式。监听某个节点,当节点发生变化监听节点就会收到通知。直接上代码
- watch监听类
public class MyWatch implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent watchedEvent) {
try {
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
if(Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()){
countDownLatch.countDown();
System.out.println("连接中..."+watchedEvent.getState());
}
//节点数据发生变化
else if(watchedEvent.getType() == Event.EventType.NodeDataChanged){
String path = watchedEvent.getPath();
//能够注册事件的方法getData
System.out.println("节点数据变化事件--路径:"+watchedEvent.getPath());
}
//创建节点
else if(watchedEvent.getType() == Event.EventType.NodeCreated){
System.out.println("创建节点:路径:"+watchedEvent.getPath());
}
//创建子节点
else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
String path = watchedEvent.getPath();
//能够注册事件的方法getData
System.out.println("子节点数据变化事件--路径:"+watchedEvent.getPath());
}
//节点删除
else if(watchedEvent.getType() == Event.EventType.NodeDeleted){
System.out.println("删除节点");
}
}
}catch (Exception e){
System.out.println(e.getMessage());
}
}
}
- 实现一个watch只需要实现Watcher类就可以了,然后实现里面的process方法
- WatchedEvent是事件对象,可以根据不同事件类型进行处理
- 测试watch类
public class ZkClient {
private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
public static void main(String[] args) throws Exception {
MyWatch myWatch = new MyWatch();
ZooKeeper zk1 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
Stat stat = zk1.exists("/watchNode",true);
if(stat == null){
zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
stat = zk1.exists("/watchNode",true);
if(stat == null){
zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}else{
System.out.println("客户端2注册了watchNode节点的getDate watch");
zk1.getData("/watchNode",true,stat);
System.out.println("客户端1修改watchNode节点数据");
zk1.setData("/watchNode","1234".getBytes(),-1);
}
}
}
-
运行结果:
上面我们写了最简单的事件监听。单个客户端监听/watchNode这个节点。zookeeper的监听是单次通知,第二次操作/watchNode时需要二次监听。zookeeper提供监听的方法有三个:exists;getData;getChildren。
- 多客户端监听同一节点
public class ZkClient2 {
private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
public static void main(String[] args) throws Exception {
MyWatch myWatch = new MyWatch();
ZooKeeper zk1 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
ZooKeeper zk2 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
//zk1监听了/watchNode节点事件
Stat stat1 = zk1.exists("/watchNode",true);
//zk2监听了/watchNode节点事件
Stat stat2 = zk2.exists("/watchNode",true);
if(stat1 == null){
zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
//当zk1创建节点后,zk2也收到了通知
}
}
-
执行结果: