ZooKeeper(四)ZooKeeper客户端Java zkClient和Curator

  • zkClient:开源的zk客户端,在原生API基础上封装,是一个更易于使用的zookeeper客户端
    1. 创建会话(同步,重试)
    2. 创建节点(同步,递归创建)
    3. 删除节点(同步,递归删除)
    4. 代码示例
      import net.lc7.model.User;
      import net.lc7.util.ZkPropertiesUtil;
      import org.I0Itec.zkclient.IZkChildListener;
      import org.I0Itec.zkclient.IZkDataListener;
      import org.I0Itec.zkclient.ZkClient;
      import org.I0Itec.zkclient.serialize.SerializableSerializer;
      import org.apache.zookeeper.CreateMode;
      import org.apache.zookeeper.data.Stat;
    
      import java.util.List;
    
      /**
       * @Description:
       * @Author: Jason.zhu
       * @Create: 2019/05/22 17:37
       */
    
      public class ZkClientMain {
          private static ZkClient zkClient;
    
    
          public static void main(String[] args) throws Exception{
              String ip_port = ZkPropertiesUtil.getZkServerIp();
              int sessionTimeout = 10000;
              int connetionTimeout = ZkPropertiesUtil.getZktimeout();
              zkClient = new ZkClient(ip_port,sessionTimeout,connetionTimeout,new SerializableSerializer());
    
              updateNodeData();
          }
    
          /**
           * 新建节点
           */
          private static void createNode(){
              User user = User.builder().age(18).id(1).name("jason").build();
              String node = zkClient.create("/user",user,CreateMode.PERSISTENT);
              System.out.println(node);
          }
    
          //获取节点
          private static void getNodeData(){
              Stat stat = new Stat();
              User user1 = zkClient.readData("/user", stat);
              System.out.println("name : " + user1.getName() );
              System.out.println("stat : " + stat);
          }
    
          /**
           * 创建子节点
           */
          private static void createChildrenNode(){
              String node = zkClient.create("/user/1","user1",CreateMode.PERSISTENT);
              System.out.println(node);
              node = zkClient.create("/user/2","user2",CreateMode.PERSISTENT);
              System.out.println(node);
              node = zkClient.create("/user/3","user3",CreateMode.PERSISTENT);
              System.out.println(node);
          }
    
          /**
           * 获取子节点
           */
          private static void getChildNode(){
              String node = "/user";
              boolean exist = zkClient.exists(node);
              if(exist){
                  List<String> childNodes = zkClient.getChildren(node);
                  childNodes.stream().forEach(System.out::println);
              }
          }
    
          private static void delNode(){
              String node = "/user";
              boolean exist = zkClient.exists(node);
              if(exist){
      //           zkClient.delete(node);//删除当前节点,有子节点无法删除
                  zkClient.deleteRecursive(node);//删除当前节点,有子节点删除子节点
              }
          }
    
          /**
           * 更新节点数据
           */
          private static void updateNodeData(){
              User user = User.builder().age(11).name("wangwu").id(12).build();
              String node = "/user";
              zkClient.writeData(node, user);
    
          }
    
          /**
           * 监控子节点
           */
          private static void watchChildChange() throws InterruptedException {
              String node = "/user";
              zkClient.subscribeChildChanges(node, (str,strs) -> {
                  System.out.println(str);
                  System.out.println(strs);
              });
              Thread.sleep(Long.MAX_VALUE);
          }
    
          /**
           *监控节点
           */
          private static void watchNode(){
              String node ="/user";
              zkClient.subscribeDataChanges(node, new IZkDataListener() {
                  @Override
                  public void handleDataChange(String s, Object o) throws Exception {
                      System.out.println("node name : " + s + ",updated !!");
                      User user = (User)o;
                      System.out.println("node new value : " + user);
                  }
    
                  @Override
                  public void handleDataDeleted(String s) throws Exception {
                      System.out.println("node name : " + s + ", deleted !!");
                  }
              });
    
              try {
                  Thread.sleep(Long.MAX_VALUE);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
    
      }
    
    import lombok.Builder;
    import lombok.Data;
    
    import java.io.Serializable;
    
    @Data
    @Builder
    public class User implements Serializable {
          private int id;
          private int age;
          private String name;
    
          @Override
          public String toString(){
              return "id=" + id + ", age="+age + ",name="+name;
         }
    }
    
  • curator:开源的zk客户端,在原生API基础上封装,apache顶级项目
    1. Curator采用Fluent风格API
    2. Curator对zk进行基本操作代码示例:
    import net.lc7.util.ZkPropertiesUtil;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    
    /**
      * @Description: Curator对zk进行基本操作,Curator采用流式风格API
      * @Author: Jason.zhu
      * @Create: 2019/05/24 17:55
      */
    public class CuratorClient {
    private static CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
                  ZkPropertiesUtil.getZkServerIp(),
                  new RetryNTimes(10, 5000));
    
    private static String path = "/curator_node";
    
    public static void main(String[] args) throws Exception {
              curatorFramework.start();
    
              //create node
              String data = "curator_data";
              curatorFramework.create().creatingParentsIfNeeded().forPath(path,data.getBytes());
    
    
              //get Node and Data
              print("ls", "/");
              print(curatorFramework.getChildren().forPath("/"));
    
              print("get", path);
              print(curatorFramework.getData().forPath(path));
    
              //update node data
    
              String dataNew = "curator_data_new";
              print("set", path, dataNew);
              curatorFramework.setData().forPath(path, dataNew.getBytes());
    
              print("get", path);
              print(curatorFramework.getData().forPath(path));
    
              //remove node
              print("delete", path);
              curatorFramework.delete().forPath(path);
              print("ls", "/");
              print(curatorFramework.getChildren().forPath("/"));
    
              curatorFramework.close();
          }
    
          private static void print(String... cmds) {
              StringBuilder text = new StringBuilder("$ ");
              for (String cmd : cmds) {
                  text.append(cmd).append(" ");
              }
              System.out.println(text.toString());
          }
    
          private static void print(Object result) {
              System.out.println(
                      result instanceof byte[]
                              ? new String((byte[]) result)
                              : result);
          }
    
      }
    
      public class ZkPropertiesUtil {
    
      private static Properties zkProperties = new Properties();
    
      static {
    
          InputStream is = ZkPropertiesUtil.class.getResourceAsStream("/zk.properties");
          try {
              zkProperties.load(is);
          } catch (Exception e) {
              e.printStackTrace();
          }finally {
              try {
                  if (is != null) {
                      is.close();
                  }
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      }
    
      public static String getZkServerIp(){
          return zkProperties.getProperty("zk.zkServerIps");
      }
    
      public static int getZktimeout(){
          return Integer.valueOf(zkProperties.getProperty("zk.timeout"));
      }
    
      public static void main(String[] args) {
         String source = "0101888";
    
          System.out.println(source.substring(0, source.length() - 2));
      }
    }
    
    1. Curator操作zk实现分布式锁
      import net.lc7.util.ZkPropertiesUtil;
      import org.apache.curator.framework.CuratorFramework;
      import org.apache.curator.framework.CuratorFrameworkFactory;
      import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
      import org.apache.curator.framework.recipes.locks.InterProcessMutex;
      import org.apache.curator.retry.RetryNTimes;
    
      import java.util.concurrent.TimeUnit;
    
      /**
       * @Description: Curator操作zk实现分布式锁
       * @Author: Jason.zhu
       * @Create: 2019/05/27 11:28
       */
    
      public class CuratorDistributeLock {
          private static String lockPath = "/lockPath";
    
          public static void main(String[] args) throws Exception{
              CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(ZkPropertiesUtil.getZkServerIp(),
                                                              new RetryNTimes(10,5000));
              curatorFramework.start();
              System.out.println("zk client start successfully!!");
              Thread.sleep(1000L);
    
    
              Thread t1 = new Thread(() -> {
                 doWithLock(curatorFramework);
              }, "t1");
    
              Thread t2 = new Thread(() -> {
                  doWithLock(curatorFramework);
              }, "t2");
    
              t1.start();
              t2.start();
    
    
      //        curatorFramework.close();
          }
    
          private static void doWithLock(CuratorFramework curatorFramework){
              System.out.println("Client state : " + curatorFramework.getState());
              InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
    
              try {
                  if(lock.acquire(10*1000, TimeUnit.SECONDS)){
                      System.out.println(Thread.currentThread().getName() + "hold lock!!");
                      Thread.sleep(5000L);
                      System.out.println(Thread.currentThread().getName() + "release lock!!");
                  }
              }catch (Exception e){
                  e.printStackTrace();
              }finally {
                  try {
                      lock.release();
                  }catch (Exception e){
                      e.printStackTrace();
                  }
    
              }
          }
      }
    
    1. Curator操作zk实现Leader选举
      import net.lc7.util.ZkPropertiesUtil;
      import org.apache.curator.framework.CuratorFramework;
      import org.apache.curator.framework.CuratorFrameworkFactory;
      import org.apache.curator.framework.recipes.leader.LeaderSelector;
      import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
      import org.apache.curator.framework.state.ConnectionState;
      import org.apache.curator.retry.RetryNTimes;
      import org.apache.curator.utils.EnsurePath;
    
      /**
       * @Description: Leader选举
       * 当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。
       * Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。
       * 注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。
       * autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。
       * @Author: Jason.zhu
       * @Create: 2019/05/27 11:49
       */
    
      public class CuratorLeaderClient {
          private static String path = "/ensurePath";
    
          public static void main(String[] args) throws InterruptedException {
              LeaderSelectorListener listener = new LeaderSelectorListener() {
                  @Override
                  public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                      System.out.println(Thread.currentThread().getName() + " take leadership !!");
                      Thread.sleep(5000L);
                      System.out.println(Thread.currentThread().getName() + " relinquish leadership !!");
                  }
    
                  @Override
                  public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                      System.out.println(connectionState.name() + " state changed !! " + connectionState.isConnected());
                  }
              };
    
              new Thread(() -> {
                  registerListener(listener);
              }).start();
    
              new Thread(() -> {
                  registerListener(listener);
              }).start();
    
    
              new Thread(() -> {
                  registerListener(listener);
              }).start();
    
    
              new Thread(() -> {
                  registerListener(listener);
              }).start();
    
              new Thread(() -> {
                  registerListener(listener);
              }).start();
    
              Thread.sleep(Integer.MAX_VALUE);
          }
    
          public static void registerListener(LeaderSelectorListener listener){
              CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(ZkPropertiesUtil.getZkServerIp()
                                                                          ,new RetryNTimes(10, 5000));
    
              curatorFramework.start();
    
              //ensure path
              try {
                  new EnsurePath(path).ensure(curatorFramework.getZookeeperClient());
              } catch (Exception e) {
                  e.printStackTrace();
              }
    
              //register listener
              LeaderSelector selector = new LeaderSelector(curatorFramework, path, listener);
              selector.autoRequeue();
              selector.start();
          }
    
      }
    
    1. Curator操作ZK实现监听功能
      import net.lc7.util.ZkPropertiesUtil;
      import org.apache.curator.framework.CuratorFramework;
      import org.apache.curator.framework.CuratorFrameworkFactory;
      import org.apache.curator.framework.recipes.cache.ChildData;
      import org.apache.curator.framework.recipes.cache.PathChildrenCache;
      import org.apache.curator.retry.RetryNTimes;
    
      /**
       * @Description:  Curator实现监听功能
       *
       * Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
       * Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
       * Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
       *
       * @Author: Jason.zhu
       * @Create: 2019/05/27 11:10
       */
    
      public class CuratorWatchClient {
    
          private static String watcherNode = "/watcherNode";
    
          public static void main(String[] args) throws Exception {
              CuratorFramework curatorFrameworkClient = CuratorFrameworkFactory.newClient(
                                                              ZkPropertiesUtil.getZkServerIp(),
                                                              new RetryNTimes(10, 5000));
              curatorFrameworkClient.start();
    
              System.out.println("zk client start successfully!!");
    
    
              //register watcher
              PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFrameworkClient, watcherNode, true);
              pathChildrenCache.getListenable().addListener((client, event) -> {
                  ChildData data = event.getData();
                  if(null == data){
                      System.out.println("no data in event [" + event +"]");
                  }else {
                      System.out.println(
                              "Receive event :" +
                                      "type=["+ event.getType() +"], " +
                                      "path=[" + data.getPath() + "]," +
                                      "data=["+ data.getData() +"]," +
                                      "state=[" + data.getStat() + "]"
                      );
                  }
              });
              pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
              System.out.println(" Register zookeeper watcher successfully!! Please operate in terminal to show the listener function");
              Thread.sleep(Integer.MAX_VALUE);
    
          }
      }
    
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352