Zookeeper/Curator客户端

Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连,反复注册Watcher和NodeExistsException异常等。此外还有zkClient和Zooleeper自带的Java API。

添加依赖:
在pom.xml文件中添加如下内容即可。

  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>2.8.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.8.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>2.8.0</version>
  </dependency> 

创建会话:
Curator除了使用一般方法创建会话外,还可以使用fluent风格进行创建。

    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class Create_Session_Sample {
        public static void main(String[] args) throws Exception {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build();
            client.start();
            System.out.println("Zookeeper session established. ");        
        }
    }

    运行结果: 
    Zookeeper session1 established. 
    Zookeeper session2 established. 

session会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离。当然也可以不设置。

创建节点:
通过使用Fluent风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class Create_Node_Sample {
        public static void main(String[] args) throws Exception {
            String path = "/zk-book/c1";
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .namespace("base")
                    .build();

            client.start();
            
            client.create().creatingParentsIfNeeded()
                           .withMode(CreateMode.EPHEMERAL)
                           .forPath(path, "i am c1".getBytes());

            System.out.println("success create znode: " + path);
        }
    }

    运行结果:
    success create znode: /zk-book/c1

其中,也创建了/base/zk-book/c1的父节点/base/zk-book节点。

删除节点:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;

    public class Del_Data_Sample {
        public static void main(String[] args) throws Exception {
            String path = "/zk-book/c1";
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .namespace("base")
                    .build();

            client.start();

            client.create().creatingParentsIfNeeded()
                           .withMode(CreateMode.EPHEMERAL)
                           .forPath(path, "i am c1".getBytes());

            System.out.println("success create znode: " + path);
            //以上,节点创建完成。

            Stat stat = new Stat();
            System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
            client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
            System.out.println("success delete znode " + path);
        }
    }
    
    运行结果: 
    i am c1
    success delete znode /zk-book/c1

获取数据:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;

    public class Get_Data_Sample {
        public static void main(String[] args) throws Exception {
            String path = "/zk-book";
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
            client.start();
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "i am c1".getBytes());
            Stat stat = new Stat();
            byte b[] = client.getData().storingStatIn(stat).forPath(path);
            System.out.println(new String(b));
        }
    }

    运行结果:
    i am c1

更新数据:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;

    public class Set_Data_Sample {
        public static void main(String[] args) throws Exception {
            String path = "/zk-book";
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
            client.start();
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "i am c1".getBytes());            

            stat = client.setData().withVersion(stat.getVersion()).forPath(path);
            System.out.println("Success set node for : " + path + ", new version: "+ stat.getVersion());
        }
    }

    运行结果:  
    Success set node for : /zk-book, new version: 1

异步接口:
如同Zookeeper原生API提供了异步接口,Curator也提供了异步接口。在Zookeeper中,所有的异步通知事件处理都是由EventThread这个线程来处理的,EventThread线程用于串行处理所有的事件通知,其可以保证对事件处理的顺序性,但是一旦碰上复杂的处理单元,会消耗过长的处理时间,从而影响其他事件的处理,Curator允许用户传入Executor实例,这样可以将比较复杂的事件处理放到一个专门的线程池中去。

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class Create_Node_Background_Sample {
        static String path = "/zk-book";
        static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        
        static CountDownLatch semaphore = new CountDownLatch(2);
        static ExecutorService tp = Executors.newFixedThreadPool(2);

        public static void main(String[] args) throws Exception {
            client.start();
            System.out.println("Main thread: " + Thread.currentThread().getName());

            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                                                     .inBackground(
                                                         new BackgroundCallback(){
                                                             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                                                                 System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
                                                                 semaphore.countDown();
                                                             }
                                                         }, tp
                                                         )
                                                    .forPath(path, "init".getBytes());

            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                                                     .inBackground(
                                                         new BackgroundCallback(){
                                                             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                                                                 System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
                                                                 semaphore.countDown();
                                                             }
                                                         }
                                                         )
                                                     .forPath(path, "init".getBytes());

            semaphore.await();
            tp.shutdown();
        }
    }
    
    运行结果:
    Main thread: main
    event[code: -110, type: CREATE], Thread of processResult: main-EventThread
    event[code: 0, type: CREATE], Thread of processResult: pool-3-thread-1

其中,创建节点的事件由线程池自己处理,而非默认线程处理。

节点监听:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.NodeCache;
    import org.apache.curator.framework.recipes.cache.NodeCacheListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class NodeCache_Sample {

        public static void main(String[] args) throws Exception {
            String path = "/zk-book/nodecache";
            CuratorFramework client = CuratorFrameworkFactory.builder()
                                                            .connectString("127.0.0.1:2181")
                                                            .sessionTimeoutMs(5000)
                                                            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                                                            .namespace("base")
                                                            .build();
            client.start();

            //新建节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "i am nodecache".getBytes());
            
            //监听
            final NodeCache cache = new NodeCache(client, path, false);
            cache.start(true);
            cache.getListenable().addListener(new NodeCacheListener() {
                public void nodeChanged() throws Exception {
                    System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));
                }
            });

            //更新节点
            client.setData().forPath(path, "u".getBytes());
            Thread.sleep(1000);
        }
    }

    运行结果:  
    Node data update, new data: u

当节点数据变更后收到了通知。NodeCache不仅可以监听数据节点的内容变更,也能监听指定节点是否存在。

子节点监听:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class PathChildrenCache_Sample {

        public static void main(String[] args) throws Exception {
            String path = "/zk-book/nodecache";
            CuratorFramework client = CuratorFrameworkFactory.builder()
                                                            .connectString("127.0.0.1:2181")
                                                            .sessionTimeoutMs(5000)
                                                            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                                                            .namespace("base")
                                                            .build();
            client.start();

            PathChildrenCache cache = new PathChildrenCache(client, path, true);
            cache.start(StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED," + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED," + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED," + event.getData().getPath());
                        break;
                    default:
                        break;
                    }
                }
            });

            client.create().withMode(CreateMode.PERSISTENT).forPath(path);
            client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
            client.delete().forPath(path + "/c1");
            Thread.sleep(1000);
        }
    }

    运行结果:
    CHILD_ADDED,/zk-book/c1
    CHILD_REMOVED,/zk-book/c1

监听节点的子节点,包括新增、数据变化、删除三类事件。

Master选举:
借助Zookeeper,开发者可以很方便地实现Master选举功能,其大体思路如下:选择一个根节点,如/master_select,多台机器同时向该节点创建一个子节点/master_select/lock,利用Zookeeper特性,最终只有一台机器能够成功创建,成功的那台机器就是Master。

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.LeaderSelector;
    import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class Recipes_MasterSelect {

        public static void main(String[] args) throws Exception {
        String path = "/zk-book/nodecache";
            CuratorFramework client = CuratorFrameworkFactory.builder()
                                                            .connectString("127.0.0.1:2181")
                                                            .sessionTimeoutMs(5000)
                                                            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                                                            .namespace("base")
                                                            .build();
            client.start();

            LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
                public void takeLeadership(CuratorFramework client) throws Exception {
                    System.out.println("成为Master角色");
                    Thread.sleep(3000);
                    System.out.println("完成Master操作,释放Master权利");
                }
            });
            selector.autoRequeue();
            selector.start();
            Thread.sleep(1000);
        }
    }

    运行结果:
    成为Master角色
    完成Master操作,释放Master权利
    成为Master角色

以上结果会反复循环,并且当一个应用程序完成Master逻辑后,另外一个应用程序的相应方法才会被调用,即当一个应用实例成为Master后,其他应用实例会进入等待,直到当前Master挂了或者推出后才会开始选举Master。


参考:http://www.cnblogs.com/leesf456/p/6032716.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容