watcher的用法

https://blog.csdn.net/zkp_java/article/details/82711810

在进入watcher之前我们先试想在应用服务器集群中可能存在的两个问题:

因为集群中有很多机器,当某个通用的配置发生变化后,怎么让自动的让所有服务器的配置统一生效?

当集群中某个节点宕机,如何让集群中的其他节点知道?

为了解决这两个问题,zookeeper引入了watcher机制来实现发布/订阅功能,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者。

watcher基本原理

zookeeper中实现watcher需要有三个部分,如下图所示:

分别是zookeeper服务端、客户端以及客户端的watchManager。

如图所示,客户端向zk注册watcher的同时,会将客户端的watcher对象存储在客户端的WatchManager中;zk服务器触发watch事件后,会向客户端发送通知,客户端线程从watchManager中取出对应watcher执行。

客户端如何实现事件通知的动作

客户端只需定义一个类实现org.apache.zookeeper.Watcher接口并实现接口中的如下方法:

abstract public void process(WatchedEvent event);

1

即可在得到通知后执行相应的动作。参数org.apache.zookeeper.WatchedEvent是zk服务端传过来的事件,有三个成员:

final private KeeperState keeperState; // 通知状态

final private EventType eventType; // 事件类型

private String path; // 哪个节点发生的时间

1

2

3

分别代表通知的状态、事件类型和发生事件的节点。

keeperState是个枚举对象,代表客户端和zk服务器的链接状态,定义如下:

/**

* Enumeration of states the ZooKeeper may be at the event

*/

public enum KeeperState {

      /** Unused, this state is never generated by the server */

      @Deprecated

      Unknown (-1),

      /** The client is in the disconnected state - it is not connected

      * to any server in the ensemble. */

      Disconnected (0),

      /** Unused, this state is never generated by the server */

      @Deprecated

      NoSyncConnected (1),

    /** The client is in the connected state - it is connected

      * to a server in the ensemble (one of the servers specified

      * in the host connection parameter during ZooKeeper client

      * creation).

      * /

      SyncConnected (3),

      /**

      * Auth failed state

      */

      AuthFailed (4),

      /**

      * The client is connected to a read-only server, that is the

      * server which is not currently connected to the majority.

      * The only operations allowed after receiving this state is

      * read operations.

      * This state is generated for read-only clients only since

      * read/write clients aren't allowed to connect to r/o servers.

      */

      ConnectedReadOnly (5),

      /**

        * SaslAuthenticated: used to notify clients that they are SASL-authenticated,

        * so that they can perform Zookeeper actions with their SASL-authorized permissions.

        */

        SaslAuthenticated(6),

      /** The serving cluster has expired this session. The ZooKeeper

        * client connection (the session) is no longer valid. You must

        * create a new client connection (instantiate a new ZooKeeper

        * instance) if you with to access the ensemble.

        */

        Expired (-112);

        private final int intValue;    // Integer representation of value

                                        // for sending over wire

        KeeperState(int intValue) {

            this.intValue = intValue;

        }

        public int getIntValue() {

            return intValue;

        }

        public static KeeperState fromInt(int intValue) {

              switch(intValue) {

                  case  -1: return KeeperState.Unknown;

                  case    0: return KeeperState.Disconnected;

                  case    1: return KeeperState.NoSyncConnected;

                  case    3: return KeeperState.SyncConnected;

                  case    4: return KeeperState.AuthFailed;

                  case    5: return KeeperState.ConnectedReadOnly;

                  case    6: return KeeperState.SaslAuthenticated;

                  case -112: return KeeperState.Expired;

                  default:

                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");

              }

        }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

eventType也是个枚举类型,代表节点发生的事件类型,比如创建新的子节点、改变节点数据等,定义如下:

/**

* Enumeration of types of events that may occur on the ZooKeeper

*/

public enum EventType {

      None (-1),

      NodeCreated (1),

      NodeDeleted (2),

      NodeDataChanged (3),

      NodeChildrenChanged (4),

      DataWatchRemoved (5),

      ChildWatchRemoved (6);

      private final int intValue;    // Integer representation of value

                                      // for sending over wire

      EventType(int intValue) {

            this.intValue = intValue;

      }

      public int getIntValue() {

            return intValue;

      }

      public static EventType fromInt(int intValue) {

            switch(intValue) {

                case -1: return EventType.None;

                case  1: return EventType.NodeCreated;

                case  2: return EventType.NodeDeleted;

                case  3: return EventType.NodeDataChanged;

                case  4: return EventType.NodeChildrenChanged;

                case  5: return EventType.DataWatchRemoved;

                case  6: return EventType.ChildWatchRemoved;

                default:

                        throw new RuntimeException("Invalid integer value for conversion to EventType");

            }

      }         

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

keeperState和eventType对应关系如下所示:


对于NodeDataChanged事件:无论节点数据发生变化还是数据版本发生变化都会触发(即使被更新数据与新数据一样,数据版本都会发生变化)。

对于NodeChildrenChanged事件:新增和删除子节点会触发该事件类型。

需要注意的是:WatchedEvent只是事件相关的通知,并没有对应数据节点的原始数据内容及变更后的新数据内容,因此如果需要知道变更前的数据或变更后的新数据,需要业务保存变更前的数据和调用接口获取新的数据

如何注册watcher

watcher注册api

可以在创建zk客户端实例的时候注册watcher(构造方法中注册watcher):

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,ZKClientConfig conf)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig conf)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

1

2

3

4

5

6

7

8

9

ZooKeeper的构造方法中传入的watcher将会作为整个zk会话期间的默认watcher,该watcher会一直保存为客户端ZKWatchManager的defaultWatcher成员,如果有其他的设置,这个watcher会被覆盖。

除了可以通过ZooKeeper类的构造方法注册watcher外,还可以通过ZooKeeper类中其他一些api来注册watcher,只不过这些api注册的watcher就不是默认watcher了(以下每个注册watcher的方法有很多个重载的方法,就不一一列举出来)。

public List<String> getChildren(final String path, Watcher watcher)

// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher

public List<String> getChildren(String path, boolean watch)

// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher

public byte[] getData(String path, boolean watch, Stat stat)

public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)

// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher

public Stat exists(String path, boolean watch)

public Stat exists(final String path, Watcher watcher)

1

2

3

4

5

6

7

8

9

watcher注册示例代码

本示例中使用zookeeper自带客户端演示watcher的使用,zookeeper自带客户端有一点需要注意:

Watcher设置后,一旦触发一次即会失效,如果需要一直监听,则需要再注册

定义默认watcher:

/**

* 测试默认watcher

*/

public class DefaultWatcher implements Watcher {

    @Override

    public void process(WatchedEvent event) {

        System.out.println("==========DefaultWatcher start==============");

        System.out.println("DefaultWatcher state: " + event.getState().name());

        System.out.println("DefaultWatcher type: " + event.getType().name());

        System.out.println("DefaultWatcher path: " + event.getPath());

        System.out.println("==========DefaultWatcher end==============");

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

定义监听子节点变化的watcher:

/**

* 用于监听子节点变化的watcher

*/

public class ChildrenWatcher implements Watcher {

    @Override

    public void process(WatchedEvent event) {

        System.out.println("==========ChildrenWatcher start==============");

        System.out.println("ChildrenWatcher state: " + event.getState().name());

        System.out.println("ChildrenWatcher type: " + event.getType().name());

        System.out.println("ChildrenWatcher path: " + event.getPath());

        System.out.println("==========ChildrenWatcher end==============");

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

定义监听节点变化的watcher:

public class DataWatcher implements Watcher {

    @Override

    public void process(WatchedEvent event) {

        System.out.println("==========DataWatcher start==============");

        System.out.println("DataWatcher state: " + event.getState().name());

        System.out.println("DataWatcher type: " + event.getType().name());

        System.out.println("DataWatcher path: " + event.getPath());

        System.out.println("==========DataWatcher end==============");

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

watcher测试代码:

public class WatcherTest {

    /**

    * 链接zk服务端的地址

    */

    private static final String CONNECT_STRING = "192.168.0.113:2181";

    public static void main(String[] args) {

        // 除了默认watcher外其他watcher一旦触发就会失效,需要充新注册,本示例中因为

        // 还未想到比较好的重新注册watcher方式(考虑到如果在Watcher中持有一个zk客户端的

        // 实例可能存在循环引用的问题),因此暂不实现watcher失效后重新注册watcher的问题,

        // 后续可以查阅curator重新注册watcher的实现方法。

        // 默认watcher

        DefaultWatcher defaultWatcher = new DefaultWatcher();

        // 监听子节点变化的watcher

        ChildrenWatcher childrenWatcher = new ChildrenWatcher();

        // 监听节点数据变化的watcher

        DataWatcher dataWatcher = new DataWatcher();

        try {

            // 创建zk客户端,并注册默认watcher

            ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STRING, 100000, defaultWatcher);

            // 让默认watcher监听 /GetChildren 节点的子节点变化

            // zooKeeper.getChildren("/GetChildren", true);

            // 让childrenWatcher监听 /GetChildren 节点的子节点变化(默认watcher不再监听该节点子节点变化)

            zooKeeper.getChildren("/GetChildren", childrenWatcher);

            // 让dataWatcher监听 /GetChildren 节点本省的变化(默认watcher不再监听该节点变化)

            zooKeeper.getData("/GetChildren", dataWatcher, null);

            TimeUnit.SECONDS.sleep(1000000);

        } catch (Exception ex) {

            ex.printStackTrace();

        }

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

测试过程:

首先在命令行客户端创建节点 /GetChildren

[zk: localhost:2181(CONNECTED) 133] create /GetChildren GetChildrenData

Created /GetChildren

1

2

运行测试代码WatcherTest,输出如下内容:

==========DefaultWatcher start==============

DefaultWatcher state: SyncConnected

DefaultWatcher type: None

DefaultWatcher path: null

==========DefaultWatcher end==============

1

2

3

4

5

可以看出在客户端第一次链接zk服务端时触发了链接成功的事件通知,该事件由默认watcher接收,导致默认watcher相关代码得到执行。

接着在命令行客户端创建子节点:

[zk: localhost:2181(CONNECTED) 134] create /GetChildren/ChildNode ChildNodeData

Created /GetChildren/ChildNode

1

2

ChildrenWatcher收到通知,/GetChildren的子节点发生变化,因此输出如下内容:

==========ChildrenWatcher start==============

ChildrenWatcher state: SyncConnected

ChildrenWatcher type: NodeChildrenChanged

ChildrenWatcher path: /GetChildren

==========ChildrenWatcher end==============

1

2

3

4

5

最后在命令行客户端修改 /GetChildren 节点数据:

[zk: localhost:2181(CONNECTED) 135] set /GetChildren GetChildrenDataV2

cZxid = 0xab

ctime = Sat Sep 15 03:52:48 PDT 2018

mZxid = 0xb0

mtime = Sat Sep 15 04:06:05 PDT 2018

pZxid = 0xaf

cversion = 1

dataVersion = 1

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 17

numChildren = 1

1

2

3

4

5

6

7

8

9

10

11

12

DataWatcher收到通知,输出如下内容:

==========DataWatcher start==============

DataWatcher state: SyncConnected

DataWatcher type: NodeDataChanged

DataWatcher path: /GetChildren

==========DataWatcher end==============

1

2

3

4

5

我们可以接着在命令行客户端修改 /GetChildren 节点数据:

[zk: localhost:2181(CONNECTED) 136] set /GetChildren GetChildrenDataV3       

cZxid = 0xab

ctime = Sat Sep 15 03:52:48 PDT 2018

mZxid = 0xb1

mtime = Sat Sep 15 04:14:54 PDT 2018

pZxid = 0xaf

cversion = 1

dataVersion = 2

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 17

numChildren = 1

1

2

3

4

5

6

7

8

9

10

11

12

但WatcherTest没有任何输出了,说明DataWatcher已经失效了,要能够继续出发需要重新注册。

watcher实现源码分析

我们以注册watcher的 getData api为例,分析watcher的注册流程,以setData api为例,分析watcher的触发流程。

getData的实现在org.apache.zookeeper.ZooKeeper类中,具体代码如下:

public byte[] getData(final String path, Watcher watcher, Stat stat)

        throws KeeperException, InterruptedException

    {

        final String clientPath = path;

        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path

        WatchRegistration wcb = null;

        if (watcher != null) {

            wcb = new DataWatchRegistration(watcher, clientPath);

        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();

        h.setType(ZooDefs.OpCode.getData);

        GetDataRequest request = new GetDataRequest();

        request.setPath(serverPath);

        request.setWatch(watcher != null);

        GetDataResponse response = new GetDataResponse();

        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

        if (r.getErr() != 0) {

            throw KeeperException.create(KeeperException.Code.get(r.getErr()),

                    clientPath);

        }

        if (stat != null) {

            DataTree.copyStat(response.getStat(), stat);

        }

        return response.getData();

    }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

重点看:

......

WatchRegistration wcb = null;

if (watcher != null) {

      wcb = new DataWatchRegistration(watcher, clientPath);

}

......

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

......

1

2

3

4

5

6

7

8

我们首先看org.apache.zookeeper.ZooKeeper.DataWatchRegistration和org.apache.zookeeper.ZooKeeper.WatchRegistration类的实现代码:

/**

* Register a watcher for a particular path.

*/

public abstract class WatchRegistration {

    private Watcher watcher;

    private String clientPath;

    public WatchRegistration(Watcher watcher, String clientPath)

    {

        this.watcher = watcher;

        this.clientPath = clientPath;

    }

    abstract protected Map<String, Set<Watcher>> getWatches(int rc);

    /**

    * Register the watcher with the set of watches on path.

    * @param rc the result code of the operation that attempted to

    * add the watch on the path.

    */

    public void register(int rc) {

        if (shouldAddWatch(rc)) {

            Map<String, Set<Watcher>> watches = getWatches(rc);

            synchronized(watches) {

                  Set<Watcher> watchers = watches.get(clientPath);

                  if (watchers == null) {

                      watchers = new HashSet<Watcher>();

                      watches.put(clientPath, watchers);

                  }

                  watchers.add(watcher);

            }

        }

    }

    /**

    * Determine whether the watch should be added based on return code.

    * @param rc the result code of the operation that attempted to add the

    * watch on the node

    * @return true if the watch should be added, otw false

    */

    protected boolean shouldAddWatch(int rc) {

        return rc == 0;

    }

}

class DataWatchRegistration extends WatchRegistration {

      public DataWatchRegistration(Watcher watcher, String clientPath) {

            super(watcher, clientPath);

      }

      @Override

      protected Map<String, Set<Watcher>> getWatches(int rc) {

            return watchManager.dataWatches;

      }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

org.apache.zookeeper.ZooKeeper.DataWatchRegistration#getWatches方法是从org.apache.zookeeper.ZooKeeper.ZKWatchManager中获取保存watcher的一个HashMap:

private final Map<String, Set<Watcher>> dataWatches =

            new HashMap<String, Set<Watcher>>();

1

2

org.apache.zookeeper.ZooKeeper.WatchRegistration#register方法显然是注册一个watcher,该方法肯定会在后续流程得到调用,事实上在getData返回数据并且判断成功后就会调用该方法将watcher加入到ZKWatchManager中,我们稍后到了这一步流程在分析,这里先有个大概的了解。

我们回到getData发送请求的代码:

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

1

cnxn的类型是org.apache.zookeeper.ClientCnxn,进入到submitRequest方法:

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {

    return submitRequest(h, request, response, watchRegistration, null);

}

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException {

    ReplyHeader r = new ReplyHeader();

    Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);

    synchronized (packet) {

        while (!packet.finished) {

            packet.wait();

        }

    }

    return r;

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

先到此为止,等有空继续完善源码分析部分(源码分析描述起来太麻烦了)。。。。。。。

---------------------

作者:zkp_java

来源:CSDN

原文:https://blog.csdn.net/zkp_java/article/details/82711810

版权声明:本文为博主原创文章,转载请附上博文链接!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容