Zookeeper 高级内容

[TOC]

一、Zookeeper 一致性原理

  • 两阶段提交 two-phase commit (2PC)


    2阶段提交
  • 三阶段提交 three-phase commit (3PC)


    3阶段提交
  • Paxos算法


    Paxos算法
  • ZAB算法

ZAB协议是专门为zookeeper实现分布式协调功能而设计。zookeeper主要是根据ZAB协议是实现分布式系统数据一致性。

zookeeper根据ZAB协议建立了主备模型完成zookeeper集群中数据的同步。这里所说的主备系统架构模型是指,在zookeeper集群中,只有一台leader负责处理外部客户端的事物请求(或写操作),然后leader服务器将客户端的写操作数据同步到所有的follower节点中。

ZAB的协议核心是在整个zookeeper集群中只有一个节点即Leader将客户端的写操作转化为事物(或提议proposal)。Leader节点再数据写完之后,将向所有的follower节点发送数据广播请求(或数据复制),等待所有的follower节点反馈。在ZAB协议中,只要超过半数follower节点反馈OK,Leader节点就会向所有的follower服务器发送commit消息。即将leader节点上的数据同步到follower节点之上。

ZAB协议中主要有两种模式,第一是消息广播模式;第二是崩溃恢复模式


ZAB算法

二、Zookeeper集群特点

Zookeeper集群
  • 顺序一致性
  • 原子性
  • 单一视图
  • 可靠性
  • 实时性
  • 角色轮换避免单点故障

1.Leader 集群工作机制中的核心

  • 事务请求的唯一调度和处理者,保证集群事务处理的顺序性
  • 集群内部个服务器的调度者(管理follower,数据同步)

2.Follower 集群工作机制中的跟随者

  • 处理非事务请求,转发事务请求给Leader
  • 参与事务请求proposal投票
  • 参与leader选举投票

3.Observer (3.30以上版本提供)和follower功能相同,但不参与 任何形式投票

  • 处理非事务请求,转发事务请求给 Leader
  • 提高集群非事务处理能力
    配置:
peerType=observer
server.3=192.168.0.102:2888:3888:observer

TIPS

通过stat命令可以查看服务器角色

三、集群配置

1.配置文件zoo.cfg

clientPort=2181  
initLimit=5  
syncLimit=2  
peerType=observer
server.ID1=IP1:2888:3888  
server.ID2=IP2:2888:3888
server.ID3=IP3:nnnn:mmmm:observer

配置说明

IDn: server id,集群中机器序号
IPn: 机器ip地址
nnnn: 同步端口
mmmm: 选举端口
observer: 是否为观察者

2.创建myid 文件

在dataDir所配置的目录下,创建一个名为myid的文件,在该文件的第一行写上 一个数字,和zoo.cfg中当前机器的编号对应上

文件位置

3.配置其他机器

集群其他机器zoo.cfg和myid文件

4.验证服务器

使用stat命令


集群状态信息

四、Zookeeper集群一致性协议ZAB

ZAB 协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广 播协议。在 ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性;
ZAB协议模式:

  • 消息广播
  • 崩溃恢复
    集群生命周期:


    集群生命周期

1.消息广播

消息广播模式

Step 1:
Leader 接收到消息请求后,将消息赋予一个全局唯一的 64 位自增 id,叫做:zxid ,通过 zxid 的大小比较即可实现因果有序这一特性。

Step 2:
Leader 通过先进先出队列(通过 TCP 协议来实现,以此实现了全局有序这一特性) 将带有 zxid 的消息作为一个提案(proposal)分发给所有 follower

Step 3:
当 follower 接收到 proposal,先将 proposal 写到硬盘,写硬盘成功后再向 leader 回一个 ACK

Step 4:
当 leader 接收到合法数量的 ACKs 后,leader 就向所有 follower 发送 COMMIT 命令,同事会在本地执行该消息。

Step 5:
当 follower 收到消息的 COMMIT 命令时,就会执行该消息

2.崩溃恢复

崩溃回复

Step 1:
每个Server会发出一个投票,第一次都是投自己。投票信息:(myid,ZXID)

Step 2:
收集来自各个服务器的投票

Step 3:
处理投票并重新投票,处理逻辑:优先比较ZXID,然后比较myid

Step 4:
统计投票,只要超过半数的机器接收到同样的投票信息,就可以确定leader

Step 5:
改变服务器状态

TIPS:

集群机器的数量必须是奇数

五、Java客户端连接集群

ZkClient client = new ZkClient("host1,host2,host3,host4,host5");

Zk客户端处理过程:解析→打散→形成环形地址列表队列


image

六、Zookeeper典型应用场景

  • 数据发布/订阅(配置中心)
  • 命名服务
  • 集群管理
  • Master选举
  • 分布式锁

1.数据发布/订阅(配置中心)

数据发布订阅

2.命名服务

数据发布订阅

3.集群管理

集群管理
  • 集群中多少机器在正常工作
  • 每台机器的运行时状态收集
  • 对集群中的机器进行上下线操作

4.Master选举

Master选举
  • 读写分离场景中,由master处理 读请求;
  • Master处理复杂的逻辑,并将处 理结果通知到其他集群机器;

5.分布式锁

分布式锁

分布式锁的思路:

分布式锁思路

缺点:惊群效应

优化后的思路:

分布式锁

七、Zookeeper使用注意事项

1. Zk数据与日志清理

dataDir目录、dataLogDir两个目录会随着时间推移变得庞大,容易造成硬盘满了,清理办法:

  • 自己编写shell脚本,保留最新的n个文件
  • 使用zk自带的zkClient.sh保留最新的n个文件,zkClient.sh –n 15
  • 配置autopurge.snapRetainCount和autopurge.purgeInterval两个参数配合使用;

2.Too many connections

配置maxClientCnxns参数,配置单个客户端机器创建的最大连接数;

3.磁盘管理

磁盘的I/O性能直接制约zookeeper更新操作速度,为了提高zk的写性能建议:

  • 使用单独的磁盘
  • Jvm堆内存设置要小心

4.磁盘管理集群数量

集群中机器的数量并不是越多越好,一个写操作需要半数以上的节点ack,所以集群节点数越多
,整个集群可以抗挂点的节点数越多(越可靠),但是吞吐量越差。集群的数量必须为奇数;

5.磁盘管理集群数量

zk是基于内存进行读写操作的,有时候会进行消息广播,因此不建议在节点存取容量比较大的数据;

八、Curator实例操作

1.Maven工程依赖引入

<dependencies>
        <!--Zookeeper-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>
        <!--Curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.7.4</version>
        </dependency>
    </dependencies>

新建一个ZK工具类,如下所示:

package com.yongliang.zookeeper.demo;
import com.yongliang.zookeeper.demo.listener.ZkCuratorWatcher;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.util.List;
/**
 * Curator工具类
 *
 * @author zhangyongliang
 * @create 2019-01-12 14:07
 **/
public class CuratorUtils {
    public CuratorFramework client=null;
    private  static final String zkServerIps="192.168.92.20:2181,192.168.92.21:2181,192.168.92.22:2181";

    /**
     * 同步创建ZK实例,设置重连策略
     */
    public CuratorUtils() {
        //设置重连策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        //实例化Curator客户端
        //使用工厂建立客户端对象
        client=  CuratorFrameworkFactory.builder()
        //放入ZK连接服务器IP
                .connectString(zkServerIps)
        //设定会话时间以及重连策略
                .sessionTimeoutMs(10000)
        //设定重连策略
                .retryPolicy(retryPolicy)
                .build();
        // 启动Curator客户端
        client.start();
    }
    //关闭客户端连接
    private  void closeZkClient(){
        if(client!=null){
            this.client.close();
        }
    }
    //获取当前客户端的状态
    private static void getZkStatus() {
        CuratorUtils curatorUtil=new CuratorUtils();
        boolean zkStatus=curatorUtil.client.isStarted();
        System.out.println("当前客户端的状态:" + (zkStatus ? "连接中..." : "已关闭..."));
    }
    //创建节点
    private  static  void createNonde()throws Exception{
        //节点路径
        String nodePath="/yongliang/testNode";
        //节点数据
        byte[] data="this is testZk Data".getBytes();
        CuratorUtils curatorUtil=new CuratorUtils();
        //创建父节点,递归创建
        String result=curatorUtil.client.create().creatingParentsIfNeeded()
       //创建持久节点
                      .withMode(CreateMode.PERSISTENT)
                      .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                      .forPath(nodePath,data);
        System.out.println(result+"节点,创建成功...");
        //关闭客户端
        curatorUtil.closeZkClient();
        //获取状态信息
        getZkStatus();
    }
    //更新节点
    private static  int updateNode()throws Exception{
        CuratorUtils curatorUtil=new CuratorUtils();
        //节点路径
        String nodePath="/yongliang/testNode";
        //更新数据
        byte[] data="this is new Data".getBytes();
        //指定数据版本
        Stat resultStatus=curatorUtil.client.setData().withVersion(0)
                          .forPath(nodePath,data);
        return resultStatus.getVersion();
    }
    //删除节点
    private static  void deleteNode()throws Exception{
        CuratorUtils curatorUtil=new CuratorUtils();
        //节点路径
        String nodePath="/yongliang/testNode";
        //删除节点, 如果删除失败会继续删除
        curatorUtil.client.delete().guaranteed()
        //子节点也递归删除
                   .deletingChildrenIfNeeded()
        //删除对应的版本
                   .withVersion(1)
                   .forPath(nodePath);
        curatorUtil.closeZkClient();
        getZkStatus();
    }
    //查询节点信息
    private static String getNodeInfo()throws Exception{
        CuratorUtils curatorUtil=new CuratorUtils();
        Stat statInfo=new Stat();
        //节点路径
        String nodePath="/yongliang/testNode";
        byte[] nodeData=curatorUtil.client.getData().storingStatIn(statInfo).forPath(nodePath);
        System.out.println("节点 " + nodePath + " 的数据为:" + new String(nodeData));
        System.out.println("该节点的数据版本号为:" + statInfo.getVersion());
        return  new String(nodeData);
    }
    //获取父节点的子节点列表信息
    private static List<String> getNodeListInfo()throws Exception{
        CuratorUtils curatorUtil=new CuratorUtils();
        List<String> nodeInfoList=null;
        //节点路径
        String nodePath="/yongliang/testNode";
        nodeInfoList=curatorUtil.client.getChildren().forPath(nodePath);
        System.out.println(nodePath + " 节点下的子节点列表:");
        for (String childNode : nodeInfoList) {
            System.out.println(childNode);
        }
        curatorUtil.closeZkClient();
        return  nodeInfoList;
    }
    //查询节点是否存在
    private static   boolean isNodeModify() throws Exception{
        CuratorUtils curatorUtil=new CuratorUtils();
        //节点路径
        String nodePath="/yongliang/testNode";
        // 查询某个节点是否存在,存在就会返回该节点的状态信息,如果不存在的话则返回空
        Stat  statExist = curatorUtil.client.checkExists().forPath(nodePath);
        if (statExist != null) {
            System.out.println(nodePath + " 节点存在");
            return  true;
        }
        curatorUtil.closeZkClient();
        return  false;
    }
    //Watcher 实现 修改节点数据就会触发监听 (有效性1次)
    private static void wathcerZK()throws Exception{
        CuratorUtils curatorUtil=new CuratorUtils();
        //节点路径
        String nodePath="/yongliang/testNode";
        curatorUtil.client.getData().usingWatcher(new ZkCuratorWatcher()).forPath(nodePath);
    }
    //一次注册,多次监听(只针对NodeChance事件)
    private  static void periestWathcerZk()throws Exception{
        CuratorUtils curatorUtil=new CuratorUtils();
        //节点路径
        String nodePath="/yongliang/testNode";
        // NodeCache: 缓存节点,并且可以监听数据节点的变更,会触发事件
        final NodeCache nodeCache = new NodeCache(curatorUtil.client, nodePath);
        // 参数 buildInitial : 初始化的时候获取node的值并且缓存
        nodeCache.start(true);
        // 获取缓存里的节点初始化数据
        if (nodeCache.getCurrentData() != null) {
            System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
        } else {
            System.out.println("节点初始化数据为空...");
        }
        // 为缓存的节点添加watcher,或者说添加监听器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            // 节点数据change事件的通知方法
            @Override
            public void nodeChanged() throws Exception {
                // 防止节点被删除时发生错误
                if (nodeCache.getCurrentData() == null) {
                    System.out.println("获取节点数据异常,无法获取当前缓存的节点数据,可能该节点已被删除");
                    return;
                }
                // 获取节点最新的数据
                String data = new String(nodeCache.getCurrentData().getData());
                System.out.println(nodeCache.getCurrentData().getPath() + " 节点的数据发生变化,最新的数据为:" + data);
            }
        });
    }
    //监听一个节点下的所有事件
    private static  void pathCildWatcher()throws Exception{
        CuratorUtils curatorUtil=new CuratorUtils();
        //节点路径
        String nodePath="/yongliang/testNode";
          final String PARENT_NODE_PATH = "/super";  // 父节点
        // PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
        final PathChildrenCache childrenCache = new PathChildrenCache(curatorUtil.client, PARENT_NODE_PATH, true);
        /**
         * StartMode: 初始化方式
         * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
         * NORMAL:异步初始化
         * BUILD_INITIAL_CACHE:同步初始化
         */
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        // 列出子节点数据列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能获得,异步是获取不到的
        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("当前节点的子节点详细数据列表:");
        for (ChildData childData : childDataList) {
            System.out.println("\t* 子节点路径:" + new String(childData.getPath()) + ",该节点的数据为:" + new String(childData.getData()));
        }
        // 添加事件监听器
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // 通过判断event type的方式来实现不同事件的触发
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子节点初始化时触发
                    System.out.println("\n--------------\n");
                    System.out.println("子节点初始化成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子节点时触发
                    if (event.getData().getPath().trim().equals(nodePath)) {
                        System.out.println("\n--------------\n");
                        System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
                        System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
                    }
                    System.out.println("\n--------------\n");
                    System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
                    System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 删除子节点时触发
                    if (event.getData().getPath().trim().equals(nodePath)) {
                        System.out.println("\n--------------\n");
                        System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
                    }
                    System.out.println("\n--------------\n");
                    System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子节点数据时触发
                    if (event.getData().getPath().trim().equals(nodePath)) {
                        System.out.println("\n--------------\n");
                        System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
                        System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
                    }
                    System.out.println("\n--------------\n");
                    System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
                    System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
                }
            }
        });

    }
    public static void main(String[] args) throws Exception {
//        getZkStatus();
//        createNonde();
//       int result=updateNode();
//        System.out.println(result);
//        deleteNode();
//        String reuslt=getNodeInfo();
//        System.out.println(reuslt);
//        getNodeListInfo();
//        isNodeModify();
    }

}


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容

  • 简介 ZooKeeper是一个开源的分布式协调服务,由雅虎创建,是Google Chubby的开源实现。ZooKe...
    whoami2019阅读 642评论 1 2
  • 转自:https://www.jianshu.com/p/84ad63127cd1作者:Jeffbond 简介 Z...
    小北觅阅读 933评论 0 8
  • 声明:本文写的时候,当时就是完全不懂zk,边看网上的文章边学习归纳和整理,这不是我的产出,不用点赞打赏。大家理智友...
    _Zy阅读 76,022评论 38 129
  • 一个真正的写数据流程是怎么样的?一个真正的读数据流程是怎么样的?一个真正的同步数据流程是怎么样的?从哪里到哪里?什...
    时待吾阅读 4,015评论 0 14
  • 我喜欢不迟到的人。 喜欢不说谎的人。 喜欢有真性情的人。 我也喜欢沉默的说话适可而止的人。 喜欢说出的话与行动相符...
    8979af45a4a0阅读 280评论 0 1