Zookeeper学习

zk 题外话:paxos 与 raft 理解

paxos算法就是为了解决分布式的一致性问题的
最大的用途就是保持多个节点数据一致
由于leader的地位不清晰,就有了投票选举leader的操作,那个leader发出去,再收回来被认可的票数多,最终提案就会被统一

raft协议比paxos的优点是 容易理解,容易实现。它强化了leader的地位,把整个协议可以清楚的分割成两个部分,并利用日志的连续性做了一些简化:

  • leader在时。由leader向follower同步日志
  • leader挂掉了,选一个新leader,leader选举算法。

zookeeper是一个分布式协调框架

一 理论

  1. ZNode 节点状态:(前两个和单独,也可和顺序性节点组合)

持久性节点: Persistent

临时性节点:Ephemeral

顺序性节点:Sequential

  1. 事务ID

首先,事务是对物理和抽象的应用状态上的操作集合。一般事务通常会想到数据库的事务操作,包括ACID特性,即:原子性Atomic,一致性Consistency,隔离性Isolation,和持久性Durability

而在 zookeeper 中:

  • 事务是指能够改变 Zookeeper 服务器状态的操作,也称之为事务操作或更新操作

  • 一般包含数据节点 ZNode 的crud操作

  • 每个事务请求,zk都会分配一个全局唯一的事务ID,即 ZXID ,通常是一个64位的数字

    (每一次更新操作都对应一个 ZXID,间接突出zk处理更新操作的全局执行顺序)

image-20200917150902762.png
image-20200917150920641.png
  1. Watcher 数据更新变更通知 (监听)

zk 使用 Watcher 机制实现发布式数据的发布、订阅功能;

订阅关系为一对多,可以让多个订阅者同时监听同一个主题对象。

当主题对象发行变化时,会通知所有订阅者,让所有订阅者做出相应处理。zk引用watcher机制来实现这种分布式通知功能。

zk允许客户端向服务端注册一个watcher监听,当服务端的一些指定事件触发了这个watcher,那么这个watcher就会向指定客户端发送事件通知来实现分布式的通知功能;

下图展示了watch的三部分 与 这三部分的工作流程执行逻辑

image-20200917153422558.png
  1. ACL 保障数据的安全

它是保障风不是系统运行状态的数据安全的,避免因误操作带来的数据变更而导致数据异常

ACL ( Access Control List ) :

​ 权限模式 Scheme, 授权对象 ID,权限 Permission,一般使用 "scheme:id:permission" 来标识一个有效的ACL信息。

image-20200917154918353.png
image-20200917154950032.png
image-20200917155010551.png
  1. zk的crud

5.1 create 创建

image-20200917160126366.png
./zkCli.sh 连接本地的zk服务器
./zkCli.sh -server ip:port 连接指定的服务器

create /zk-test1 content123 创建一个永久节点
create -s /zk-test2 content123 创建一个顺序(-s)节点

create -e /zk-test1 content123 创建一个临时节点,会话异常或关闭节点就会被删除

5.2 读取节点

ls path  显示path目录下面的节点,跟Linux的一致,列这个指定目录的节点

get path 或者对应目录节点的信息,可以展示内容什么的
image-20200917160617740.png

5.3 更新

set path data [version]  path是节点连接和名称,data是节点的内容,version可以给这个节点指定版本

5.4 删除

delete path [version] 删除某个节点,版本可以自由指定,也可以不带,执行了,再ls就看不到这个节点了

单一删除有限制,必须先删除子节点,再删除父节点

但是也有父子检测轮循删除的机制:代码中调用有对应的支持方法()

二 Code API

api的使用:

导入jar支持

<dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.4.14</version>
 </dependency>
  1. 建立会话:
public class CreateSession implements Watcher {
    //countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException,
            IOException {
         /*
         客户端可以通过创建⼀个zk实例来连接zk服务器
         new Zookeeper(connectString,sesssionTimeOut,Wather)
         connectString: 连接地址:IP:端⼝
         sesssionTimeOut:会话超时时间:单位毫秒
         Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
         */
        ZooKeeper zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new
                CreateSession());
        System.out.println(zooKeeper.getState());
        countDownLatch.await();
        //表示会话真正建⽴
        System.out.println("=========Client Connected to
                zookeeper == ========");
    }
    
    // 当前类实现了Watcher接⼝,重写了process⽅法,
    // 该⽅法负责处理来⾃Zookeeper服务端的watcher通知,在收到服务端发送过来的SyncConnected事件之后,
    // 解除主程序在CountDownLatch上的等待阻塞,⾄此,会话创建完毕
    public void process(WatchedEvent watchedEvent) {
        //当连接创建了,服务端发送给客户端SyncConnected事件
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            countDownLatch.countDown();
        }
    }
}
image-20200917161759271.png
  1. 创建节点
public class CreateNote implements Watcher {
    
    //countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new CreateNote());
        countDownLatch.await();
    }

    public void process(WatchedEvent watchedEvent) {
        //当连接创建了,服务端发送给客户端SyncConnected事件
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            countDownLatch.countDown();
        }
        
        //调⽤创建节点⽅法
        try {
            createNodeSync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void createNodeSync() throws Exception {
        /**
         * path :节点创建的路径
         * data[] :节点创建要保存的数据,是个byte类型的
         * acl :节点创建的权限信息(4种类型)
         * ANYONE_ID_UNSAFE : 表示任何⼈
         * AUTH_IDS :此ID仅可⽤于设置ACL。它将被客户机验证的ID替
         换。
         * OPEN_ACL_UNSAFE :这是⼀个完全开放的ACL(常⽤)-->
         world:anyone
         * CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
         * createMode :创建节点的类型(4种类型)
         * PERSISTENT:持久节点
         * PERSISTENT_SEQUENTIAL:持久顺序节点
         * EPHEMERAL:临时节点
         * EPHEMERAL_SEQUENTIAL:临时顺序节点
         String node = zookeeper.create(path,data,acl,createMode);
         */
        String node_PERSISTENT = zooKeeper.create("/lg_persistent", "持久节点内容".getBytes(" utf - 8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        String node_PERSISTENT_SEQUENTIAL =
                zooKeeper.create("/lg_persistent_sequential", "持久节点内容".getBytes("utf-8"),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        String node_EPERSISTENT = zooKeeper.create("/lg_ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        System.out.println("创建的持久节点是:" + node_PERSISTENT);
        System.out.println("创建的持久顺序节点是:" + node_PERSISTENT_SEQUENTIAL);
        System.out.println("创建的临时节点是:" + node_EPERSISTENT);
    }
}
  1. 获取节点数据
public class GetNoteData implements Watcher {
    //countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("10.211.55.4:2181", 10000, new
                GetNoteDate());
        Thread.sleep(Integer.MAX_VALUE);

    }

    public void process(WatchedEvent watchedEvent) {
        //⼦节点列表发⽣变化时,服务器会发出NodeChildrenChanged通知,但不会把变化情况告
        诉给客户端
        // 需要客户端⾃⾏获取,且通知是⼀次性的,需反复注册监听
        if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
            //再次获取节点数据
            try {
                List<String> children =
                        zooKeeper.getChildren(watchedEvent.getPath(), true);
                System.out.println(children);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //当连接创建了,服务端发送给客户端SyncConnected事件
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {

            try {
                //调⽤获取单个节点数据⽅法
                getNoteDate();
                getChildrens();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static void getNoteData() throws Exception {
        /**
          path : 获取数据的路径
          watch : 是否开启监听
          stat : 节点状态信息
          null: 表示获取最新版本的数据
          zk.getData(path, watch, stat);
         */
        byte[] data = zooKeeper.getData("/lg_persistent/lg-children", true, null);
        System.out.println(new String(data, "utf-8"));
    }

    private static void getChildrens() throws KeeperException, InterruptedException {
         /*
             path:路径
             watch:是否要启动监听,当⼦节点列表发⽣变化,会触发监听
             zooKeeper.getChildren(path, watch);
         */
        List<String> children = zooKeeper.getChildren("/lg_persistent", true);
        System.out.println(children);
    }
}
  1. 修改节点数据
public class updateNote implements Watcher {
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new updateNote());
        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent watchedEvent) {
        //当连接创建了,服务端发送给客户端SyncConnected事件
        try {
            updateNodeSync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void updateNodeSync() throws Exception {
        /*
            path:路径
            data:要修改的内容 byte[]
            version:为-1,表示对最新版本的数据进⾏修改
            zooKeeper.setData(path, data,version);
        */
        byte[] data = zooKeeper.getData("/lg_persistent", false, null);
        System.out.println("修改前的值:" + new String(data));
        //修改 stat:状态信息对象 -1:最新版本
        Stat stat = zooKeeper.setData("/lg_persistent", "客户端修改内容".getBytes(), -1);
        byte[] data2 = zooKeeper.getData("/lg_persistent", false, null);
        System.out.println("修改后的值:" + new String(data2));
    }
}
  1. 删除节点
public class DeleteNote implements Watcher {
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new DeleteNote());
        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent watchedEvent) {
        //当连接创建了,服务端发送给客户端SyncConnected事件
        try {
            deleteNodeSync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void deleteNodeSync() throws KeeperException, InterruptedException {
        /*
            zooKeeper.exists(path,watch) :判断节点是否存在
            zookeeper.delete(path,version) : 删除节点
        */
        Stat exists = zooKeeper.exists("/lg_persistent/lg-children", false);
        System.out.println(exists == null ? "该节点不存在" : "该节点存在");
        zooKeeper.delete("/lg_persistent/lg-children", -1);
        Stat exists2 = zooKeeper.exists("/lg_persistent/lg-children", false);
        System.out.println(exists2 == null ? "该节点不存在" : "该节点存在");
    }
}

三 Code Client

client 的使用:

导入jar支持

<dependency>
     <groupId>com.101tec</groupId>
     <artifactId>zkclient</artifactId>
     <version>0.2</version>
 </dependency>
  1. 建立会话:
import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;

public class CreateSession {
    /*
        创建⼀个zkClient实例来进⾏连接
        注意:zkClient通过对zookeeperAPI内部包装,将这个异步的会话创建过程同步化了
    */
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        System.out.println("ZooKeeper session established.");
    }
}
  1. 创建节点
import org.I0Itec.zkclient.ZkClient;

public class Create_Node_Sample {
    
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        System.out.println("ZooKeeper session established.");
        //createParents的值设置为true,可以递归创建节点
        zkClient.createPersistent("/lg-zkClient/lg-c1", true);
        System.out.println("success create znode.");
    }
}
  1. 删除节点
import org.I0Itec.zkclient.ZkClient;

public class Del_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/lg-zkClient/lg-c1";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.deleteRecursive(path);
        System.out.println("success delete znode.");
    }
}
  1. 获取子节点
import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

public class Get_Children_Sample {

    public static void main(String[] args) throws Exception {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        List<String> children = zkClient.getChildren("/lg-zkClient");
        System.out.println(children);

        //注册监听事件
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);
            }
        });

        zkClient.createPersistent("/lg-zkClient");
        Thread.sleep(1000);
        zkClient.createPersistent("/lg-zkClient/c1");
        Thread.sleep(1000);
        zkClient.delete("/lg-zkClient/c1");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

运行结果:

/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null
  1. 获取数据(节点是否存在、更新、删除)
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class Get_Data_Sample {
    public static void main(String[] args) throws InterruptedException {
        String path = "/lg-zkClient-Ep";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        //判断节点是否存在
        boolean exists = zkClient.exists(path);

        if (!exists) {
            zkClient.createEphemeral(path, "123");
        }

        //注册监听
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataChange(String path, Object data) throws Exception {
                System.out.println(path + "该节点内容被更新,更新后的内容" + data);
            }

            public void handleDataDeleted(String s) throws Exception {
                System.out.println(s + " 该节点被删除");
            }
        });
        
        //获取节点内容
        Object o = zkClient.readData(path);
        System.out.println(o);
        //更新
        zkClient.writeData(path, "4567");
        Thread.sleep(1000);
        //删除
        zkClient.delete(path);
        Thread.sleep(1000);
    }
}

运行结果:

123
/lg-zkClient-Ep该节点内容被更新,更新后的内容4567
/lg-zkClient-Ep 该节点被删除

四 Code Curator 客户端

Curator 的使用:

这个跟client是类似的,只是编程风格为Fluent的风格,就是lombok那种链式的操作,点点点那种

导入jar支持

<dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-framework</artifactId>
     <version>2.12.0</version>
 </dependency>
  1. 建立会话:
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
    
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
  1. 通过调用 CuratorFramework中的 Start() 方法来启动会话:
// 片段一
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);

CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);

client.start();
// 片段二
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);

CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
 5000,1000,retryPolicy);

client.start();
image-20200917165622577.png
// 片段三
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);

private static CuratorFramework Client = CuratorFrameworkFactory.builder()
 .connectString("server1:2181,server2:2181,server3:2181")
 .sessionTimeoutMs(50000)
 .connectionTimeoutMs(30000)
 .retryPolicy(retryPolicy)
 .build();

client.start();
image-20200917165649927.png
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Create_Session_Sample {
    
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);
        client.start();

        System.out.println("Zookeeper session1 established. ");
        CuratorFramework client1 = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181") //server地址
                .sessionTimeoutMs(5000) // 会话超时时间
                .connectionTimeoutMs(3000) // 连接超时时间
                .retryPolicy(retryPolicy) // 重试策略
                .namespace("base") // 独立命名空间/base
                .build(); //

        client1.start();
        System.out.println("Zookeeper session2 established. ");
    }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,607评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,239评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,960评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,750评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,764评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,604评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,347评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,253评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,702评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,893评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,015评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,734评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,352评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,934评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,052评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,216评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,969评论 2 355