Zookeeper基础(三):Zookeeper客户端ZkClient

1、ZkClient简介

ZkClient是由Datameer的工程师开发的开源客户端,对Zookeeper的原生API进行了包装,实现了超时重连、Watcher反复注册等功能;目前已经应用到了很多项目中,比如Dubbo、Kafka、Helix;

Github:https://github.com/sgroschupf/zkclient
Maven依赖

<dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version></version>
</dependency>

或者

 <dependency>
     <groupId>com.github.sgroschupf</groupId>
     <artifactId>zkclient</artifactId>
     <version></version>
 </dependency>

2、ZkClient组件

image.png

IZKConnection:是一个ZkClient与Zookeeper之间的一个适配器;在代码里直接使用的是ZKClient,实质上还是委托了zookeeper来处理了。

在ZKClient中,根据事件类型,分为

  • 节点事件(数据事件),对应的事件处理器是IZKDataListener;
  • 子节点事件,对应的事件处理器是IZKChildListener;
  • Session事件,对应的事件处理器是IZKStatusListener;

ZkEventThread:是专门用来处理事件的线程

3、API介绍

  • 启动ZKClient:在创建ZKClient对象时,就完成了到ZooKeeper服务器连接的建立
    1、启动时,制定好connection string,连接超时时间,序列化工具等
    2、创建并启动eventThread,用于接收事件,并调度事件监听器Listener的执行
    3、连接到Zookeeper服务器,同时将ZKClient自身作为默认的Watcher
image.png
  • 为节点注册Watcher
    Zookeeper 原始API的三个方法:getData,getChildren、exists,ZKClient都提供了相应的代理方法,比如exists,
image.png

hasListeners是看有没有与该数据节点绑定的listener


image.png

所以,默认情况下,都会自动的为指定的path注册watcher,并且是默认的watcher(ZKClient),那么怎样才能让hasListeners值为true呢,也就是怎么才能为path绑定Listener呢?
ZKClient提供了订阅功能,一个新建的会话,只需要在取得响应的数据节点后,调用subscribeXXX就可以订阅上相应的事件了。


image.png
  • Zookeeper的CURD(节点的增删查改)
    Zookeeper中提供的变更操作有:节点的创建、删除,节点数据的修改

1、创建操作,节点分为4种,所以ZKClient分别为他们提供了相应的代理


image.png

2、删除节点操作


image.png

3、修改节点数据


image.png

updateDataSerialized:修改已系列化的数据;执行过程是,先读取数据,然后DataUpdater对数据修改,最后调用writeData将修改后的数据发送给服务端;
writeDataReturnStat:写数据并返回数据的状态;

4、客户端处理变更流程

ZKClient是默认的Watcher(ZKClient实现了Watcher接口),并且在为各个数据节点注册的Watcher都是这个默认的Watcher,那么该如何将各种事件通知给相应的Listener呢:
1、判断变更类型,变更类型分为state变更、ChildNode变更、NodeData变更;
2、取出与path关联的Listeners,并为每一个Listener创建一个ZKEvent,将ZkEvent,将ZkEvent交给ZkEventThread处理;
3、ZkEventThread线程,拿到ZkEvent后,只需要调用ZkEvent的run方法进行处理就可以了,所以,具体的如何调用Listener,还要依赖于ZkEvent的run()实现

5、序列化处理

Zookeeper中,会涉及到序列化、反序列化的操作有两种:getData/setData;在ZkClient中,分别用readData/WriteData来替代了。

  • ReadData:先调用zookeeper的getData,然后使用ZKSerializer进行反序列化工作
  • WriteData:先使用ZKSerializer将对象序列化后,再调用zookeeper的setData

6、注册监听

在ZkClient中客户端可以通过注册相关的事件监听来实现对Zookeeper服务端事件的订阅。


image.png

7、demo

package com.xxx.api.zkclient;

import com.xxx.ZookeeperUtil;
import com.xxx.api.natives.crud.User;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class ZkClientCrud<T> {

    ZkClient zkClient ;
    final static Logger logger = LoggerFactory.getLogger(ZkClientCrud.class);

    public ZkClientCrud(ZkSerializer zkSerializer) {
        logger.info("链接zk开始");
       // zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout);
        zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout,ZookeeperUtil.sessionTimeout,zkSerializer);
    }


    public void createEphemeral(String path,Object data){

        zkClient.createEphemeral(path,data);
    }

    /***
     * 支持创建递归方式
     * @param path
     * @param createParents
     */
    public void createPersistent(String path,boolean createParents){

        zkClient.createPersistent(path,createParents);
    }

    /***
     * 创建节点 跟上data数据
     * @param path
     * @param data
     */
    public void createPersistent(String path,Object data){

        zkClient.createPersistent(path,data);
    }

    /***
     * 子节点
     * @param path
     * @return
     */
    public  List<String> getChildren(String path){
       return zkClient.getChildren(path);

    }

    public  T readData(String path){
        return zkClient.readData(path);
    }

    public  void  writeData(String path,Object data){
         zkClient.writeData(path,data);
    }

    //递归删除
    public  void deleteRecursive(String path){
        zkClient.deleteRecursive(path);

    }
}

package com.xxx.api.zkclient;
import com.xxx.api.natives.crud.User;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class ZkClientCrudTest {
    final static Logger logger = LoggerFactory.getLogger(ZkClientCrudTest.class);
    public static void main(String[] args) {
        ZkClientCrud<User> zkClientCrud=new ZkClientCrud<User>(new SerializableSerializer());
        String path="/root";
        zkClientCrud.deleteRecursive(path);
        zkClientCrud.createPersistent(path,"hi");
     /*  zkClientCrud.createPersistent(path+"/a/b/c",true);//递归创建 但是不能设在value
       //zkClientCrud.createPersistent(path,"hi");
        logger.info(zkClientCrud.readData(path));
        //更新
        zkClientCrud.writeData(path,"hello");
        logger.info(zkClientCrud.readData(path));
        logger.info(String.valueOf(zkClientCrud.getChildren(path)));
        //子节点
        List<String> list=zkClientCrud.getChildren(path);
        for(String child:list){
            logger.info("子节点:"+child);
        }*/

        User user=new User();
        user.setUserid(1);
        user.setUserName("张三");
        zkClientCrud.writeData(path,user);
        System.out.println(zkClientCrud.readData(path).getUserName());;


    }



}

package com.xxx.api.zkclient;


import com.xxx.ZookeeperUtil;
import org.I0Itec.zkclient.*;
import org.apache.zookeeper.Watcher;

import java.util.List;

public class ZkClientWatcher    {
    ZkClient zkClient;
    public ZkClientWatcher() {
        zkClient= new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout);
    }


    public void createPersistent(String path,Object data){
        zkClient.createPersistent(path,data);
    }


    public  void writeData(String path,Object object){
        zkClient.writeData(path,object);

    }

    public  void delete(String path){
        zkClient.delete(path);

    }

    public  boolean exists(String path){
        return zkClient.exists(path);

    }

    public  void deleteRecursive(String path){
        zkClient.deleteRecursive(path);

    }

    //对父节点添加监听数据变化。
    public void subscribe(String path){


        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.printf("变更的节点为:%s,数据:%s\r\n", dataPath,data );
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.printf("删除的节点为:%s\r\n", dataPath );
            }
        });
    }
    //对父节点添加监听子节点变化。
    public void subscribe2(String path){
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("父节点: " + parentPath+",子节点:"+currentChilds+"\r\n");
            }
        });
    }


    //客户端状态
    public void subscribe3(String path) {
        zkClient.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                if(state== Watcher.Event.KeeperState.SyncConnected){
                    //当我重新启动后start,监听触发
                    System.out.println("连接成功");
                }else if(state== Watcher.Event.KeeperState.Disconnected){
                    System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
                }else
                    System.out.println("其他状态"+state);
            }

            @Override
            public void handleNewSession() throws Exception {
                System.out.println("重建session");

            }

            @Override
            public void handleSessionEstablishmentError(Throwable error) throws Exception {

            }
        });

    }


  /*  @Override
    public void handleDataChange(String dataPath, Object data) throws Exception {



    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {

    }*/
}

package com.xxx.api.zkclient;

import java.util.concurrent.TimeUnit;

public class ZkClientWatcherTest {
    public static void main(String[] args) throws InterruptedException {
        ZkClientWatcher zkClientWatche=new ZkClientWatcher();
        String path="/root";
        zkClientWatche.deleteRecursive(path);
        zkClientWatche.createPersistent(path,"hello");
        zkClientWatche.subscribe(path);
        zkClientWatche.subscribe2(path);
       // zkClientWatche.subscribe3(path);//需要启服务
       // Thread.sleep(Integer.MAX_VALUE);
        zkClientWatche.createPersistent(path+"/root2","word");
        TimeUnit.SECONDS.sleep(1);
        zkClientWatche.writeData(path,"hi");
        TimeUnit.SECONDS.sleep(1);
        //zkClientWatche.delete(path);//如果目录下有内容 不能删除 会报 Directory not empty for /root的异常
        zkClientWatche.deleteRecursive(path);
        TimeUnit.SECONDS.sleep(1); //这个main线程就结束

    }
}

package com.xxx;

public class ZookeeperUtil {

    /** zookeeper服务器地址 */
    public static final String connectString = "192.168.0.101:2181,192.168.0.102:2181,192.168.0.104:2181";
    /** 定义session失效时间 */
    public static final int sessionTimeout = 5000;
    public static final String path = "/root";
}



最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容

  • ... 一、相关概念 中间件:为分布式系统提供协调服务的组件,如专门用于计算服务的机器就是一个计算型中间件,还有专...
    帅可儿妞阅读 472评论 0 0
  • 转自:https://www.jianshu.com/p/84ad63127cd1作者:Jeffbond 简介 Z...
    小北觅阅读 931评论 0 8
  • Zookeeper系列文章1.Zookeeper简介2.Zookeeper集群安装3.原生API操作Zookeep...
    deve_雨轩阅读 566评论 0 0
  • 时常听到心灵鸡汤:开心一天是过,不开心一天也是过,为何不开开心心度过每一天呢?简单朴素的话语直抵心间,可似乎总有些...
    Christy_22ba阅读 158评论 0 0
  • 我喜欢黑夜的颜色 我更喜欢在夜深人静的时候 用我的双眼 去看清这夜晚的城市 但我什么也看不见 此刻,我想到了鬼 只...
    柳四公子阅读 357评论 0 0