zookeeper通过使用watcher可以实现发布订阅的功能,实际上就是基于监听的事件触发。
示例
以下是在zk上创建一个Node存储app的配置信息,然后监听配置变化来做出相应的动作。
模拟配置信息类
public class SampleConf {
    private String url;
    private int port;
    private String name;
    private String password;
    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("{");
        sb.append("\"url\":\"")
                .append(url).append('\"');
        sb.append(",\"port\":")
                .append(port);
        sb.append(",\"name\":\"")
                .append(name).append('\"');
        sb.append(",\"password\":\"")
                .append(password).append('\"');
        sb.append('}');
        return sb.toString();
    }
}
简单封装的zk工具类
import com.alibaba.fastjson.JSON;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class SimpleZKUtils {
    private SimpleZKUtils() {
    }
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleZKUtils.class);
    private static final String hostPort = "localhost:2181";
    private static ZooKeeper zk;
    static {
        try {
            zk = new ZooKeeper(hostPort, 3000, event -> System.out.println(JSON.toJSONString(event)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static Stat set(String path, String data) {
        try {
            Stat stat = zk.exists(path, false);
            if (null != stat) {
                return zk.setData(path, data.getBytes(), stat.getVersion());
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    public static String get(String path) {
        try {
            Stat stat = zk.exists(path, false);
            if (null != stat) {
                return new String(zk.getData(path, true, stat));
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    public static String get(String path, Watcher watcher) {
        try {
            Stat stat = zk.exists(path, false);
            if (null != stat) {
                return new String(zk.getData(path, watcher, stat));
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    public static void create(String path, String data) {
        try {
            Stat stat = zk.exists(path, false);
            if (null == stat) {
                zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } else {
                throw new RuntimeException("node is already existed");
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void del(String path) {
        try {
            Stat stat = zk.exists(path, false);
            if (null != stat) {
                zk.delete(path, stat.getVersion());
            } else {
                throw new RuntimeException("node is already existed");
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
测试
- 创建ZNode
    public static final String basePath = "/app1";
    public static final String path = "/app1/conf";
    private static SampleConf sampleConf = null;
    @Test
    public void testCreate() {
        SimpleZKUtils.create(basePath,"");
        SimpleZKUtils.create(path, "");
    }
    @Test
    public void configure() {
        SampleConf sampleConf = new SampleConf();
        sampleConf.setName("jk");
        sampleConf.setUrl("localhost");
        sampleConf.setPort(2181);
        sampleConf.setPassword("helloworld");
        SimpleZKUtils.set(path, JSON.toJSONString(sampleConf));
        System.out.println(SimpleZKUtils.get(path));
    }
- 监听
@Test
    public void testPubSub() {
        String conf = SimpleZKUtils.get(path, new ConfigWatcher((watcher) -> {
            System.out.println("watcher execute");
            sampleConf = JSON.parseObject(SimpleZKUtils.get(path, watcher), SampleConf.class);
            System.out.println(JSON.toJSONString(sampleConf));
        }));
        System.out.println(conf);
        //  阻塞线程以查看监听触发的动作
        LockSupport.park();
    }
    // Watcher实现类
    static class ConfigWatcher implements Watcher {
        private Consumer<Watcher> myWatch;
        ConfigWatcher(Consumer<Watcher> myWatch) {
            this.myWatch = myWatch;
        }
        @Override
        public void process(WatchedEvent event) {
            if (event.getType().equals(Watcher.Event.EventType.NodeDataChanged)) {
                // 使用此方式是为了把watcher实例设置到zk的get方法里面去
                myWatch.accept(this);
            }
        }
    }
因为zk的watcher是一次性的,所以每次在触发事件时需要设置watcher才能在后续的事件发生时继续响应,此处我套了个Consumer接口来复用最外层的watcher实例,因为在lambda表达式里面没法直接传this。使用匿名内部类可以解决:
    @Test
    public void testPubSub2() {
        String conf = SimpleZKUtils.get(path, new Watcher(){
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("watcher execute");
                sampleConf = JSON.parseObject(SimpleZKUtils.get(path, this), SampleConf.class);
                System.out.println(JSON.toJSONString(sampleConf));
            }
        });
        System.out.println(conf);
        LockSupport.park();
    }