Zookeeper 开源客户端Curator处理事件监听

事件监听

Zookeeper原生就支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册Watcher,比较繁琐。Curator引入了Cache来实现对zookeeper服务端事件的监听,Cache是Curator中对事件的包装,其对事件的监听其实可以近似的看做是一个本地缓存视图和远程Zookeeper视图的对比过程。同时Curator能够自动为开发人员处理反复注册监听,从而大大简化原生API开发的繁琐过程。Cache分为NodeCache,PathChildrenCache和TreeCache。

NodeCache

NodeCache用于监听指定ZooKeeper数据节点本身的变化,其构造方法有如下两个:

public NodeCache(CuratorFramework client, String path);
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

NodeCache构造方法参数说明如下:

  • client:Curator客户端实例
  • path:数据节点的节点路径
  • dataIsCompressed:是否进行数据压缩

同时NodeCache定义了事件处理的回调接口NodeCacheListener如下:

public interface NodeCacheListener{
    /**
     * Called when a change has occurred
     */
    public void nodeChanged() throws Exception;
}

当ZNode的内容发生变化时,就会回调该方法。下面通过一个实际的例子来看如何在代码中使用NodeCache。

public class NodeCache_Sample {

    static String path = "/zk-book/nodecache";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.58.42:2181")
            .sessionTimeoutMs(60000)
            .connectionTimeoutMs(15000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path,"init".getBytes());
        final NodeCache cache = new NodeCache(client,path,false);
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData currentData = cache.getCurrentData();
                String data = currentData == null ? "" : new String(currentData.getData());
                System.out.println("=====> Node data update, new Data: "+data);
            }
        });

        client.setData().forPath(path,"i love you".getBytes());
        Thread.sleep(1000);
        client.delete().deletingChildrenIfNeeded().forPath(path);
        Thread.sleep(10000);
        cache.close();
        client.close();
    }

NodeCache使用start()方法开启缓存,使用完后调用close()方法关闭它。在上面的例子中,首先构造了一个NodeCache实例,然后再调用start方法,该方法有一个Boolean类型的参数,默认是false,如果设置为true,那么NodeCache在第一次启动的时候就会立刻从ZooKeeper上读取该节点的数据内容,并保存在Cache中。

NodeCache不仅可以用于监听数据节点的内容变更,也可能监听指定节点是否存在。如果原节点不存在,那么Cache就会在节点被创建后触发NodeCacheListenter。节点的删除操作也会触发NodeCacheListenter有很多文章说删除不会触发其实是错误的,我测试的时候是可以触发的。

PathChildrenCache

PathChildrenCache是用来监听指定节点 的子节点变化情况,共有以下几种构造方法(不包括过时的方法):

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService);
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService);

PathChildrenCache构造方法参数说明:

  • client: Curator客户端实例
  • path:数据节点路径
  • cacheData:用于配置是否把节点内容缓存起来,如果配置为true。那么客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容;如果配置为false则无法获取到节点的数据内容。
  • dataIsCompressed:是否进行数据压缩
  • threadFactory:利于这个参数,开发者可以通过构造一个专门的线程池,来处理事件通知
  • executorService:自定义线程池

同时PathChildrenCache定义了事件处理的回调接口PathChildrenCacheListener,其定义如下:

public interface PathChildrenCacheListener{
    /**
     * Called when a change has occurred
     *
     * @param client the client
     * @param event describes the change
     * @throws Exception errors
     */
    public void     childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}

当指定节点的子节点发生变化时,就会回调该方法。PathChildrenCacheListener类中定义了所有的事件类型,主要包括新增子节点(CHILD_ADDED)、子节点数据变更(CHILD_UPDATED)、和子节点的删除(CHILD_REMOVED)三类。
下面通过一个实际例子来看看如何在代码中使用PathChildrenCache。

public class PathChildrenCache_Sample {
    static String path = "/zk-book2";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.58.42:2181")
            .sessionTimeoutMs(60000)
            .connectionTimeoutMs(15000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client,path,true);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("=====> CHILD_ADDED : "+ event.getData().getPath() +"  数据:"+ new String(event.getData().getData()));
                        break;
                    case CHILD_REMOVED:
                        System.out.println("=====> CHILD_REMOVED : "+ event.getData().getPath() +"  数据:"+ new String(event.getData().getData()));
                        break;
                    case CHILD_UPDATED:
                        System.out.println("=====> CHILD_UPDATED : "+ event.getData().getPath() +"  数据:"+ new String(event.getData().getData()));
                        break;
                    default:
                        break;
                }
            }
        });
        client.create().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
        Thread.sleep(1000);
        client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
        Thread.sleep(1000);
        client.setData().forPath(path+"/c1","I love you".getBytes());
        Thread.sleep(1000);
        client.delete().forPath(path+"/c1");
        Thread.sleep(1000);
        client.delete().forPath(path);
        Thread.sleep(10000);
        cache.close();
        client.close();
    }
}

在上面的示例程序中,对/zk-book2节点进行了子节点变更事件的监听,一旦该节点新增/删除子节点或者子节点的数据发生改变时,就会回调PathChildrenCacheListener,并根据对应的事件类型进行相关的处理。同时,我们看到,对于节点/zk-book2本身的变更,并没有通知到客户端。
另外,和其他ZooKeeper客户端产品一样,Curator也无法对二级子节点进行事件监听。也就是说,如果使用PathChildrenCacheListener对/zk-book2进行监听,那么当/zk-book/c1/c2节点被创建或者删除的时候,是无法触发子节点变更事件的。

TreeCache

结合NodeCache和PathChildrenCahce的特性,是对整个目录进行监听。共有以下几种构造方法:

public TreeCache(CuratorFramework client, String path);
TreeCache只有一个构造方法,其内部还有一个默认访问修饰符修饰的构造方法,如下:
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector);

TreeCache构造方法参数说明:

  • client: Curator客户端实例
  • path:数据节点路径
  • cacheData:用于配置是否把节点内容缓存起来,如果配置为true。那么客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容;如果配置为false则无法获取到节点的数据内容。
  • dataIsCompressed:是否进行数据压缩
  • maxDepth:比如当前监听节点/t1,目录最深为/t1/t2/t3/t4,则maxDepth=3,说明下面3级子目录全监听,即监听到t4,如果为2,则监听到t3,对t3的子节点操作不再触发,默认maxDepth最大值2147483647
  • executorService:自定义线程池
  • createParentNodes:如果父节点不存在,是否需要创建父节点,默认情况下,不会自动创建path的父节点
  • TreeCacheSelector:用于区分哪些节点作为缓存使用

下面通过一个实际例子来看看如何在代码中使用TreeCache。

public class TreeCache_Sample {

    static String path = "/zk-book3";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.58.42:2181")
            .sessionTimeoutMs(60000)
            .connectionTimeoutMs(15000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        TreeCache cache = new TreeCache(client,path);

        cache.start();

        //添加错误监听器
        cache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
            @Override
            public void unhandledError(String s, Throwable throwable) {
                System.out.println("======>  错误原因:" + throwable.getMessage());
            }
        });

        //节点变化的监听器
        cache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                switch (treeCacheEvent.getType()) {
                    case INITIALIZED:
                        System.out.println("=====> INITIALIZED :  初始化");
                        break;
                    case NODE_ADDED:
                        System.out.println("=====> NODE_ADDED : "+ treeCacheEvent.getData().getPath() +"  数据:"+ new String(treeCacheEvent.getData().getData()));
                        break;
                    case NODE_REMOVED:
                        System.out.println("=====> NODE_REMOVED : "+ treeCacheEvent.getData().getPath() +"  数据:"+ new String(treeCacheEvent.getData().getData()));
                        if("/zk-book3".equals(treeCacheEvent.getData().getPath())){
                            throw new RuntimeException("=====> 测试异常监听UnhandledErrorListener");
                        }
                        break;
                    case NODE_UPDATED:
                        System.out.println("=====> NODE_UPDATED : "+ treeCacheEvent.getData().getPath() +"  数据:"+ new String(treeCacheEvent.getData().getData()));
                        break;
                    default:
                        System.out.println("=====> treeCache Type:" + treeCacheEvent.getType());
                        break;
                }
            }
        });
        client.create().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
        Thread.sleep(3000);
        client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
        Thread.sleep(3000);
        client.setData().forPath(path+"/c1","I love you".getBytes());
        Thread.sleep(3000);
        client.delete().forPath(path+"/c1");
        Thread.sleep(3000);
        client.delete().forPath(path);
        Thread.sleep(10000);
        cache.close();
        client.close();
    }
}

原文

https://blog.csdn.net/qq_34021712/article/details/82877236

原文

https://blog.csdn.net/qq_34021712/article/details/82877236

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353

推荐阅读更多精彩内容