springboot集成原生zookeeper及api使用

  • Pom文件

<!--zk原生api-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.7</version>
        </dependency>

  • 配置zk连接类

@Slf4j
@Configuration
public class ZookeeperConfig {

    private  String connectString = "xxxxxx:2181,xxxxx:2181,xxxx:2181";

   //毫秒
    private  int timeout = 4000;

    /**
     * connectString:连接服务器的ip字符串,多个ip用逗号分隔
     * sessionTimeout:超时时间,心跳收不到了,那就超时
     * watcher:通知时间,如果有对应的事件触发,则会收到一个通知:如果不需要,那就设置为null
     * canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,
     *              只是不能写,此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用
     * sessionId:会话的id
     * sessionPasswd:会话密码 当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话
     * @return
     */
    @Bean(name = "zkClient")
    public ZooKeeper zkClient(){
        ZooKeeper zooKeeper=null;
        try {
            //使一个线程等待其他线程各自执行完毕后在执行
            //使用在这边,zk连接需要时间,等待连接成功后在执行下面逻辑
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            //连接成功后,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码
            zooKeeper = new ZooKeeper(connectString, timeout, event -> {
                log.info("接受到watcher通知:{}",event);
                if(Watcher.Event.KeeperState.SyncConnected==event.getState()){
                    //如果收到了服务端的响应事件,连接成功
                    countDownLatch.countDown();
                }
            });
            countDownLatch.await();
            log.info("【初始化ZooKeeper连接状态....】={}",zooKeeper.getState());

        }catch (Exception e){
            log.error("初始化ZooKeeper连接异常....】={}",e);
        }
        return  zooKeeper;
    }
}

  • 测试类-基本api使用

package com.cheng.testone;

import com.cheng.testone.zk.WatcherApi;
import com.cheng.testone.zk.ZkApi;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.security.NoSuchAlgorithmException;
import java.security.acl.Acl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

@SpringBootTest
class TestOneApplicationTests {

    private static final Logger logger = LoggerFactory.getLogger(TestOneApplicationTests.class);


    @Test
    void contextLoads() {
    }

    @Autowired
    private ZooKeeper zkClient;

    public static final String path = "/cheng";
    public static final String data = "test-create";

    final CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
     * 参数:
     *      path:创建的路径
     *      data:存储的数据的byte[]
     *      acl:控制权限策略
     *          Ids.OPE_ACL_UNSAFE-->world:anyone:cdrwa
     *          CREATOR_ALL_ACL-->auth:user:password:cdrwa
     *      createMode:节点类型,是一个枚举
     *                  PERSISTENT:持久化节点
     *                  PERSISTENT_SEQUENTIAL:持久化顺序节点
     *                  EPHEMERAL:临时节点
     *                  EPHEMERAL_SEQUENTIAL:临时顺序节点
     */

    @Test
    public void createNode(){
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            //同步创建
            //zkClient.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            //异步创建
            String ctx = "{'create':'success'}";
            zkClient.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
                @Override
                public void processResult(int i, String path, Object ctx, String name) {
                    logger.info("异步创建节点通知:{},{},{},{}",i,path,ctx,name);
                    countDownLatch.countDown();
                }
            },ctx);
            countDownLatch.await();
            logger.info("创建节点成功=======");
        } catch (Exception e) {
            logger.error("【创建持久化节点异常】{},{},{}",path,data,e);
        }
    }

    /**
     * 修改节点数据
     */
    @Test
    public void updateData(){
        try {
            //zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1.
            //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
            Stat stat = zkClient.setData(path, "xyz".getBytes(), 0);
            logger.info("更新节点:{}",stat);
            //版本号不对 ->KeeperErrorCode = BadVersion for /cheng
            //也有异步方式
        } catch (Exception e) {
            logger.error("【修改持久化节点异常】{},{},{}",path,data,e);
        }
    }

    /**
     * 删除节点
     */
    @Test
    public void deleteNode(){
        try {
            //创建一个删除节点
            zkClient.create("/delete-node","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
            //同步方式
            //zkClient.delete(path,-1);
            //异步方式
            zkClient.delete("/delete-node", 0, new AsyncCallback.VoidCallback() {
                @Override
                public void processResult(int i, String path, Object ctx) {
                    countDownLatch.countDown();
                    logger.info("节点删除成功回调:path:{},ctx:{}",path,ctx);
                }
            },"success");
            countDownLatch.await();
        } catch (Exception e) {
            logger.error("【删除持久化节点异常】{},{}",path,e);
        }
    }

    /**
     * 获取节点数据
     */
    @Test
    public void getNodeData(){
        //获取数据完之后 自动为state赋上值
        Stat stat=new Stat();
        try {
            byte[] bytes=zkClient.getData(path,true,stat);
            String data = new String(bytes);
            logger.info("获取节点数据data:{},stat:{}",data,stat);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取子节点列表
     */
    @Test
    public void getChildrenList(){
        try {
            List<String> list = zkClient.getChildren("/imooc", new WatcherApi());
            logger.info("获取子节点:{}",list);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 判断节点是否存在
     * boolean watcher :是否注册watcher事件,true会在new Zookeeper()构造函数中watcher参数 监听到
     * Watcher watcher : 会在监听类中监听到
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void nodeExist() throws KeeperException, InterruptedException {

        Stat stat = zkClient.exists("/imooc-hhh", new WatcherApi());
        if (stat == null){
            logger.info("节点不存在");
        }else {
            logger.info("当前节点版本号为:{}",stat.getVersion());
        }

    }

    /**
     * 权限操作
     * @throws NoSuchAlgorithmException
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void acl() throws NoSuchAlgorithmException, KeeperException, InterruptedException {
        //自定义权限
//        List<ACL> acls = new ArrayList<>();
//        Id imooc1 = new Id("digest",getDigestUserPwd("imooc1:123456"));
//        Id imooc2 = new Id("digest",getDigestUserPwd("imooc2:123456"));
//        acls.add(new ACL(ZooDefs.Perms.ALL,imooc1));
//        acls.add(new ACL(ZooDefs.Perms.READ,imooc2));
//        acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE,imooc2));
//        zkClient.create("/aclimooc/testdigest","testdigest".getBytes(),acls,CreateMode.PERSISTENT);

        //注册用户必须通过addAuthInfo才能操作节点,参考命令addauth
       // zkClient.addAuthInfo("digest","imooc2:123456".getBytes());
        //zkClient.create("/aclimooc/testdigest/children","children-value".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.PERSISTENT);

        //设置值 没有修改权限 报错KeeperErrorCode = NoAuth for /aclimooc/testdigest
        zkClient.setData("/aclimooc/testdigest","123".getBytes(),-1);


    }


    private String getDigestUserPwd(String id) throws NoSuchAlgorithmException {
        return DigestAuthenticationProvider.generateDigest(id);
    }
}


©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。