<!--zk原生api-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
@Slf4j
@Configuration
public class ZookeeperConfig {
private String connectString = "xxxxxx:2181,xxxxx:2181,xxxx:2181";
//毫秒
private int timeout = 4000;
/**
* connectString:连接服务器的ip字符串,多个ip用逗号分隔
* sessionTimeout:超时时间,心跳收不到了,那就超时
* watcher:通知时间,如果有对应的事件触发,则会收到一个通知:如果不需要,那就设置为null
* canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,
* 只是不能写,此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用
* sessionId:会话的id
* sessionPasswd:会话密码 当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话
* @return
*/
@Bean(name = "zkClient")
public ZooKeeper zkClient(){
ZooKeeper zooKeeper=null;
try {
//使一个线程等待其他线程各自执行完毕后在执行
//使用在这边,zk连接需要时间,等待连接成功后在执行下面逻辑
final CountDownLatch countDownLatch = new CountDownLatch(1);
//连接成功后,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码
zooKeeper = new ZooKeeper(connectString, timeout, event -> {
log.info("接受到watcher通知:{}",event);
if(Watcher.Event.KeeperState.SyncConnected==event.getState()){
//如果收到了服务端的响应事件,连接成功
countDownLatch.countDown();
}
});
countDownLatch.await();
log.info("【初始化ZooKeeper连接状态....】={}",zooKeeper.getState());
}catch (Exception e){
log.error("初始化ZooKeeper连接异常....】={}",e);
}
return zooKeeper;
}
}
package com.cheng.testone;
import com.cheng.testone.zk.WatcherApi;
import com.cheng.testone.zk.ZkApi;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.security.NoSuchAlgorithmException;
import java.security.acl.Acl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@SpringBootTest
class TestOneApplicationTests {
private static final Logger logger = LoggerFactory.getLogger(TestOneApplicationTests.class);
@Test
void contextLoads() {
}
@Autowired
private ZooKeeper zkClient;
public static final String path = "/cheng";
public static final String data = "test-create";
final CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
* 参数:
* path:创建的路径
* data:存储的数据的byte[]
* acl:控制权限策略
* Ids.OPE_ACL_UNSAFE-->world:anyone:cdrwa
* CREATOR_ALL_ACL-->auth:user:password:cdrwa
* createMode:节点类型,是一个枚举
* PERSISTENT:持久化节点
* PERSISTENT_SEQUENTIAL:持久化顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点
*/
@Test
public void createNode(){
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
//同步创建
//zkClient.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//异步创建
String ctx = "{'create':'success'}";
zkClient.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
@Override
public void processResult(int i, String path, Object ctx, String name) {
logger.info("异步创建节点通知:{},{},{},{}",i,path,ctx,name);
countDownLatch.countDown();
}
},ctx);
countDownLatch.await();
logger.info("创建节点成功=======");
} catch (Exception e) {
logger.error("【创建持久化节点异常】{},{},{}",path,data,e);
}
}
/**
* 修改节点数据
*/
@Test
public void updateData(){
try {
//zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1.
//version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
Stat stat = zkClient.setData(path, "xyz".getBytes(), 0);
logger.info("更新节点:{}",stat);
//版本号不对 ->KeeperErrorCode = BadVersion for /cheng
//也有异步方式
} catch (Exception e) {
logger.error("【修改持久化节点异常】{},{},{}",path,data,e);
}
}
/**
* 删除节点
*/
@Test
public void deleteNode(){
try {
//创建一个删除节点
zkClient.create("/delete-node","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
//version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
//同步方式
//zkClient.delete(path,-1);
//异步方式
zkClient.delete("/delete-node", 0, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int i, String path, Object ctx) {
countDownLatch.countDown();
logger.info("节点删除成功回调:path:{},ctx:{}",path,ctx);
}
},"success");
countDownLatch.await();
} catch (Exception e) {
logger.error("【删除持久化节点异常】{},{}",path,e);
}
}
/**
* 获取节点数据
*/
@Test
public void getNodeData(){
//获取数据完之后 自动为state赋上值
Stat stat=new Stat();
try {
byte[] bytes=zkClient.getData(path,true,stat);
String data = new String(bytes);
logger.info("获取节点数据data:{},stat:{}",data,stat);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取子节点列表
*/
@Test
public void getChildrenList(){
try {
List<String> list = zkClient.getChildren("/imooc", new WatcherApi());
logger.info("获取子节点:{}",list);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 判断节点是否存在
* boolean watcher :是否注册watcher事件,true会在new Zookeeper()构造函数中watcher参数 监听到
* Watcher watcher : 会在监听类中监听到
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void nodeExist() throws KeeperException, InterruptedException {
Stat stat = zkClient.exists("/imooc-hhh", new WatcherApi());
if (stat == null){
logger.info("节点不存在");
}else {
logger.info("当前节点版本号为:{}",stat.getVersion());
}
}
/**
* 权限操作
* @throws NoSuchAlgorithmException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void acl() throws NoSuchAlgorithmException, KeeperException, InterruptedException {
//自定义权限
// List<ACL> acls = new ArrayList<>();
// Id imooc1 = new Id("digest",getDigestUserPwd("imooc1:123456"));
// Id imooc2 = new Id("digest",getDigestUserPwd("imooc2:123456"));
// acls.add(new ACL(ZooDefs.Perms.ALL,imooc1));
// acls.add(new ACL(ZooDefs.Perms.READ,imooc2));
// acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE,imooc2));
// zkClient.create("/aclimooc/testdigest","testdigest".getBytes(),acls,CreateMode.PERSISTENT);
//注册用户必须通过addAuthInfo才能操作节点,参考命令addauth
// zkClient.addAuthInfo("digest","imooc2:123456".getBytes());
//zkClient.create("/aclimooc/testdigest/children","children-value".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.PERSISTENT);
//设置值 没有修改权限 报错KeeperErrorCode = NoAuth for /aclimooc/testdigest
zkClient.setData("/aclimooc/testdigest","123".getBytes(),-1);
}
private String getDigestUserPwd(String id) throws NoSuchAlgorithmException {
return DigestAuthenticationProvider.generateDigest(id);
}
}