Zookeeper入门之五-常见应用场景及代码

Master选举实现

思路:选择一个根节点,例如/master_select,多台机器同时向该节点创建一个子节点 /master_select/lock,利用zk的特性,最终只有一台机器能够创建成功,这台机器就是master

 static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .namespace("master_select")
            .build();

    public static void main(String[] args) throws InterruptedException {

        zkFluentClient.start();

        String selectPath = "/master_select";

        LeaderSelector selector = new LeaderSelector(zkFluentClient, selectPath, new LeaderSelectorListenerAdapter() {
            @Override
            // 需要注意的是,一旦执行完这个方法,curator就会立即释放Master的权利,然后重新开始新一轮的Master选举
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                System.out.println("Be a Leader");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("释放 Leader ");
            }
        });

        selector.autoRequeue();

        selector.start();

        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);

    }

如果同时有2个请求,可以看到交替执行,创建2个临时节点:

[zk: localhost:2181(CONNECTED) 21] ls /master_select/master_select
[_c_93265fd6-4b11-4668-baf8-e4211a8d1b5f-lock-0000000067, _c_36a0c859-efb1-442a-9dff-26121e7a1a7e-lock-0000000068]

这里的临时节点,在master失效的时候就会被删除。

一旦takeLeaderShip执行结束,master的就会被释放,然后重新开始新一轮的master选举。

分布式锁

使用InterProcessMutex来做分布式锁处理

public class DistributeLockTest {

    static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .namespace("lock")
            .build();

    public static void main(String[] args) {
        zkFluentClient.start();

        final InterProcessMutex lock = new InterProcessMutex(zkFluentClient, "/distribute_lock");

        final CountDownLatch latch = new CountDownLatch(1);

        for (int i = 0; i < 30; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {

                    try {
                        latch.await();
                        lock.acquire(); // 获取锁

                        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                        String orderNo = sdf.format(Date.from(Instant.now()));
                        System.out.println("OrderNo is:" + orderNo);

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }


                }
            }).start();
        }

        latch.countDown();// 这里很有意思,在主线程启动了几十个线程之后,这些线程都是hold住的(通过 countDownLatch.await()方法)
        // 然后主线程处理latch.countDown(),导致所有子线程同时满足触发条件,同时执行,保证并发。不过仅用在测试环节比较合适。
        // 其实latch可以去掉,只是这样并发没有那么集中。
    }
}
分布式计数器

思路很类似,用上述分布式锁的思路。

比如统计在线人数,指定zk的一个数据节点作为计数器,多个应用实例在分布式锁的控制下,通过更新该数据节点的内容来实现计数功能。

public class DistributeCounterTest {

    static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .namespace("counter")
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(new ExponentialBackoffRetry(800, 5))
            .build();

    public static void main(String[] args) throws Exception {

        zkFluentClient.start();

        // 计数器
        DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(zkFluentClient, "/adder",
                new RetryNTimes(3, 1000));

        AtomicValue<Integer> rc = atomicInteger.add(8);
        rc = atomicInteger.increment();
        rc = atomicInteger.decrement();
        atomicInteger.increment();

        System.out.println("Result:" + rc.succeeded());
        System.out.println("preValue:" + rc.preValue() + ",postValue:" + rc.postValue());
        System.out.println();

        String value = new String(zkFluentClient.getData().forPath("/adder"));
        System.out.println(value);

        // 试着重新取
        DistributedAtomicInteger newAtomicInteger = new DistributedAtomicInteger(zkFluentClient, "/adder", new RetryNTimes(3, 800));
        System.out.println(newAtomicInteger.get().preValue() + "_" + newAtomicInteger.get().postValue());
    }
}

可以看到,只要同一个路径下,对应的DistributeAtomicInteger的对象值都是同一个,可以随时创建一个对象直接使用。

分布式Barrier

先看一个JDK自带的CyclicBarrier,先看下CyclicBarrier的说明:

CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。线程进入屏障通过CyclicBarrier的await()方法。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

实现原理:在CyclicBarrier的内部定义了一个Lock对象,每当一个线程调用CyclicBarrier的await方法时,将剩余拦截的线程数减1,然后判断剩余拦截数是否为0,如果不是,进入Lock对象的条件队列等待。如果是,执行barrierAction对象的Runnable方法,然后将锁的条件队列中的所有线程放入锁等待队列中,这些线程会依次的获取锁、释放锁,接着先从await方法返回,再从CyclicBarrier的await方法中返回。

CyclicBarrier主要用于一组线程之间的相互等待,而CountDownLatch一般用于一组线程等待另一组线程。实际上可以通过CountDownLatch的countDown()和await()来实现CyclicBarrier的功能。即 CountDownLatch中的countDown()+await() = CyclicBarrier中的await()。注意:在一个线程中先调用countDown(),然后调用await()。

先看代码

public class DistributeCyclicBarrierTest {

    static CyclicBarrier jdkBarrier = new CyclicBarrier(3);
    

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.execute(new Thread(new JdkBasedRuner("jinsiyu")));
        executorService.execute(new Thread(new JdkBasedRuner("AMANDA")));
        executorService.execute(new Thread(new JdkBasedRuner("QQ")));
    }

    static class JdkBasedRuner implements Runnable {

        private String name;

        public JdkBasedRuner(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(name + " Ready!!!");

            try {
                jdkBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

            System.out.println(name + " GO!!");
        }
    }
}

结果如下:

jinsiyu Ready!!!
AMANDA Ready!!!
QQ Ready!!!

QQ GO!!
jinsiyu GO!!
AMANDA GO!!

可以看到,只有当CyclicBarrier中的值为0时,才会统一执行其后的操作,也就是“XXX GO”的语句打印。

而如果这里jdkBarrier如果设置的为4,那么下面三句“XXX GO”的语句根本不会打印,会一直等待。

ZK下的实现:

    static DistributedBarrier distributedBarrier;

    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectionTimeoutMs(3000)
            .sessionTimeoutMs(5000)
            .connectString("localhost:32770")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .namespace("cyclicBarrier").build();

.......
        zkClient.start();
                // distribute
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    distributedBarrier = new DistributedBarrier(zkClient, "/barrier");
                    System.out.println(Thread.currentThread().getName() + "号barrier设置");
                    try {
                        distributedBarrier.setBarrier(); // 看实现,就是在create节点
                        distributedBarrier.waitOnBarrier(); // 等待,直到remove
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "Starting...");
                }
            }).start();
        }
        TimeUnit.SECONDS.sleep(2);
        distributedBarrier.removeBarrier(); // delete节点

ZKPaths & EnsurePath
public class ZKPathsTest {

    static String path = "/zkpath_sample";

    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .build();

    public static void main(String[] args) throws Exception {

        zkClient.start();

        System.out.println(ZKPaths.fixForNamespace(path,"/sub"));
        System.out.println(ZKPaths.makePath(path,"/sub"));
        System.out.println(ZKPaths.getNodeFromPath("/zkpath_sample/sub1")); // 不存在节点也不会报错,从路径str中截取


        ZKPaths.PathAndNode pn = ZKPaths.getPathAndNode("/zkpath_sample/sub1"); // 获取节点,不存在也不会报错,只是从路径上截取
        System.out.println(pn.getPath());
        System.out.println(pn.getNode());

        // 获取zookeeper,这个是干啥的?
        ZooKeeper zooKeeper = zkClient.getZookeeperClient().getZooKeeper();

        String dir1 = path + "/child1";
        String dir2 = path + "/child2";
        ZKPaths.mkdirs(zooKeeper,dir1); // 创建目录,如果存在不会报错,也不会抛异常
        ZKPaths.mkdirs(zooKeeper,dir2);

        System.out.println(ZKPaths.getSortedChildren(zooKeeper,path)); // 获取已排序的子节点

        ZKPaths.deleteChildren(zooKeeper,path,false); // 删除子节点,如果最后一个参数为true,会删除本身
    }
}
public class EnsurePathTest {

    static String path = "/path2";

    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectionTimeoutMs(3000)
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .connectString("localhost:32770")
            .namespace("ensure")
            .build();

    public static void main(String[] args) throws Exception {

        zkClient.start();

        EnsurePath ensurePath = new EnsurePath(path);  // 这里跟namespace没关系,只会从根目录下开始建,所以是绝对路径了
        ensurePath.ensure(zkClient.getZookeeperClient());

        EnsurePath ensurePath1 = zkClient.newNamespaceAwareEnsurePath("/c2"); // 用这个方法,namespace生效
        ensurePath1.ensure(zkClient.getZookeeperClient());
    }

}

不过EnsurePath貌似已经不推荐使用了。

顺序节点
    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .namespace("sequence-jin")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {

        zkClient.start();
        // 创建顺序节点
        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                .forPath("/seq");
    }

执行多次后,结果如下:

ls /sequence-jin
[seq0000000001, seq0000000000, seq0000000002]

可以看到,顺序节点。。。就是如此

可以通过临时节点来代替心跳,来判断client端是否存在。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容