dubbo之Registry(注册中心)

前言

Registry是dubbo对注册中心的抽象,提供服务的注册、注销、查找、订阅、取消订阅等功能。本文按照dubbo中Registry的组织形式,分析Registry的核心逻辑。首先来看注册中心的创建,RegistryFactory接口定义注册中心的创建(工厂模式实现),支持SPI扩展,默认SPI实现是DubboRegistryFactory。注册中心Registry接口继承Node、RegistryService,抽象基类AbstractRegistry直接实现Registry,FailbackRegistry继承自基类AbstractRegistry,所有注册中心实现均继承自FailbackRegistry,这里可以看出,所有注册中心均支持失败重试(Failback)。

一、RegistryFactory(注册中心工厂)

RegistryFactory的UML类图如下:


注册中心工厂UML (1).jpg

先来看RegistryFactory接口定义,比较简单:

@SPI("dubbo")
public interface RegistryFactory {
    /**
     * Connect to the registry
     *  1、check=false时,连接无需check,否则连接断开时直接抛异常
     *  2、支持URL的用户名、密码权限校验
     *  3、支持注册中心族备份地址:10.20.153.10
     *  4、支持注册中心本地缓存文件
     *  5、支持超时设置
     *  6、支持session 60s过期
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

1.1 AbstractRegistryFactory

接着是抽象基类AbstractRegistryFactory,实现RegistryFactory接口的getRegistry方法,同时定义模板方法createRegistry供子类实现;AbstractRegistryFactory的getRegistry方法先从Map缓存查询注册中心,查不到则执行模板方法createRegistry创建注册中心,并放入缓存,然后返回该registry

@Override
public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceStringWithoutResolving();
    //加锁,保证单例
    LOCK.lock();
    try {
        // 缓存查询注册中心
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        // 不存在则通过spi、ioc方式创建registry
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // 锁释放
        LOCK.unlock();
    }
}

1.2 其他实现

基类AbstractRegistryFactory的子类实现中,比较重要的是DubboRegistryFactory(介绍Protocol时已经做了解析),其他实现比如RedisRegistryFactory、ZookeeperRegistryFactory、MulticastRegistryFactory的逻辑非常简单,直接返回对应的注册中心实现,代码就省略了。

二、Registry(注册中心)

来看dubbo中Registry实现。UML类图如下:


注册中心UML.jpg

方便理解起见,这里把注册中实现分为三个层次,分别是Registry接口、FailbackRegistry实现、注册中心实现。

2.1、Registry接口

上面UML类图中可以看出,Registry继承Node和RegistryService接口(Registry接口内部无新增方法),重点关注RegistryService接口。RegistryService抽象了服务的注册、注销、订阅、取消订阅、查找等核心功能,来看接口定义:

public interface RegistryService {
    /**
     * 注册数据,比如提供者服务、消费者地址、路由规则、override规则以及其他数据
     * 1、若URL中check=false,那么注册失败会进行重试且不会抛出异常,否则直接抛异常
     * 2、若URL中dynamic=false,那么URL中信息会被持久化存储,否则注册过程异常退出,URL信息应当被删除
     * 3、若URL中category=routers,那么意味着分类存储,默认catetory=providers,且会根据分类区域进行通知更新
     * 4、当注册中心重启,网络波动,数据不能被丢弃,包括删除损坏的流水线中数据的删除
     * 5、参数不同的URL可以共存,不能相互覆盖
     * @param url  Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
     */
    void register(URL url);

    /**
     * 注销
     * 1、若是dynamic=false的持久化存储,如果找不到注册数据,那么会抛出非法状态异常,否则会忽略
     * 2、根据完整URL进行匹配注销
     * @param url  Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
     */
    void unregister(URL url);

    /**
     * 订阅注册数据,并在注册数据更新的时候自动推送
     * 服务订阅
     * 1、若URL中check=false,那么当注册失败时,会直接在后台重试不会抛异常
     * 2、若URL中category=routers,那么只会通知特定分类数据;多个分类用逗号隔开;允许使用*全部匹配
     * 3、允许接口、组、版本以及分类作为查询条件
     * 4、查询条件允许使用*进行匹配,意味着订阅接口的所有版本
     * 5、若注册中心重启、网络波动,必须自动保存订阅请求
     * 6、参数不同的URL可以共存,不能相互覆盖
     * 7、订阅过程必须是阻塞的
     */
    void subscribe(URL url, NotifyListener listener);

    /**
    * 1、没有订阅,则直接忽略
     * 2、根据URL完全匹配
     **/
    void unsubscribe(URL url, NotifyListener listener);

    /**
     * 根据条件匹配,查询注册数据;对应订阅的推模式,提供拉模式,且仅返回一个结果
    */
    List<URL> lookup(URL url);

从接口定义可以看出,RegsitryService对数据的注册、注销、订阅、取消订阅、查找等功能做了定义约束,所有对RegistryService的实现都必须满足这个约束。

2.2、AbstractRegistry & FailbackRegistry

接下来看Registry接口的基类实现AbstractRegistry、FailbackRegistry。

2.2.1 AbstractRegistry

先来看AbstractRegistry,重点关注构造方法,AbstractRegistry基类的构造过程的核心逻辑分为三部分:1、创建注册中心缓存文件;2、cache文件加载至properties;3、同步backUpUrl信息。我们先来看构造方法的定义,然后再分步来看:

public AbstractRegistry(URL url) {
    // 注册中心URL负载
    setUrl(url);
    // Start file save timer
    // 1、注册中心文件同步保存开关,默认关闭,即默认异步保存
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    // 默认文件路径: user.home/.dubbo/dubbo-registry-applicationName-ip.cache
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
        // 文件目录创建失败,直接抛异常
        file = new File(filename);
        if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
            if (!file.getParentFile().mkdirs()) {
                throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
            }
        }
    }
    this.file = file;
    // When starting the subscription center,we need to read the local cache file for future Registry fault tolerance processing.
    // 2、加载注册中心cache文件到内存,用于容错
    loadProperties();
    // 3、backUpUrl数据同步,同步所有订阅者、更新properties缓存。
    notify(url.getBackupUrls());
}
2.2.1.1、创建注册中心缓存

这一步比较简单,首先根据URL信息,确定缓存文件保存方式(异步、同步);然后,拼接缓存文件名称,根据文件名创建缓存文件,创建失败则直接抛异常,否则初始化缓存文件file。

2.2.1.2、注册中心cache加载

加载本地cache文件到内存缓存properties,将上一步中的cache文件内容,加载到properties。比较容易理解,启动时先读本地缓存,用于容错。

private void loadProperties() {
    if (file != null && file.exists()) {
        InputStream in = null;
        // 省略try-catch
            in = new FileInputStream(file);
            properties.load(in);
            if (logger.isInfoEnabled()) {
                logger.info("Load registry cache file " + file + ", data: " + properties);
            }
    }
}
2.2.1.3、backupUrl数据同步

重点来看backupUrl的数据同步,backupUrl的生成方式前面我们已经讲过了,这一步主要是将生成的backupUrl列表同步给各订阅者以及内存缓存Properties,来看代码:

protected void notify(List<URL> urls) {
    if (CollectionUtils.isEmpty(urls)) {
        return;
    }

    // 根据已订阅的URL,以及订阅者listener进行同步
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL url = entry.getKey();
                
        //过滤掉不匹配的URL
        if (!UrlUtils.isMatch(url, urls.get(0))) {
            continue;
        }

        Set<NotifyListener> listeners = entry.getValue();
        if (listeners != null) {
            for (NotifyListener listener : listeners) {
                try {
                    // 核心逻辑,执行URL信息同步
                    notify(url, listener, filterEmpty(url, urls));
                } catch (Throwable t) {
                    logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                }
            }
        }
    }
}

继续来看notify方法:

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((CollectionUtils.isEmpty(urls))
            && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    // keep every provider's category.
    // 按照category分组
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
  
    //缓存分组后的Notified结果
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        // 同步URL信息,这里有两种实现,分别是RegistryDirectory和RegistryProtocol$OverrideListener,listener通过subscribe方法被注册到subscribed缓存,
        listener.notify(categoryList);
        // 每次notify都会更新cache缓存文件(同步或者异步),保证注册中心properties内容与各订阅者拿到的信息一致。
        saveProperties(url);
    }
}

同步URL信息到订阅者的逻辑主要在RegistryDirectory和RegistryProtocol$OverrideListener,这里就不再做解析了。来看同步到内存properties的逻辑,把所有待同步的URL序列化为字符串(每个url中间用空格隔开),然后将URL数据保存(同步或异步)至缓存文件(即2.2.1.1中创建的缓存文件),注意,这里异步保存实际执行的逻辑与同步保存完全一致,核心逻辑在doSaveProperties方法。

private void saveProperties(URL url) {
    if (file == null) {
        return;
    }

    try {
        StringBuilder buf = new StringBuilder();
        //  url缓存信息组装,每个url中间用空格隔开
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified != null) {
            for (List<URL> us : categoryNotified.values()) {
                for (URL u : us) {
                    if (buf.length() > 0) {
                        buf.append(URL_SEPARATOR);
                    }
                    buf.append(u.toFullString());
                }
            }
        }
        // 更新内存缓存
        properties.setProperty(url.getServiceKey(), buf.toString());
        // 版本控制,可以看出,MVCC并非DB专有
        long version = lastCacheChanged.incrementAndGet();
        // 同步保存文件则直接执行save,将properties缓存内容同步至缓存文件,否则放入线程池异步调度
        if (syncSaveFile) {
            doSaveProperties(version);
        } else {
            registryCacheExecutor.execute(new SaveProperties(version));
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

继续来看doSaveProperties方法,方法参数是当前文件的版本号,可以看到,防止并发操作,版本号用于版本控制

// 保存配置到本地缓存文件,文件版本
public void doSaveProperties(long version) {
    // 防止版本回退
    if (version < lastCacheChanged.get()) {
        return;
    }
    if (file == null) {
        return;
    }
    // Save
    try {
        // 创建本地缓存文件锁,不存在则直接创建;
        File lockfile = new File(file.getAbsolutePath() + ".lock");
        if (!lockfile.exists()) {
            lockfile.createNewFile();
        }
        try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
             FileChannel channel = raf.getChannel()) {
            FileLock lock = channel.tryLock();
            //拿到锁才可以操作
            if (lock == null) {
                throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
            }
            // Save
            try {
                if (!file.exists()) {
                    file.createNewFile();
                }
                try (FileOutputStream outputFile = new FileOutputStream(file)) {
                    properties.store(outputFile, "Dubbo Registry Cache");
                }
            } finally {
                lock.release();
            }
        }
    } catch (Throwable e) {
        if (version < lastCacheChanged.get()) {
            return;
        } else {
            registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
        }
        logger.warn("Failed to save registry cache file, cause: " + e.getMessage(), e);
    }
}

为了进一步加深理解,我们把AbstractRegistry构建过程中的数据流图示如下:


AbstractRegistry init.jpg
2.2.1.4、数据注册 - register

基类中的注册等方法逻辑非常简单,只是将URL放入缓存,非常简单,不做过多说明。

2.2.1.5、数据注销 - unRegister

同样的,注销方法逻辑也非常简单,将URL从缓存中删除。

2.2.1.6、数据订阅 - subscribe

数据订阅需要注意,订阅的逻辑核心是注册监听器,以便数据变更时,同步更新;subscribed缓存的结构比较特殊,来看代码:

@Override
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    // subscribed缓存结构:ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>()
    Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
    listeners.add(listener);
}
2.2.1.7、取消数据订阅 - unSubscribe

取消订阅即删除监听器,从subscribed缓存中删除对应监听器。

2.2.1.8、数据查找 - lookup

查找逻辑即,从已同步过的URL列表(notified缓存)中,过滤满足要求的URL;若当前已同步过的URL集合为空,则

@Override
public List<URL> lookup(URL url) {
    List<URL> result = new ArrayList<>();
    Map<String, List<URL>> notifiedUrls = getNotified().get(url);
    if (notifiedUrls != null && notifiedUrls.size() > 0) {
        for (List<URL> urls : notifiedUrls.values()) {
            for (URL u : urls) {
                if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    result.add(u);
                }
            }
        }
    } else {
        final AtomicReference<List<URL>> reference = new AtomicReference<>();
        NotifyListener listener = reference::set;
        // 即注册监听器,保证首次notify有数据返回
        subscribe(url, listener); // Subscribe logic guarantees the first notify to return
        List<URL> urls = reference.get();
        if (CollectionUtils.isNotEmpty(urls)) {
            for (URL u : urls) {
                if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    result.add(u);
                }
            }
        }
    }
    return result;
}
2.2.1.9、注册中心销毁- destory

销毁的逻辑比较简单,分为注销数据和注销监听器两部分,来看代码:

@Override
// 主要做两件事情:1、注销URL,从registered列表中剔除所有URL;2、移除所有NotifyListener,也就是说不再接受配置变更同步消息。
public void destroy() {
    if (logger.isInfoEnabled()) {
        logger.info("Destroy registry:" + getUrl());
    }
    // 将所有URL从registered移除,即注销全部数据
    Set<URL> destroyRegistered = new HashSet<>(getRegistered());
    if (!destroyRegistered.isEmpty()) {
        for (URL url : new HashSet<>(getRegistered())) {
            if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                try {
                    unregister(url);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unregister url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }

    // 注销监听器,即从subscribed移除所有订阅URL的listener
    Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
    if (!destroySubscribed.isEmpty()) {
        for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                try {
                    unsubscribe(url, listener);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unsubscribe url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
}

AbstractRegistry基类的逻辑就分析到这里,接着来看FailbackRegistry。

2.2.2、FailbackRegistry

FailbackRegistry 顾名思义,支持失败恢复的注册中心,继承自基类AbstractRegistry,可以看到在基类基础上扩展了失败恢复功能,先来看几个缓存变量:

// 重试任务map
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();

private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();

private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();

private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();

private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();

几个重试任务map,用于缓存失败的任务以便于重试;接着来看构造方法,在父类构造方法的基础上,新增了HashedWheelTimer实例,用于定时重试失败任务,默认重试时间间隔5s。

public FailbackRegistry(URL url) {
    super(url);
    // 默认重试时间间隔 5s
    this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);

    // 利用hashTimer实现定时重试,
    retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}

除此之外,FailbackRegistry还定义了几个关键的模板方法,由子类实现:

public abstract void doRegister(URL url);
public abstract void doUnregister(URL url);
public abstract void doSubscribe(URL url, NotifyListener listener);
public abstract void doUnsubscribe(URL url, NotifyListener listener);
2.2.2.1、数据注册-register

注册逻辑在父类基础上新增了失败以后的操作,逻辑比较简单,直接来看代码:

@Override
public void register(URL url) {
    // 父类注册方法,保存url到已注册列表
    super.register(url);
    // 将url从注册失败列表中剔除
    removeFailedRegistered(url);
    // 将url从注销失败列表中剔除
    removeFailedUnregistered(url);
    try {
        // 执行模板方法逻辑
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;
                // 如果启动检测开关开启(默认开启),失败会直接抛异常,否则加入失败列表,用于重试
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // 加入注册失败列表,用于重试
        addFailedRegistered(url);
    }
}
2.2.2.2、数据注销-unRegister

注销逻辑与注册类似,直接来看代码:

@Override
public void unregister(URL url) {
    // 父类注销逻辑
    super.unregister(url);
    // 从注册失败缓存中删除
    removeFailedRegistered(url);
    // 从注销失败列表中删除
    removeFailedUnregistered(url);
    try {
        // 执行模板方法
        doUnregister(url);
    } catch (Exception e) {
        Throwable t = e;
                // 如果启动检测开关开启(默认开启),失败会直接抛异常,否则加入失败列表,用于重试
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        //加入失败重试缓存
        addFailedUnregistered(url);
    }
}
2.2.2.3、数据订阅-subscribe

与注册、注销逻辑类似,数据订阅逻辑也非常简单,代码就不再展示了

2.2.2.4、取消数据订阅-unSubscribe

与注册、注销逻辑类似,不做过多解析。

2.3、注册中心实现

好了,前面的铺垫结束了,本节来看具体的注册中心实现,下面按照DubboRegistry、MulticastRegistry、RedisRegistry、ZookeeperRegistry的顺序依次分析。

2.3.1、DubboRegistry

DubboRegistry作为dubbo的默认注册中心实现(RegistryFactory默认SPI实现是DubboRegistryFactory),严格意义上来讲,DubboRegistry实际上是一个Registry代理,核心逻辑全部由代理也即registryService实现。DubboRegistry的创建在DubboRegistryFactory中已经做了解析,这里不做过多说明。重点关注DubboRegistry的构造方法

public DubboRegistry(Invoker<RegistryService> registryInvoker, RegistryService registryService) {
    super(registryInvoker.getUrl());
    this.registryInvoker = registryInvoker;
    // Registry代理,核心逻辑借助registryService实现
    this.registryService = registryService;
    // 重连定时器,默认重连间隔时间3s
    this.reconnectPeriod = registryInvoker.getUrl().getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, RECONNECT_PERIOD_DEFAULT);
    // 初始化调度任务逻辑,具体逻辑参考recover方法
    reconnectFuture = reconnectTimer.scheduleWithFixedDelay(() -> {
        try {
            // 重连逻辑
            connect();
        } catch (Throwable t) { // Defensive fault tolerance
            logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
        }
    }, reconnectPeriod, reconnectPeriod, TimeUnit.MILLISECONDS);
}

介绍FailbackRegistry时说过,所有的注册中心实现都支持自动恢复,DubboRegistry的自动恢复实现原理是若当前注册中心不可用,则直接将该URL注册信息加入到注册失败、订阅失败缓存(借助父类FailbackRegistry的addFailedRegistered、addFailedSubscribed方法实现),由父类的HashedWheelTimer重新调度,进行恢复,来看失败后加入缓存的逻辑(外层方法是connect,内部实际上执行的recover方法):

// 核心逻辑:注册失败、订阅失败的url分别放入对应列表,用于hashedWheelTimer调度
@Override
protected void recover() throws Exception {
    // register
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover register url " + recoverRegistered);
        }
        // 放入注册失败列表
        for (URL url : recoverRegistered) {
            addFailedRegistered(url);
        }
    }
    // subscribe
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
    if (!recoverSubscribed.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover subscribe url " + recoverSubscribed.keySet());
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                addFailedSubscribed(url, listener);
            }
        }
    }
}

2.3.2、MulticastRegistry

MulticastRegistry即多播注册中心,顾名思义,采用多播实现;注册、注销、订阅、取消订阅等均通过多播方式实现。核心逻辑在构造方法,包括多播组的创建以及多播消息的处理;创建多播组比较容易理解,重点关注多播消息的处理,在构造方法中创建并启动一个守护线程,用于接收并处理多播消息(注册消息、注销消息、订阅消息),处理多播消息的入口是receive方法。

2.3.2.1、构造方法

先来看构造方法:

// 创建并启动daemon线程,用于接收广播消息,对接收到的消息处理逻辑在receive方法
public MulticastRegistry(URL url) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    try {
        // 创建并加入多播组
        multicastAddress = InetAddress.getByName(url.getHost());
        checkMulticastAddress(multicastAddress);
        multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
        multicastSocket = new MulticastSocket(multicastPort);
        // 注册中心URL地址加入多播组
        NetUtils.joinMulticastGroup(multicastSocket, multicastAddress);
        // 启动daemon线程,用于接收广播socket消息
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                byte[] buf = new byte[2048];
                // UDP包封装
                DatagramPacket recv = new DatagramPacket(buf, buf.length);
                while (!multicastSocket.isClosed()) {
                    try {
                        // 接收UDP报文
                        multicastSocket.receive(recv);
                        String msg = new String(recv.getData()).trim();
                        int i = msg.indexOf('\n');
                        if (i > 0) {
                            msg = msg.substring(0, i).trim();
                        }
                        // 多播消息接收处理
                        MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
                        Arrays.fill(buf, (byte) 0);
                    } catch (Throwable e) {
                        if (!multicastSocket.isClosed()) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
        }, "DubboMulticastRegistryReceiver");
        thread.setDaemon(true);
        thread.start();
    } catch (IOException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
    // 这里利用定时调度线程池,定时清理received缓存中不可用socket;默认清理时间间隔60s;
    this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    if (url.getParameter("clean", true)) {
        this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    clean(); // Remove the expired
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected exception occur at clean expired provider, cause: " + t.getMessage(), t);
                }
            }
        }, cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS);
    } else {
        this.cleanFuture = null;
    }
}

receive方法实现比较简单,根据msg类型(通过消息前缀判断)执行相应的注册、注销、订阅操作;重点关注registered、unregistered、multicast三个方法(MulticastRegistry的注册、注销、订阅逻辑均通过这三个方法实现)。

private void receive(String msg, InetSocketAddress remoteAddress) {
    if (logger.isInfoEnabled()) {
        logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
    }
    // 广播注册消息
    if (msg.startsWith(Constants.REGISTER)) {
        URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim());
        registered(url);
    // 广播注销消息
    } else if (msg.startsWith(Constants.UNREGISTER)) {
        URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim());
        unregistered(url);
    // 广播订阅消息
    } else if (msg.startsWith(Constants.SUBSCRIBE)) {
        URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim());
        // 根据注册的地址,发送单播、多播消息
        Set<URL> urls = getRegistered();
        if (CollectionUtils.isNotEmpty(urls)) {
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String host = remoteAddress != null && remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : url.getIp();
                    // 发送单播,多播消息
                    if (url.getParameter("unicast", true) // Whether the consumer's machine has only one process
                            && !NetUtils.getLocalHost().equals(host)) { // Multiple processes in the same machine cannot be unicast with unicast or there will be only one process receiving information
                        unicast(Constants.REGISTER + " " + u.toFullString(), host);
                    } else {
                        multicast(Constants.REGISTER + " " + u.toFullString());
                    }
                }
            }
        }
    }/* else if (msg.startsWith(UNSUBSCRIBE)) {
    }*/
}

先来看registered方法,核心逻辑是将URL与subscried缓存中的URL进行匹配,匹配成功则加入received,同时同步给订阅者(通过NotifyListener的notify方法),然后通知当前listener(在subscribe时wait*)

protected void registered(URL url) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL key = entry.getKey();
        if (UrlUtils.isMatch(key, url)) {
            Set<URL> urls = received.get(key);
            if (urls == null) {
                received.putIfAbsent(key, new ConcurrentHashSet<URL>());
                urls = received.get(key);
            }
            urls.add(url);
            List<URL> list = toList(urls);
            for (NotifyListener listener : entry.getValue()) {
                notify(key, listener, list);
                synchronized (listener) {
                    listener.notify();
                }
            }
        }
    }
}

再来看unRegistered,逻辑上大体与registered类似,将多播消息中的URL与subscried中URL匹配,匹配成功则将该URL从received中剔除;若当前received中该URL对应URL列表为空,则重置该URL的protocol为empty;最后同步变更给订阅者。

protected void unregistered(URL url) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL key = entry.getKey();
        if (UrlUtils.isMatch(key, url)) {
            Set<URL> urls = received.get(key);
            if (urls != null) {
                urls.remove(url);
            }
            // received中url对应URL列表为空,则直接重置该url协议为empty
            if (urls == null || urls.isEmpty()) {
                if (urls == null) {
                    urls = new ConcurrentHashSet<URL>();
                }
                URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
                urls.add(empty);
            }
            List<URL> list = toList(urls);
            // 同步变更消息到各订阅者
            for (NotifyListener listener : entry.getValue()) {
                notify(key, listener, list);
            }
        }
    }
}

最后来看multicast方法,逻辑比较简单,即为发送多播消息到多播消息组(多播消息由构造方法中创建的线程异步处理

private void multicast(String msg) {
    if (logger.isInfoEnabled()) {
        logger.info("Send multicast message: " + msg + " to " + multicastAddress + ":" + multicastPort);
    }
    try {
        byte[] data = (msg + "\n").getBytes();
        DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort);
        multicastSocket.send(hi);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
2.3.2.2、其他逻辑

MulticastRegistry的其他逻辑包括doRegister、doUnregister、doSubscribe、doUnsubscribe、destroy,前面四个方法的实现方式完全一致,即拼接并发送多播消息到多播组,这里以doRegister为例,代码如下:

@Override
public void doRegister(URL url) {
    multicast(Constants.REGISTER + " " + url.toFullString());
}

最后来看destroy,销毁需要处理多播组、关闭所有线程池、关闭所有socket,来看代码:

@Override
public void destroy() {
    super.destroy();
    try {
        // 取消定时清理任务
        ExecutorUtil.cancelScheduledFuture(cleanFuture);
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    try {
        // 退出多播组,并关闭socket
        multicastSocket.leaveGroup(multicastAddress);
        multicastSocket.close();
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    // 关闭定时清理线程池,这里关注下线程池的优雅关闭
    ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod);
}

来看下dubbo线程池的优雅关闭:

public static void gracefulShutdown(Executor executor, int timeout) {
    if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
        return;
    }
    final ExecutorService es = (ExecutorService) executor;
    try {
        // 新任务不能再提交
        es.shutdown();
    } catch (SecurityException ex2) {
        return;
    } catch (NullPointerException ex2) {
        return;
    }
    try {
        // 等待队列内剩余任务结束,立即关闭线程池
        if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
            es.shutdownNow();
        }
    } catch (InterruptedException ex) {
        es.shutdownNow();
        Thread.currentThread().interrupt();
    }
    // 若线程池仍未关闭,则新建线程用于线程池的关闭
    if (!isTerminated(es)) {
        newThreadToCloseExecutor(es);
    }
}
// 专门用于关闭线程池的线程池shutdownExecutor
private static void newThreadToCloseExecutor(final ExecutorService es) {
        if (!isTerminated(es)) {
            shutdownExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 1000; i++) {
                            es.shutdownNow();
                            if (es.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                                break;
                            }
                        }
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            });
        }
    }

了解完MulticastRegistry的所有方法,我们来看整个MulticastRegistry中的数据流,执行注册操作(这里以register为例,其他操作类似)将数据多播至多播组,然后由deamon线程异步将数据同步至订阅者,如下图所示:

MulticastRegistry-dataflow (1).jpg

2.3.3、RedisRegistry

RedisRegistry,即使用Redis存储URL数据的注册中心,这里从初始化、数据流以及核心方法等几个方面进行解析。RedisRegisty除了使用Redis缓存之外,还使用了Redis的消息队列,用于doSubscribe过程中的URL数据变更消息处理。

2.3.3.1、初始化

先来看RedisRegistry的初始化,大致可以分为父类构造方法、注册中心参数初始化、RedisPool连接池创建、过期调度线程池启动。父类构造方法主要是AbstractRegistry构造方法的调用;参数初始化主要包括RedisPool连接池参数初始化、注册中心重连时间间隔初始化、过期线程池调度时间间隔初始化等;RedisPool的创建比较简单,即直接取上一步中的参数创建RedisPool连接池(这里需要注意,会对同一个URL的多个backup地址单独创建RedisPool,多个地址之间进行了隔离,保证URL的可用性);过期调度线程池的调度比较容易理解,也就是说在注册中心构造过程中已经启动了对过期URL数据的定时清理,默认调度间隔30s。

RedisRegistry-init.jpg

然后来看构造方法:

public RedisRegistry(URL url) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 借用对象池管理配置
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
    config.setTestOnReturn(url.getParameter("test.on.return", false));
    config.setTestWhileIdle(url.getParameter("test.while.idle", false));
    // config参数配置,略去

    // 支持的redis集群模式,failover和replicate
    String cluster = url.getParameter("cluster", "failover");
    if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
        throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
    }
    replicate = "replicate".equals(cluster);

    List<String> addresses = new ArrayList<>();
    addresses.add(url.getAddress());
    String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
    if (ArrayUtils.isNotEmpty(backups)) {
        addresses.addAll(Arrays.asList(backups));
    }

    for (String address : addresses) {
        int i = address.indexOf(':');
        String host;
        int port;
        if (i > 0) {
            host = address.substring(0, i);
            port = Integer.parseInt(address.substring(i + 1));
        } else {
            host = address;
            port = DEFAULT_REDIS_PORT;
        }
        // 每个地址对应一个jedis连接池,URL的各地址之间互不影响,保证可用性
        this.jedisPools.put(address, new JedisPool(config, host, port,
                url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
                url.getParameter("db.index", 0)));
    }

    this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    if (!group.endsWith(Constants.PATH_SEPARATOR)) {
        group = group + Constants.PATH_SEPARATOR;
    }
    // group = "/group/"
    this.root = group;

    // 配置过期定时调度
    this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
        try {
            // 延迟过期
            deferExpired(); // Extend the expiration time
        } catch (Throwable t) { // Defensive fault tolerance
            logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
        }
    }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}

// 延迟过期
private void deferExpired() {
        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
            JedisPool jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    for (URL url : new HashSet<>(getRegistered())) {
                        if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                            String key = toCategoryPath(url);
                            // 延长缓存过期时间,并发布队列消息
                            if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                                jedis.publish(key, Constants.REGISTER);
                            }
                        }
                    }
                    // 如果开启了强制清理开关;则直接清理redis中数据,并发布注销消息
                    if (admin) {
                        clean(jedis);
                    }
                    // 无需创建副本,则只写单台redis,直接break;
                    if (!replicate) {
                        break;//  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
            }
        }
    }

再来看整个RedisRegistry中的数据流,可以发现,RedisRegistry通过Redis缓存与Redis消息队列,异步+同步的方式将数据同步至本地cache文件、内存缓存Properties以及具体的数据订阅者,如RegistryDirectory:


RedisRegistry-dataflow (1).jpg
2.3.3.2、注册(doRegister)

了解完RedisRegistry中的数据流,再来看注册过程就比较容易理解了。主要包括两个核心操作:将当前URL缓存至Redis;然后将URL信息发布至Redis消息队列。直接来看代码:

public void doRegister(URL url) {
    String key = toCategoryPath(url);
    String value = url.toFullString();
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    // 当前URL的所有可用地址,遍历,存入redis,并发布注册消息
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.hset(key, value, expire);
                jedis.publish(key, Constants.REGISTER);
                success = true;
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}
2.3.3.3、注销(doUnregister)

注销逻辑即注册逻辑的反向操作,先删除redis缓存,再发布注销消息:

public void doUnregister(URL url) {
    String key = toCategoryPath(url);
    String value = url.toFullString();
    RpcException exception = null;
    boolean success = false;
    // 遍历当前URL的所有可用节点地址,遍历从redis中删除,并发布注销消息
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.hdel(key, value);
                jedis.publish(key, Constants.UNREGISTER);
                success = true;
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}
2.3.3.4、订阅(doSubscribe)

订阅逻辑稍微复杂,支持同步和异步方式将URL信息同步至订阅者,如RegistryDirectory;同步逻辑比较简单,将被订阅的URL与redis中缓存URL进行交叉过滤,最终通过父类notify方法(AbstractRegistry的notify方法),完成URL数据同步;异步逻辑由Notifier线程实现,Notifier依照线性退避规则执行,执行逻辑除了同步URL信息之外,还会订阅redis消息队列并消费队列中消息,当然,消费的核心逻辑也是执行doNotify方法。

public void doSubscribe(final URL url, final NotifyListener listener) {
    String service = toServicePath(url);
    // 每个service对应一个notifier线程
    Notifier notifier = notifiers.get(service);
    // 异步方式,创建Notifier线程,并启动,线性回避执行,数据流 : redis缓存、redis消息队列 -> 订阅者
    if (notifier == null) {
        Notifier newNotifier = new Notifier(service);
        notifiers.putIfAbsent(service, newNotifier);
        notifier = notifiers.get(service);
        // notifier线程创建后即启动
        if (notifier == newNotifier) {
            notifier.start();
        }
    }
    boolean success = false;
    RpcException exception = null;
    // 同步方式订阅 数据流 redis缓存 -> 订阅者,如RegistryDirector
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                // 区分所有服务与单个服务。
                if (service.endsWith(Constants.ANY_VALUE)) {
                    admin = true;
                    Set<String> keys = jedis.keys(service);
                    if (CollectionUtils.isNotEmpty(keys)) {
                        Map<String, Set<String>> serviceKeys = new HashMap<>();
                        for (String key : keys) {
                            String serviceKey = toServicePath(key);
                            Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
                            sk.add(key);
                        }
                        for (Set<String> sk : serviceKeys.values()) {
                            doNotify(jedis, sk, url, Collections.singletonList(listener));
                        }
                    }
                } else {
                    doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Collections.singletonList(listener));
                }
                success = true;
                break; // Just read one server's data
            }
        } catch (Throwable t) { // Try the next server
            exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    // 异常处理逻辑略去
}

来看doNotify实现

private void doNotify(Jedis jedis, String key) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
        doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
    }
}

private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
        if (keys == null || keys.isEmpty()
                || listeners == null || listeners.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        List<URL> result = new ArrayList<>();
        List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));
        String consumerService = url.getServiceInterface();
        for (String key : keys) {
            if (!Constants.ANY_VALUE.equals(consumerService)) {
                String providerService = toServiceName(key);
                if (!providerService.equals(consumerService)) {
                    continue;
                }
            }
            String category = toCategoryName(key);
            if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
                continue;
            }
            List<URL> urls = new ArrayList<>();
            // redis缓存URL信息与内存中被订阅的URL信息进行交叉过滤
            Map<String, String> values = jedis.hgetAll(key);
            if (CollectionUtils.isNotEmptyMap(values)) {
                for (Map.Entry<String, String> entry : values.entrySet()) {
                    URL u = URL.valueOf(entry.getKey());
                    if (!u.getParameter(Constants.DYNAMIC_KEY, true)
                            || Long.parseLong(entry.getValue()) >= now) {
                        if (UrlUtils.isMatch(url, u)) {
                            // 与url匹配的缓存URL放入待通知列表
                            urls.add(u);
                        }
                    }
                }
            }
            // 若无有效URL,则填充urls地址为任意值*
            if (urls.isEmpty()) {
                urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)
                        .setAddress(Constants.ANYHOST_VALUE)
                        .setPath(toServiceName(key))
                        .addParameter(Constants.CATEGORY_KEY, category));
            }
            result.addAll(urls);
            if (logger.isInfoEnabled()) {
                logger.info("redis notify: " + key + " = " + urls);
            }
        }
        if (CollectionUtils.isEmpty(result)) {
            return;
        }
            // 由父类nofity方法完成最终URL数据的同步
        for (NotifyListener listener : listeners) {
            notify(url, listener, result);
        }
    }

再来看Notifier实现

private class Notifier extends Thread {

    private final String service;
    private final AtomicInteger connectSkip = new AtomicInteger();
    private final AtomicInteger connectSkipped = new AtomicInteger();
    private volatile Jedis jedis;
    private volatile boolean first = true;
    private volatile boolean running = true;
    private volatile int connectRandom;

    public Notifier(String service) {
        super.setDaemon(true);
        super.setName("DubboRedisSubscribe");
        this.service = service;
    }

    private void resetSkip() {
        connectSkip.set(0);
        connectSkipped.set(0);
        connectRandom = 0;
    }

    // 首次结果为false;线性退避算法
    private boolean isSkip() {
        // 初始值均为0
        int skip = connectSkip.get(); // Growth of skipping times
        if (skip >= 10) { 
            if (connectRandom == 0) {
                connectRandom = ThreadLocalRandom.current().nextInt(10);
            }
            skip = 10 + connectRandom;
        }
        // 初始值 false,
        // 第一次:false,0-1
        // 第二次:true,1-1
        // 第三次:false,0-2
        // 第四次:true,1-2
        // 第五次:true,2-2
        // 第五次:false,0-3
        if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
            return true;
        }

        // skip 自增
        connectSkip.incrementAndGet();
        // skiped重置0
        connectSkipped.set(0);
        connectRandom = 0;
        return false;
    }

    @Override
    public void run() {
        while (running) {
            try {
                // 线性回避,是否跳过
                if (!isSkip()) {
                    try {
                        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
                            JedisPool jedisPool = entry.getValue();
                            try {
                                jedis = jedisPool.getResource();
                                try {
                                    // service是具体类型还是所有服务
                                    if (service.endsWith(Constants.ANY_VALUE)) {
                                        if (!first) {
                                            first = false;
                                            Set<String> keys = jedis.keys(service);
                                            if (CollectionUtils.isNotEmpty(keys)) {
                                                for (String s : keys) {
                                                    //  同步redis中缓存数据至各订阅者以及本地cache文件
                                                    doNotify(jedis, s);
                                                }
                                            }
                                            // 同步完之后,重置skip
                                            resetSkip();
                                        }
                                        // 处理Redis消息队列中所有与service匹配的消息
                                        jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
                                    } else {
                                        if (!first) {
                                            first = false;
                                            //  同步redis中缓存数据至各订阅者以及本地cache文件
                                            doNotify(jedis, service);
                                            resetSkip();
                                        }
                                        // 处理Redis消息队列中所有与service匹配的消息,同样的同步至各数据订阅者以及本地cache文件。
                                        jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking
                                    }
                                    break;
                                } finally {
                                    jedis.close();
                                }
                            } catch (Throwable t) { // Retry another server
                                logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
                                // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
                                sleep(reconnectPeriod);
                            }
                        }
                    } catch (Throwable t) {
                        logger.error(t.getMessage(), t);
                        sleep(reconnectPeriod);
                    }
                }
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
        }
    }

    public void shutdown() {
        try {
            running = false;
            jedis.disconnect();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
}

这里顺便提一下NotifySub,继承自JedisPubSub,用于redis消息队列中消息的消费,核心逻辑在onMessage方法:

@Override
public void onMessage(String key, String msg) {
    if (logger.isInfoEnabled()) {
        logger.info("redis event: " + key + " = " + msg);
    }
    // 只处理注册、注销类消息
    if (msg.equals(Constants.REGISTER)
            || msg.equals(Constants.UNREGISTER)) {
        try {
            Jedis jedis = jedisPool.getResource();
            try {
                doNotify(jedis, key);
            } finally {
                jedis.close();
            }
        } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
            logger.error(t.getMessage(), t);
        }
    }
}

2.3.4、ZookeeperRegistry

ZookeeperRegistry,即zk注册中心,也是dubbo官方默认采用的注册中心。先来看构造方法:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // 初始化zkClient
    zkClient = zookeeperTransporter.connect(url);
    // 若是重连状态,执行恢复逻辑,即将已注册过的URL,放入retry列表,重新注册;
    zkClient.addStateListener(state -> {
        if (state == StateListener.RECONNECTED) {
            try {
                recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    });
}

构造方法里有一个参数,ZookeeperTransporter,先来看一下这个ZookeeperTransporter。

2.3.4.1、ZookeeperTransporter

ZookeeperTransporter是dubbo对zk客户端的适配接口,支持SPI(方法级),内部只有一个connect方法,返回ZookeeperClient实例。我们知道,zk客户端有常用的两个实现,分别是:

  1. Curator,Netflix公司开源的一套zookeeper客户端框架,jar包gav如下:

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.0.1</version>
    </dependency>
    
  2. Zkclient,Github上一个开源的Zookeeper客户端,在Zookeeper原生 API接口之上进行了包装,是一个更加易用的Zookeeper客户端,jar包gav如下:

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.2</version>
    </dependency>
    

    dubbo中借助这两种客户端组件,ZookeeperClient的实现有CuratorZookeeperClient、ZkclientZookeeperClient,内部通过客户端组件完成zk的连接、监听等动作,具体逻辑这里不做详细解析。继续来看ZookeeperTransporter,基于接口的基类实现AbstractZookeeperTransporter,实现了connect方法,同时定义模板方法createZookeeperClient由具体的Transporter(CuratorZookeeperTransporter、ZkclientZookeeperTransporter,逻辑比较简单,省略)实现。直接来看基类的connect方法:

    @Override
    public ZookeeperClient connect(URL url) {
        ZookeeperClient zookeeperClient;
        // backUrl解析
        List<String> addressList = getURLBackupAddress(url);
        // The field define the zookeeper server , including protocol, host, port, username, password
        // fetchAndUpdateZookeeperClientCache 逻辑比较简单,从缓存里取,取不到则返回null;
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }
        // avoid creating too many connections, so add lock,加锁,防并发。
        synchronized (zookeeperClientMap) {
            if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
                logger.info("find valid zookeeper client from the cache for address: " + url);
                return zookeeperClient;
            }
            // 缓存没取到,创建zkClient,并缓存到map
            zookeeperClient = createZookeeperClient(toClientURL(url));
            logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
            writeToClientMap(addressList, zookeeperClient);
        }
        return zookeeperClient;
    }
    
2.3.4.2、doRegister方法

基类FailbackRegistry的模板方法实现,逻辑非常简单:

@Override
public void doRegister(URL url) {
    try {
        // 创建zk临时节点,节点目录类似 /dubbo/xxx/xxx/xxx.xxx.xxxService
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
2.3.4.3、doUnregister方法

基类FailbackRegistry的模板方法实现,逻辑同样非常简单:

@Override
public void doUnregister(URL url) {
    try {
        // 删除zk节点
        zkClient.delete(toUrlPath(url));
    } catch (Throwable e) {
        throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
2.3.4.4、doSubscribe方法

基类FailbackRegistry的模板方法实现,逻辑相对复杂,主要分为几部分1)zkListener的初始化,2)递归订阅url变更信息,创建zk永久节点,3)执行父类notify(逻辑参考AbstractRegistry解析部分),代码如下:

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // 无指定服务
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            // 根节点/dubbo
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                listeners = zkListeners.get(url);
            }
            // 初始化zkListener
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                // 初始化anyServices 注册 ChildListener到listener内存缓存,订阅childChange变更,即当执行childChanged时,完成对所有child的订阅。
                listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
                            anyServices.add(child);
                            // 递归订阅url变更消息,最终会走到else,结束
                            subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                    Constants.CHECK_KEY, String.valueOf(false)), listener);
                        }
                    }
                });
                zkListener = listeners.get(listener);
            }
            // 创建永久节点/dubbo
            zkClient.create(root, false);
            // 订阅URL变更信息
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (CollectionUtils.isNotEmpty(services)) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    // 递归订阅url变更消息,最终会走到else,结束
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            //指定URL变更处理
            List<URL> urls = new ArrayList<>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    // childChange事件,只做notify到指定listener
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkListener = listeners.get(listener);
                }
                zkClient.create(path, false);
                // 监听该path
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 父类AbstractRegistry的notify逻辑,略去
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
2.3.4.5、doUnsubscribe方法

基类FailbackRegistry的模板方法实现,直接上代码:

@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if (listeners != null) {
        ChildListener zkListener = listeners.get(listener);
        if (zkListener != null) {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                // 逻辑比较简单,即不再订阅对应的listener
                zkClient.removeChildListener(root, zkListener);
            } else {
                for (String path : toCategoriesPath(url)) {
                    zkClient.removeChildListener(path, zkListener);
                }
            }
        }
    }
}
2.3.4.6、lookup方法

基类AbstractRegistry的模板方法实现,核心逻辑是从zk中查询指定URL对应地址的所有可用URL(不同Category),直接来看代码:

@Override
public List<URL> lookup(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("lookup url == null");
    }
    try {
        List<String> providers = new ArrayList<>();
        for (String path : toCategoriesPath(url)) {
            List<String> children = zkClient.getChildren(path);
            if (children != null) {
                providers.addAll(children);
            }
        }
        return toUrlsWithoutEmpty(url, providers);
    } catch (Throwable e) {
        throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

小结

本文重点解析了dubbo中Registry的相关核心实现,从RegistryFactory到Registry,dubbo抽象了一系列注册中心,为用户自定义扩展提供了非常优良的入口;同时,dubbo为我们提供了基于缓存、多播、zk等的注册中心实现,非常方便,不得不感叹设计的非常好。

注:源码版本 2.7.1。8月以来,接连经历了迎接小生命的惊喜,跳槽后的适应期,中间耽误了挺长一段时间,后面我会努力持续更新的,come on。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,427评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,551评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,747评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,939评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,955评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,737评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,448评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,352评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,834评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,992评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,133评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,815评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,477评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,022评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,147评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,398评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,077评论 2 355

推荐阅读更多精彩内容