Dubbo之ZookeeperRegistry源码分析

ZookeeperRegistry的作用

ZookeeperRegistry是dubbo中常用的注册中心实现,它主要作用通过Zookeeper的目录监听机制,让消费者能够实时得到在线的提供者列表。并且一些服务治理的功能也是通过zookeeper这个监听特性巧妙的完成。

在具体讲解ZookeeperRegistry的相关源码之前,先来分析下dubbo在zookeeper的目录结构以及dubbo如何利用这个特性

Zookeeper目录结构

dubbo在zookeeper建立的目录是基于接口的,大致如下


image.png

针对每个接口节点会存在以下4个子节点

节点名 作用 子节点是否持久节点
consumers 存储消费者节点url
configuators 存储override或者absent url,用于服务治理
routers 用于设置路由url,用于服务治理
providers 存储在线提供者url

consumer节点存在的意义并不大,主要还是为了做监控
其他三个节点,都会设置被相应的监听器,发生改变时,会触发特定事件

Dubbo对Zookeeper监听机制的利用

Dubbo中通过ZookeeperClient的实现类来对zookeeper进行操作


image.png

ZookeeperClient提供设置两种监听器的方法,对应子节点监听器和状态监听器,这里我们关注子节点监听器ChildListener

public interface ChildListener {

    /**
     *
     * @param path 监听的节点
     * @param children 监听的节点的所有子节点
     */
    void childChanged(String path, List<String> children);

}

ZookeeperClient有两种实现,第一种通过官方提供的jar包,第二个通过Apache的Curator框架,默认使用第二种,我们讲解的也是Curator的对应实现
添加子节点监听器的方法为addChildListener

public List<String> addChildListener(String path, final ChildListener listener) {
        //对listener做缓存,因为ChildListener是dubbo提供的监听器接口,需要转换为cruator的监听器接口
        ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
        if (listeners == null) {
            childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
            listeners = childListeners.get(path);
        }
        TargetChildListener targetListener = listeners.get(listener);
        if (targetListener == null) {
            //createTargetChildListener会对监听器进行转换
            listeners.putIfAbsent(listener, createTargetChildListener(path, listener));
            targetListener = listeners.get(listener);
        }
        return addTargetChildListener(path, targetListener);
    }

Dubbo底层封装了2套Zookeeper API,所以通过ChildListener抽象了监听器,但是在实际调用时会通过createTargetChildListener转为对应框架的监听器实现
addTargetChildListener方法在添加监听器之后会返回监听path当前的所有的子节点

public List<String> addTargetChildListener(String path, CuratorWatcher listener) {
        try {
            //添加监听,并且返回这个目录当前所有子节点
            //这种监听方式是一次性的,在listener实现中会再次执行监听逻辑
            return client.getChildren().usingWatcher(listener).forPath(path);
        } catch (NoNodeException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

上述代码需要注意监听是一次性的,其实curator提供了TreeCache用作永久性的监听,这边不用到这个特性,应该是为了和官方API保持一致吧。
接下去看下Cruator监听器的封装

public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
        return new CuratorWatcherImpl(listener);
    }
private class CuratorWatcherImpl implements CuratorWatcher {

        private volatile ChildListener listener;

        public CuratorWatcherImpl(ChildListener listener) {
            this.listener = listener;
        }

        public void unwatch() {
            this.listener = null;
        }

        @Override
        public void process(WatchedEvent event) throws Exception {
            if (listener != null) {
                String path = event.getPath() == null ? "" : event.getPath();
                listener.childChanged(path,
                        // if path is null, curator using watcher will throw NullPointerException.
                        // if client connect or disconnect to server, zookeeper will queue
                        // watched event(Watcher.Event.EventType.None, .., path = null).
                        StringUtils.isNotEmpty(path)
                                //再次设置监听,并且把监听path的所有子节点传入childChanged方法
                                ? client.getChildren().usingWatcher(this).forPath(path)
                                : Collections.<String>emptyList());
            }
        }
    }

可以看到listener的触发逻辑以及入参来源

源码分析

image.png

通过ZookeeperRegistry的类继承图,逐上而下的分析源码

Registry接口

public interface Registry extends Node, RegistryService {
}

Registry继承Node和RegistryService两个接口,本身不提供接口方法

public interface Node {

    /**
     * get url.
     *
     * @return url.
     */
    URL getUrl();

    /**
     * is available.
     *
     * @return available.
     */
    boolean isAvailable();

    /**
     * destroy.
     */
    void destroy();

}

Node约束了三个生命周期相关的方法
getUrl用于获取当前组件的url配置
isAvailable检测组件是否可用
destroy用于销毁组件

public interface RegistryService {

   
    void register(URL url);

    void unregister(URL url);

    void subscribe(URL url, NotifyListener listener);

    void unsubscribe(URL url, NotifyListener listener);

    List<URL> lookup(URL url);

}

RegistryService规定了和注册中心相关的方法
register和unregister用于提供者向注册中心注册提供者url
subscribe和unsubscribe用于消费者向对应接口目录注册监听
lookup用于查找查找url,通过消费者url查找提供者url以及服务治理有关的url

AbstractRegistry

主要提供接口提供者本地缓存功能
以及基础register,unregister,subscribe,unsubscribe,notify,lookup,recover逻辑

register,unregister会(接触)注册提供者url,主要操作

private final Set<URL> registered = new ConcurrentHashSet<URL>();

subscribe,unsubscribe则会针对特定url提供监听,主要操作

private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

notify方法会缓存最近通知的url到notified以及触发listener回调

/**
     * 这个方法不会直接触发,被FailbackRegistry重载
     * FailbackRegistry增加failback逻辑后,还是会调用这个方法
     * @param url
     * @param listener
     * @param urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.isEmpty())
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        //根据url的category进行分类
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        //下面操作notified缓存
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            //对notified内容进行覆盖,相当于会保存上一次的通知
            categoryNotified.put(category, categoryList);
            //每次通知后会刷新本地缓存
            saveProperties(url);
            //进行listener回调,每种category的url分别回调一次
            listener.notify(categoryList);
        }
    }

这个类的recover方法不分析,因为FailbackRegistry完全重写了这个方法

FailbackRegistry

FailbackRegistry重载了AbstractRegistry中的subscribe,unsubscribe,register,unregister,notify方法,在AbstractRegistry的基础上提供了失败重试机制,并且暴露模板方法doRegister,doUnregister,doSubscribe,doUnsubscribe让不同类型的注册中心实现。doNotify还是默认父类的逻辑。
同时也重载了recover方法,通过FailbackRegistry的重试机制实现recover

以registry方法作为样例看下添加的重试机制

/**
     * register行为,提供者使用
     * 在AbstractRegistry的基础上,增加失败重试机制
     * @param url
     */
    @Override
    public void register(URL url) {
        super.register(url);
        //这里成功,会删除failedRegistered,failedUnregistered中的url
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            //具体register逻辑交给子类实现
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            //如果注册中心或者提供者url的check为false的话,跳过抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            //如果是注册的时候,抛出这个异常,那么也会忽略,只打日志
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            //加入到失败重试集合
            failedRegistered.add(url);
        }
    }

注册失败后会把需要注册重试的url放入failedRegistered集合
然后在FailbackRegistry构造函数中起的定时任务会进行重试

public FailbackRegistry(URL url) {
        super(url);
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        //重试的定时线程,使用future用于取消
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

retry方法的具体逻辑,就是循环遍历这些失败集合,然后调用doXXX方法进行重试

recover方法会在和Zookeeper重连时触发,在断连状态下,dubbo进程内的注册,订阅行为是会被缓存下来的,然后对所有缓存的url进行重新注册,订阅。
这边有个细节点,可以看到failedRegistered这些集合使用的都是线程安全的集合

private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

因为recover,retry这两个操作还是存在资源竞争的,但不仅限于这两个操作

ZookeeperRegistry

ZookeeperRegistry的工作就是通过Zookeeper API实现doRegister,doUnregister,doSubscribe,doUnsubscribe具体逻辑

首先来看下ZookeeperRegistry的构造函数,做的主要工作是初始化zk客户端

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        //如果不进行配置,默认dubbo根目录就是/dubbo
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        //zookeeper添加重连回调,会触发recover方法,进行失败任务重试
        //为什么FailbackRegistry都是用线程安全的集合,因为在这里存在线程竞争资源
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }

使用zookeeperTransporter扩展点加载zk客户端实现,默认为Curator框架

@SPI("curator")
public interface ZookeeperTransporter {

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);

}

注册/取消注册实现

然后再来看doRegister和doUnregister方法,对于zk来说,就是创建目录呗

/**
     * 注册的逻辑,就是在zookeeper创建节点,节点路径为toUrlPath(url)
     * 具体格式为 /{group}/{interfaceName}/{category}/{url.toFullString}
     * DYNAMIC_KEY表示是否创建永久节点,true表示不是,断开连接后会消失,所以需要进行recover
     * @param url
     */
    @Override
    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
/**
     * 取消注册,就是删除那个节点
     * @param url
     */
    @Override
    protected void doUnregister(URL url) {
        try {
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

需要注意下节点的路径生成格式,也就是toUrlPath(url)方法,格式为 /{group}/{interfaceName}/{category}/{url.toFullString},
group一般不配置的话为dubbo,
interfaceName对应具体接口,
category开始就讲过,分为consumers,configuators,routers,providers
url.toFullString就是我们的url配置

对于registry来讲category=providers

取消注册就是对应删除那个节点

订阅/取消订阅实现

订阅的行为对于消费者来讲,用于获取providers和routers,用于得到路由后的提供者
对于提供者来讲,订阅configuators,通过新的配置重新暴露
在ZookeeperRegistry,我们只关注如何进行订阅,具体监听器的作用,在用到的模块再讲
doSubscribe方法支持订阅全局和订阅特定接口
如果interface=*,即订阅全局,对于新增和已存在的所有接口的改动都会触发回调
如果interface=特定接口,那么只有这个接口的子节点改变时,才触发回调

@Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            for (String child : currentChilds) {
                                child = URL.decode(child);
                                if (!anyServices.contains(child)) {
                                    anyServices.add(child);
                                    //如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑
                                    //这里是用来对/dubbo下面提供者新增时的回调,相当于增量
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                //添加监听器会返回子节点集合
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        //如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑
                        //这里的逻辑只执行一次,一次全量
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                //这边是针对明确interface的订阅逻辑
                List<URL> urls = new ArrayList<URL>();
                //针对每种category路径进行监听
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        //封装回调逻辑
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    //创建节点
                    zkClient.create(path, false);
                    //增加回调
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //如果有子节点,直接进行触发一次,对应AbstractRegsitry的lookup方法
                //意思就是第一次订阅,如果订阅目录存在子节点,直接会触发一次
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

这边需要注意一点的是,每次进行订阅,最重要的第一次,会使用当前订阅节点的子节点数据触发一次notify,执行对应监听器逻辑,这个在后面RegistryDirectory中会用到这个特性

取消订阅没什么好讲的,删除订阅数据即可

讲了这么多,对于lookup方法,使用消费者查找提供者的逻辑其实也很简单。使用消费者url构造出zk中provider的目录,然后返回所有子节点即可

/**
     * 查找消费者url 对应 提供者url实现
     * 这边的url为消费者url
     * @param url
     * @return
     */
    @Override
    public List<URL> lookup(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("lookup url == null");
        }
        try {
            List<String> providers = new ArrayList<String>();
            //返回inteface下面所有category的url
            for (String path : toCategoriesPath(url)) {
                List<String> children = zkClient.getChildren(path);
                if (children != null) {
                    providers.addAll(children);
                }
            }
            //返回匹配的url
            return toUrlsWithoutEmpty(url, providers);
        } catch (Throwable e) {
            throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

总结

1.Zookeeper监听器的妙用,在Elasticjob也是使用到了这个特性,进行任务触发
2.通过zookeeperTransporter以及ZookeeperClient对Zookeeper操作进行抽象,进而支持两种zookeeper客户端框架。包括在remoting模块也是采用这种设计模式,和底层框架解耦。
3.Zookeeper默认的监听是一次性的,Curator框架实现了永久监听,但是dubbo没用到Curator这个特性。
4.写完这部分,Dirctory模块就比较容易写下去了,东西太多,有些地方的理解肯定存在偏差,希望读者能多多交流

最后

希望大家关注下我的公众号


image
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351