面试官:Zookeeper了解吗?说说都有哪些使用场景?

点赞再看,养成习惯,微信搜一搜【一角钱技术】关注更多原创技术文章。
本文 GitHub org_hejianhui/JavaStudy 已收录,有我的系列文章。

前言

前两篇讲了Zookeeper的特性、客户端使用和集群原理,因为 Zookeeper 是分布式系统中很常见的一个基础系统。 而且问的话常问的就是说 zookeeper 的使用场景是什么? 看你知道不知道一些基本的使用场景。 但是其实 Zookeeper 挖深了自然是可以问的很深很深的。本文主要来聊聊 Zookeeper 主要的几个使用场景。

  1. 分布式集群管理
  2. 分布式注册中心
  3. 分布式JOB
  4. 分布式锁

分布式集群管理

分布式集群管理的需求

  1. 主动查看线上服务节点
  2. 查看服务节点资源使用情况
  3. 服务离线通知
  4. 服务资源(CPU、内存、硬盘)超出阀值通知

架构设计

节点结构

  1. niuh-manger // 根节点
  2. server00001 : //服务节点 1
  3. server00002 ://服务节点 2
  4. server........n ://服务节点 n

服务状态信息

  1. ip
  2. cpu
  3. memory
  4. disk

功能实现

数据生成与上报

  1. 创建临时节点:
  2. 定时变更节点状态信息:

主动查询

  1. 实时查询 zookeeper 获取集群节点的状态信息。

被动通知

  1. 监听根节点下子节点的变化情况,如果CPU 等硬件资源低于警告位则发出警报。

关键示例代码

package com.niuh.os;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.I0Itec.zkclient.ZkClient;

import java.lang.instrument.Instrumentation;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class Agent {
    private static Agent ourInstance = new Agent();
    private String server = "127.0.0.1:2181";
    private ZkClient zkClient;
    private static final String rootPath = "/niuh-manger";
    private static final String servicePath = rootPath + "/service";
    private String nodePath; ///niuh-manger/service0000001 当前节点路径
    private Thread stateThread;


    public static Agent getInstance() {
        return ourInstance;
    }

    private Agent() {
    }

    // javaagent 数据监控
    public static void premain(String args, Instrumentation instrumentation) {
        Agent.getInstance().init();
    }

    public void init() {
        zkClient = new ZkClient(server, 5000, 10000);
        System.out.println("zk连接成功" + server);
        // 创建根节点
        buildRoot();
        // 创建临时节点
        createServerNode();
        // 启动更新的线程
        stateThread = new Thread(() -> {
            while (true) {
                updateServerNode();
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "zk_stateThread");
        stateThread.setDaemon(true);
        stateThread.start();
    }

    // 数据写到 当前的临时节点中去
    public void updateServerNode() {
        zkClient.writeData(nodePath, getOsInfo());
    }

    // 生成服务节点
    public void createServerNode() {
        nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo());
        System.out.println("创建节点:" + nodePath);
    }

    // 更新服务节点状态
    public String getOsInfo() {
        OsBean bean = new OsBean();
        bean.lastUpdateTime = System.currentTimeMillis();
        bean.ip = getLocalIp();
        bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu();
        MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024;
        bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024;
        bean.pid = ManagementFactory.getRuntimeMXBean().getName();
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.writeValueAsString(bean);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static String getLocalIp() {
        InetAddress addr = null;
        try {
            addr = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        return addr.getHostAddress();
    }

    public void buildRoot() {
        if (!zkClient.exists(rootPath)) {
            zkClient.createPersistent(rootPath);
        }
    }
}

实现效果

启动参数设置



运行测试用例:

package com.niuh.test;

import com.niuh.os.Agent;
import org.junit.Ignore;
import org.junit.Test;

public class AgentTest {

    @Test
    @Ignore
    public void initTest() {
        Agent.premain(null, null);
        runCPU(2); //20% 占用
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //
    private void runCPU(int count) {
        for (int i = 0; i < count; i++) {
            new Thread(() -> {
                while (true) {
                    long bac = 1000000;
                    bac = bac >> 1;
                }
            }).start();
            ;
        }
    }
}

控制台输出:

CPU 报警...22.55120088850181
CPU 报警...46.06592086097357
CPU 报警...47.87206766163349
CPU 报警...49.49176420213768
CPU 报警...48.967942479969004
CPU 报警...49.193921607021565
CPU 报警...48.806604284784676
CPU 报警...48.63229912951865
CPU 报警...49.34509647972038
CPU 报警...47.07551108884401
CPU 报警...49.18489236134496
CPU 报警...49.903007346777066
CPU 报警...49.28868795953268
// 关闭测试用例
服务已下线:OsBean{ip='192.168.43.11', cpu=49.28868795953268, usedMemorySize=56, usableMemorySize=3641, pid='47192@hejianhui', lastUpdateTime=1602056208842}

本Demo不适用在生产环境,示例Demo涉及组件zookeeper-agent、zookeeper-web。源代码提交在 githubhttps://github.com/Niuh-Frame/niuh-zookeeper

分布式注册中心

在单体式服务中,通常是由多个客户端去调用一个服务,只要在客户端中配置唯一服务节点地址即可,当升级到分布式后,服务节点变多,像一线大厂服务节点更是上万之多,这么多节点不可能手动配置在客户端,这里就需要一个中间服务,专门用于帮助客户端发现服务节点,即许多技术书籍经常提到的服务发现

一个完整的注册中心涵盖以下功能特性:

  • 服务注册:提供者上线时将自提供的服务提交给注册中心。
  • 服务注销:通知注册心提供者下线。
  • 服务订阅:动态实时接收服务变更消息。
  • 可靠:注册服务本身是集群的,数据冗余存储。避免单点故障,及数据丢失。
  • 容错:当服务提供者出现宕机,断电等极情况时,注册中心能够动态感知并通知客户端服务提供者的状态。

Dubbo 对 Zookeeper的使用

阿里著名的开源项目Dubbo 是一个基于JAVA的RCP框架,其中必不可少的注册中心可基于多种第三方组件实现,但其官方推荐的还是Zookeeper做为注册中心服务。


Dubbo Zookeeper注册中心存储结构

节点说明

类别 属性 说明
Root 持久节点 根节点名称,默认是 "dubbo"
Service 持久节点 服务名称,完整的服务类名
type 持久节点 可选值:providers(提供者)、consumers(消费者)、configurators(动态配置)、routers
URL 临时节点 url名称 包含服务提供者的 IP 端口 及配置等信息。

流程说明

  1. 服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址
  2. 服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址
  3. 监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。

示例Demo

服务端代码

package com.niuh.zk.dubbo;

import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ProtocolConfig;
import com.alibaba.dubbo.config.RegistryConfig;
import com.alibaba.dubbo.config.ServiceConfig;

import java.io.IOException;

public class Server {
    public void openServer(int port) {
        // 构建应用
        ApplicationConfig config = new ApplicationConfig();
        config.setName("simple-app");

        // 通信协议
        ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", port);
        protocolConfig.setThreads(200);

        ServiceConfig<UserService> serviceConfig = new ServiceConfig();
        serviceConfig.setApplication(config);
        serviceConfig.setProtocol(protocolConfig);
        serviceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        serviceConfig.setInterface(UserService.class);
        UserServiceImpl ref = new UserServiceImpl();
        serviceConfig.setRef(ref);
        //开始提供服务  开张做生意
        serviceConfig.export();
        System.out.println("服务已开启!端口:"+serviceConfig.getExportedUrls().get(0).getPort());
        ref.setPort(serviceConfig.getExportedUrls().get(0).getPort());
    }

    public static void main(String[] args) throws IOException {
        new Server().openServer(-1);
        System.in.read();
    }
}

客户端代码

package com.niuh.zk.dubbo;

import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ReferenceConfig;
import com.alibaba.dubbo.config.RegistryConfig;

import java.io.IOException;

public class Client {
    UserService service;

    // URL 远程服务的调用地址
    public UserService buildService(String url) {
        ApplicationConfig config = new ApplicationConfig("young-app");
        // 构建一个引用对象
        ReferenceConfig<UserService> referenceConfig = new ReferenceConfig<UserService>();
        referenceConfig.setApplication(config);
        referenceConfig.setInterface(UserService.class);
        // referenceConfig.setUrl(url);
        referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        referenceConfig.setTimeout(5000);
        // 透明化
        this.service = referenceConfig.get();
        return service;
    }

    static int i = 0;

    public static void main(String[] args) throws IOException {
        Client client1 = new Client();
        client1.buildService("");
        String cmd;
        while (!(cmd = read()).equals("exit")) {
            UserVo u = client1.service.getUser(Integer.parseInt(cmd));
            System.out.println(u);
        }
    }


    private static String read() throws IOException {
        byte[] b = new byte[1024];
        int size = System.in.read(b);
        return new String(b, 0, size).trim();
    }
}

查询 zk 实际存储内容:

/dubbo
/dubbo/com.niuh.zk.dubbo.UserService
/dubbo/com.niuh.zk.dubbo.UserService/configurators
/dubbo/com.niuh.zk.dubbo.UserService/routers

/dubbo/com.niuh.zk.dubbo.UserService/providers
/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo://192.168.43.11:20880/com.niuh.zk.dubbo.UserService?anyhost=true&application=simple-app&dubbo=2.6.2&generic=false&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=48302&side=provider&threads=200&timestamp=1602057895881

/dubbo/com.niuh.zk.dubbo.UserService/consumers
/dubbo/com.niuh.zk.dubbo.UserService/consumers/consumer://192.168.43.11com.niuh.zk.dubbo.UserService?application=young-app&category=consumers&check=false&dubbo=2.6.2&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=49036&side=consumer&timeout=5000&timestamp=1602075359549

示例Demo涉及组件zookeeper-dubbo。源代码提交在 githubhttps://github.com/Niuh-Frame/niuh-zookeeper

分布式JOB

分布式JOB需求

  1. 多个服务节点只允许其中一个主节点运行JOB任务。
  2. 当主节点挂掉后能自动切换主节点,继续执行JOB任务。

架构设计

node结构

  1. niuh-master
  2. server0001:master
  3. server0002:slave
  4. server000n:slave

选举流程

服务启动:

  1. 在niuh-maste下创建server子节点,值为slave
  2. 获取所有niuh-master 下所有子节点
  3. 判断是否存在master 节点
  4. 如果没有设置自己为master节点

子节点删除事件触发:

  1. 获取所有niuh-master 下所有子节点
  2. 判断是否存在master 节点
  3. 如果没有设置最小值序号为master 节点

示例Demo

package com.niuh.zookeeper.master;

import org.I0Itec.zkclient.ZkClient;

import java.util.Map;
import java.util.stream.Collectors;

public class MasterResolve {
    private String server = "127.0.0.1:2181";
    private ZkClient zkClient;
    private static final String rootPath = "/niuh-master";
    private static final String servicePath = rootPath + "/service";
    private String nodePath;
    private volatile boolean master = false;
    private static MasterResolve resolve;

    private MasterResolve() {
        zkClient = new ZkClient(server, 2000, 5000);
        buildRoot();
        createServerNode();
    }

    public static MasterResolve getInstance() {
        if (resolve == null) {
            resolve= new MasterResolve();
        }
        return resolve;
    }


    // 构建根节点
    public void buildRoot() {
        if (!zkClient.exists(rootPath)) {
            zkClient.createPersistent(rootPath);
        }
    }

    // 创建server节点
    public void createServerNode() {
        nodePath = zkClient.createEphemeralSequential(servicePath, "slave");
        System.out.println("创建service节点:" + nodePath);
        initMaster();
        initListener();
    }



    private void initMaster() {
        boolean existMaster = zkClient.getChildren(rootPath)
                .stream()
                .map(p -> rootPath + "/" + p)
                .map(p -> zkClient.readData(p))
                .anyMatch(d -> "master".equals(d));
        if (!existMaster) {
            doElection();

            System.out.println("当前当选master");
        }
    }
    private void initListener() {
        zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> {
            doElection();//  执行选举
        });
    }
    // 执行选举
    public void doElection() {
        Map<String, Object> childData = zkClient.getChildren(rootPath)
                .stream()
                .map(p -> rootPath + "/" + p)
                .collect(Collectors.toMap(p -> p, p -> zkClient.readData(p)));
        if (childData.containsValue("master")) {
            return;
        }

        childData.keySet().stream().sorted().findFirst().ifPresent(p -> {
            if (p.equals(nodePath)) { // 设置最小值序号为master 节点
                zkClient.writeData(nodePath, "master");
                master = true;
                System.out.println("当前当选master" + nodePath);
            }
        });

    }


    public static boolean isMaster() {
        return getInstance().master;
    }
}

示例Demo涉及组件zookeeper-master。源代码提交在 github

分布式锁

锁的的基本概念

开发中锁的概念并不陌生,通过锁可以实现在多个线程或多个进程间在争抢资源时,能够合理的分配置资源的所有权。在单体应用中我们可以通过 synchronizedReentrantLock 来实现锁。但在分布式系统中,仅仅是加synchronized 是不够的,需要借助第三组件来实现。比如一些简单的做法是使用关系型数据行级锁来实现不同进程之间的互斥,但大型分布式系统的性能瓶颈往往集中在数据库操作上。为了提高性能得采用如Redis、Zookeeper之内的组件实现分布式锁。

共享锁:也称作只读锁,当一方获得共享锁之后,其它方也可以获得共享锁。但其只允许读取。在共享锁全部释放之前,其它方不能获得写锁。

排它锁:也称作读写锁,获得排它锁后,可以进行数据的读写。在其释放之前,其它方不能获得任何锁。

锁的获取

某银行帐户,可以同时进行帐户信息的读取,但读取其间不能修改帐户数据。其帐户ID为:888

获得读锁流程

  1. 基于资源ID创建临时序号读锁节点 /lock/888.R0000000002 Read
  2. 获取 /lock 下所有子节点,判断其最小的节点是否为读锁,如果是则获锁成功
  3. 最小节点不是读锁,则阻塞等待。添加lock/ 子节点变更监听。
  4. 当节点变更监听触发,执行第2步

数据结构

获得写锁

  1. 基于资源ID创建临时序号写锁节点 /lock/888.R0000000002 Write
  2. 获取 /lock 下所有子节点,判断其最小的节点是否为自己,如果是则获锁成功
  3. 最小节点不是自己,则阻塞等待。添加lock/ 子节点变更监听。
  4. 当节点变更监听触发,执行第2步

释放锁

读取完毕后,手动删除临时节点,如果获锁期间宕机,则会在会话失效后自动删除。

关于羊群效应

在等待锁获得期间,所有等待节点都在监听 Lock节点,一但lock 节点变更所有等待节点都会被触发,然后在同时反查Lock 子节点。如果等待对例过大会使用Zookeeper承受非常大的流量压力。

为了改善这种情况,可以采用监听链表的方式,每个等待对列只监听前一个节点,如果前一个节点释放锁的时候,才会被触发通知。这样就形成了一个监听链表。

示例Demo

package com.niuh.zookeeper.lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;
import java.util.stream.Collectors;

public class ZookeeperLock {
    private String server = "127.0.0.1:2181";
    private ZkClient zkClient;
    private static final String rootPath = "/niuh-lock1";

    public ZookeeperLock() {
        zkClient = new ZkClient(server, 5000, 20000);
        buildRoot();
    }

    // 构建根节点
    public void buildRoot() {
        if (!zkClient.exists(rootPath)) {
            zkClient.createPersistent(rootPath);
        }
    }
    // 获取锁
    public Lock lock(String lockId, long timeout) {
        // 创建临时节点
        Lock lockNode = createLockNode(lockId);
        lockNode = tryActiveLock(lockNode);// 尝试激活锁
        if (!lockNode.isActive()) {
            try {
                synchronized (lockNode) {
                    lockNode.wait(timeout); // 线程锁住
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        if (!lockNode.isActive()) {
            throw new RuntimeException(" lock  timeout");
        }
        return lockNode;
    }

    // 释放锁
    public void unlock(Lock lock) {
        if (lock.isActive()) {
            zkClient.delete(lock.getPath());
        }
    }

    // 尝试激活锁
    private Lock tryActiveLock(Lock lockNode) {

        // 获取根节点下面所有的子节点
        List<String> list = zkClient.getChildren(rootPath)
                .stream()
                .sorted()
                .map(p -> rootPath + "/" + p)
                .collect(Collectors.toList());      // 判断当前是否为最小节点

        String firstNodePath = list.get(0);
        // 最小节点是不是当前节点
        if (firstNodePath.equals(lockNode.getPath())) {
            lockNode.setActive(true);
        } else {
            String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
            zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {

                }

                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    // 事件处理 与心跳 在同一个线程,如果Debug时占用太多时间,将导致本节点被删除,从而影响锁逻辑。
                    System.out.println("节点删除:" + dataPath);
                     Lock lock = tryActiveLock(lockNode);
                    synchronized (lockNode) {
                        if (lock.isActive()) {
                            lockNode.notify(); // 释放了
                        }
                    }
                    zkClient.unsubscribeDataChanges(upNodePath, this);
                }
            });
        }
        return lockNode;
    }


    public Lock createLockNode(String lockId) {
        String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w");
        return new Lock(lockId, nodePath);
    }
}

示例Demo涉及组件zookeeper-lock。源代码提交在 githubhttps://github.com/Niuh-Frame/niuh-zookeeper

部分图片来源于网络,版权归原作者,侵删。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354