前言
Registry是dubbo对注册中心的抽象,提供服务的注册、注销、查找、订阅、取消订阅等功能。本文按照dubbo中Registry的组织形式,分析Registry的核心逻辑。首先来看注册中心的创建,RegistryFactory接口定义注册中心的创建(工厂模式实现),支持SPI扩展,默认SPI实现是DubboRegistryFactory。注册中心Registry接口继承Node、RegistryService,抽象基类AbstractRegistry直接实现Registry,FailbackRegistry继承自基类AbstractRegistry,所有注册中心实现均继承自FailbackRegistry,这里可以看出,所有注册中心均支持失败重试(Failback)。
一、RegistryFactory(注册中心工厂)
RegistryFactory的UML类图如下:
先来看RegistryFactory接口定义,比较简单:
@SPI("dubbo")
public interface RegistryFactory {
/**
* Connect to the registry
* 1、check=false时,连接无需check,否则连接断开时直接抛异常
* 2、支持URL的用户名、密码权限校验
* 3、支持注册中心族备份地址:10.20.153.10
* 4、支持注册中心本地缓存文件
* 5、支持超时设置
* 6、支持session 60s过期
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
1.1 AbstractRegistryFactory
接着是抽象基类AbstractRegistryFactory,实现RegistryFactory接口的getRegistry方法,同时定义模板方法createRegistry供子类实现;AbstractRegistryFactory的getRegistry方法先从Map缓存查询注册中心,查不到则执行模板方法createRegistry创建注册中心,并放入缓存,然后返回该registry
@Override
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceStringWithoutResolving();
//加锁,保证单例
LOCK.lock();
try {
// 缓存查询注册中心
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 不存在则通过spi、ioc方式创建registry
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// 锁释放
LOCK.unlock();
}
}
1.2 其他实现
基类AbstractRegistryFactory的子类实现中,比较重要的是DubboRegistryFactory(介绍Protocol时已经做了解析),其他实现比如RedisRegistryFactory、ZookeeperRegistryFactory、MulticastRegistryFactory的逻辑非常简单,直接返回对应的注册中心实现,代码就省略了。
二、Registry(注册中心)
来看dubbo中Registry实现。UML类图如下:
方便理解起见,这里把注册中实现分为三个层次,分别是Registry接口、FailbackRegistry实现、注册中心实现。
2.1、Registry接口
上面UML类图中可以看出,Registry继承Node和RegistryService接口(Registry接口内部无新增方法),重点关注RegistryService接口。RegistryService抽象了服务的注册、注销、订阅、取消订阅、查找等核心功能,来看接口定义:
public interface RegistryService {
/**
* 注册数据,比如提供者服务、消费者地址、路由规则、override规则以及其他数据
* 1、若URL中check=false,那么注册失败会进行重试且不会抛出异常,否则直接抛异常
* 2、若URL中dynamic=false,那么URL中信息会被持久化存储,否则注册过程异常退出,URL信息应当被删除
* 3、若URL中category=routers,那么意味着分类存储,默认catetory=providers,且会根据分类区域进行通知更新
* 4、当注册中心重启,网络波动,数据不能被丢弃,包括删除损坏的流水线中数据的删除
* 5、参数不同的URL可以共存,不能相互覆盖
* @param url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
*/
void register(URL url);
/**
* 注销
* 1、若是dynamic=false的持久化存储,如果找不到注册数据,那么会抛出非法状态异常,否则会忽略
* 2、根据完整URL进行匹配注销
* @param url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
*/
void unregister(URL url);
/**
* 订阅注册数据,并在注册数据更新的时候自动推送
* 服务订阅
* 1、若URL中check=false,那么当注册失败时,会直接在后台重试不会抛异常
* 2、若URL中category=routers,那么只会通知特定分类数据;多个分类用逗号隔开;允许使用*全部匹配
* 3、允许接口、组、版本以及分类作为查询条件
* 4、查询条件允许使用*进行匹配,意味着订阅接口的所有版本
* 5、若注册中心重启、网络波动,必须自动保存订阅请求
* 6、参数不同的URL可以共存,不能相互覆盖
* 7、订阅过程必须是阻塞的
*/
void subscribe(URL url, NotifyListener listener);
/**
* 1、没有订阅,则直接忽略
* 2、根据URL完全匹配
**/
void unsubscribe(URL url, NotifyListener listener);
/**
* 根据条件匹配,查询注册数据;对应订阅的推模式,提供拉模式,且仅返回一个结果
*/
List<URL> lookup(URL url);
从接口定义可以看出,RegsitryService对数据的注册、注销、订阅、取消订阅、查找等功能做了定义约束,所有对RegistryService的实现都必须满足这个约束。
2.2、AbstractRegistry & FailbackRegistry
接下来看Registry接口的基类实现AbstractRegistry、FailbackRegistry。
2.2.1 AbstractRegistry
先来看AbstractRegistry,重点关注构造方法,AbstractRegistry基类的构造过程的核心逻辑分为三部分:1、创建注册中心缓存文件;2、cache文件加载至properties;3、同步backUpUrl信息。我们先来看构造方法的定义,然后再分步来看:
public AbstractRegistry(URL url) {
// 注册中心URL负载
setUrl(url);
// Start file save timer
// 1、注册中心文件同步保存开关,默认关闭,即默认异步保存
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// 默认文件路径: user.home/.dubbo/dubbo-registry-applicationName-ip.cache
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
// 文件目录创建失败,直接抛异常
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// When starting the subscription center,we need to read the local cache file for future Registry fault tolerance processing.
// 2、加载注册中心cache文件到内存,用于容错
loadProperties();
// 3、backUpUrl数据同步,同步所有订阅者、更新properties缓存。
notify(url.getBackupUrls());
}
2.2.1.1、创建注册中心缓存
这一步比较简单,首先根据URL信息,确定缓存文件保存方式(异步、同步);然后,拼接缓存文件名称,根据文件名创建缓存文件,创建失败则直接抛异常,否则初始化缓存文件file。
2.2.1.2、注册中心cache加载
加载本地cache文件到内存缓存properties,将上一步中的cache文件内容,加载到properties。比较容易理解,启动时先读本地缓存,用于容错。
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
// 省略try-catch
in = new FileInputStream(file);
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry cache file " + file + ", data: " + properties);
}
}
}
2.2.1.3、backupUrl数据同步
重点来看backupUrl的数据同步,backupUrl的生成方式前面我们已经讲过了,这一步主要是将生成的backupUrl列表同步给各订阅者以及内存缓存Properties,来看代码:
protected void notify(List<URL> urls) {
if (CollectionUtils.isEmpty(urls)) {
return;
}
// 根据已订阅的URL,以及订阅者listener进行同步
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
//过滤掉不匹配的URL
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
// 核心逻辑,执行URL信息同步
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
继续来看notify方法:
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// keep every provider's category.
// 按照category分组
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
//缓存分组后的Notified结果
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 同步URL信息,这里有两种实现,分别是RegistryDirectory和RegistryProtocol$OverrideListener,listener通过subscribe方法被注册到subscribed缓存,
listener.notify(categoryList);
// 每次notify都会更新cache缓存文件(同步或者异步),保证注册中心properties内容与各订阅者拿到的信息一致。
saveProperties(url);
}
}
同步URL信息到订阅者的逻辑主要在RegistryDirectory和RegistryProtocol$OverrideListener,这里就不再做解析了。来看同步到内存properties的逻辑,把所有待同步的URL序列化为字符串(每个url中间用空格隔开),然后将URL数据保存(同步或异步)至缓存文件(即2.2.1.1中创建的缓存文件),注意,这里异步保存实际执行的逻辑与同步保存完全一致,核心逻辑在doSaveProperties方法。
private void saveProperties(URL url) {
if (file == null) {
return;
}
try {
StringBuilder buf = new StringBuilder();
// url缓存信息组装,每个url中间用空格隔开
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
buf.append(URL_SEPARATOR);
}
buf.append(u.toFullString());
}
}
}
// 更新内存缓存
properties.setProperty(url.getServiceKey(), buf.toString());
// 版本控制,可以看出,MVCC并非DB专有
long version = lastCacheChanged.incrementAndGet();
// 同步保存文件则直接执行save,将properties缓存内容同步至缓存文件,否则放入线程池异步调度
if (syncSaveFile) {
doSaveProperties(version);
} else {
registryCacheExecutor.execute(new SaveProperties(version));
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
继续来看doSaveProperties方法,方法参数是当前文件的版本号,可以看到,防止并发操作,版本号用于版本控制
// 保存配置到本地缓存文件,文件版本
public void doSaveProperties(long version) {
// 防止版本回退
if (version < lastCacheChanged.get()) {
return;
}
if (file == null) {
return;
}
// Save
try {
// 创建本地缓存文件锁,不存在则直接创建;
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock();
//拿到锁才可以操作
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// Save
try {
if (!file.exists()) {
file.createNewFile();
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
properties.store(outputFile, "Dubbo Registry Cache");
}
} finally {
lock.release();
}
}
} catch (Throwable e) {
if (version < lastCacheChanged.get()) {
return;
} else {
registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
}
logger.warn("Failed to save registry cache file, cause: " + e.getMessage(), e);
}
}
为了进一步加深理解,我们把AbstractRegistry构建过程中的数据流图示如下:
2.2.1.4、数据注册 - register
基类中的注册等方法逻辑非常简单,只是将URL放入缓存,非常简单,不做过多说明。
2.2.1.5、数据注销 - unRegister
同样的,注销方法逻辑也非常简单,将URL从缓存中删除。
2.2.1.6、数据订阅 - subscribe
数据订阅需要注意,订阅的逻辑核心是注册监听器,以便数据变更时,同步更新;subscribed缓存的结构比较特殊,来看代码:
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// subscribed缓存结构:ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>()
Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
listeners.add(listener);
}
2.2.1.7、取消数据订阅 - unSubscribe
取消订阅即删除监听器,从subscribed缓存中删除对应监听器。
2.2.1.8、数据查找 - lookup
查找逻辑即,从已同步过的URL列表(notified缓存)中,过滤满足要求的URL;若当前已同步过的URL集合为空,则
@Override
public List<URL> lookup(URL url) {
List<URL> result = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> urls : notifiedUrls.values()) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
} else {
final AtomicReference<List<URL>> reference = new AtomicReference<>();
NotifyListener listener = reference::set;
// 即注册监听器,保证首次notify有数据返回
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
List<URL> urls = reference.get();
if (CollectionUtils.isNotEmpty(urls)) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
}
return result;
}
2.2.1.9、注册中心销毁- destory
销毁的逻辑比较简单,分为注销数据和注销监听器两部分,来看代码:
@Override
// 主要做两件事情:1、注销URL,从registered列表中剔除所有URL;2、移除所有NotifyListener,也就是说不再接受配置变更同步消息。
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
// 将所有URL从registered移除,即注销全部数据
Set<URL> destroyRegistered = new HashSet<>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// 注销监听器,即从subscribed移除所有订阅URL的listener
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
}
AbstractRegistry基类的逻辑就分析到这里,接着来看FailbackRegistry。
2.2.2、FailbackRegistry
FailbackRegistry 顾名思义,支持失败恢复的注册中心,继承自基类AbstractRegistry,可以看到在基类基础上扩展了失败恢复功能,先来看几个缓存变量:
// 重试任务map
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
几个重试任务map,用于缓存失败的任务以便于重试;接着来看构造方法,在父类构造方法的基础上,新增了HashedWheelTimer实例,用于定时重试失败任务,默认重试时间间隔5s。
public FailbackRegistry(URL url) {
super(url);
// 默认重试时间间隔 5s
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 利用hashTimer实现定时重试,
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
除此之外,FailbackRegistry还定义了几个关键的模板方法,由子类实现:
public abstract void doRegister(URL url);
public abstract void doUnregister(URL url);
public abstract void doSubscribe(URL url, NotifyListener listener);
public abstract void doUnsubscribe(URL url, NotifyListener listener);
2.2.2.1、数据注册-register
注册逻辑在父类基础上新增了失败以后的操作,逻辑比较简单,直接来看代码:
@Override
public void register(URL url) {
// 父类注册方法,保存url到已注册列表
super.register(url);
// 将url从注册失败列表中剔除
removeFailedRegistered(url);
// 将url从注销失败列表中剔除
removeFailedUnregistered(url);
try {
// 执行模板方法逻辑
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 如果启动检测开关开启(默认开启),失败会直接抛异常,否则加入失败列表,用于重试
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// 加入注册失败列表,用于重试
addFailedRegistered(url);
}
}
2.2.2.2、数据注销-unRegister
注销逻辑与注册类似,直接来看代码:
@Override
public void unregister(URL url) {
// 父类注销逻辑
super.unregister(url);
// 从注册失败缓存中删除
removeFailedRegistered(url);
// 从注销失败列表中删除
removeFailedUnregistered(url);
try {
// 执行模板方法
doUnregister(url);
} catch (Exception e) {
Throwable t = e;
// 如果启动检测开关开启(默认开启),失败会直接抛异常,否则加入失败列表,用于重试
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
//加入失败重试缓存
addFailedUnregistered(url);
}
}
2.2.2.3、数据订阅-subscribe
与注册、注销逻辑类似,数据订阅逻辑也非常简单,代码就不再展示了
2.2.2.4、取消数据订阅-unSubscribe
与注册、注销逻辑类似,不做过多解析。
2.3、注册中心实现
好了,前面的铺垫结束了,本节来看具体的注册中心实现,下面按照DubboRegistry、MulticastRegistry、RedisRegistry、ZookeeperRegistry的顺序依次分析。
2.3.1、DubboRegistry
DubboRegistry作为dubbo的默认注册中心实现(RegistryFactory默认SPI实现是DubboRegistryFactory),严格意义上来讲,DubboRegistry实际上是一个Registry代理,核心逻辑全部由代理也即registryService实现。DubboRegistry的创建在DubboRegistryFactory中已经做了解析,这里不做过多说明。重点关注DubboRegistry的构造方法
public DubboRegistry(Invoker<RegistryService> registryInvoker, RegistryService registryService) {
super(registryInvoker.getUrl());
this.registryInvoker = registryInvoker;
// Registry代理,核心逻辑借助registryService实现
this.registryService = registryService;
// 重连定时器,默认重连间隔时间3s
this.reconnectPeriod = registryInvoker.getUrl().getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, RECONNECT_PERIOD_DEFAULT);
// 初始化调度任务逻辑,具体逻辑参考recover方法
reconnectFuture = reconnectTimer.scheduleWithFixedDelay(() -> {
try {
// 重连逻辑
connect();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
}
}, reconnectPeriod, reconnectPeriod, TimeUnit.MILLISECONDS);
}
介绍FailbackRegistry时说过,所有的注册中心实现都支持自动恢复,DubboRegistry的自动恢复实现原理是若当前注册中心不可用,则直接将该URL注册信息加入到注册失败、订阅失败缓存(借助父类FailbackRegistry的addFailedRegistered、addFailedSubscribed方法实现),由父类的HashedWheelTimer重新调度,进行恢复,来看失败后加入缓存的逻辑(外层方法是connect,内部实际上执行的recover方法):
// 核心逻辑:注册失败、订阅失败的url分别放入对应列表,用于hashedWheelTimer调度
@Override
protected void recover() throws Exception {
// register
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
// 放入注册失败列表
for (URL url : recoverRegistered) {
addFailedRegistered(url);
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
addFailedSubscribed(url, listener);
}
}
}
}
2.3.2、MulticastRegistry
MulticastRegistry即多播注册中心,顾名思义,采用多播实现;注册、注销、订阅、取消订阅等均通过多播方式实现。核心逻辑在构造方法,包括多播组的创建以及多播消息的处理;创建多播组比较容易理解,重点关注多播消息的处理,在构造方法中创建并启动一个守护线程,用于接收并处理多播消息(注册消息、注销消息、订阅消息),处理多播消息的入口是receive方法。
2.3.2.1、构造方法
先来看构造方法:
// 创建并启动daemon线程,用于接收广播消息,对接收到的消息处理逻辑在receive方法
public MulticastRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
try {
// 创建并加入多播组
multicastAddress = InetAddress.getByName(url.getHost());
checkMulticastAddress(multicastAddress);
multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
multicastSocket = new MulticastSocket(multicastPort);
// 注册中心URL地址加入多播组
NetUtils.joinMulticastGroup(multicastSocket, multicastAddress);
// 启动daemon线程,用于接收广播socket消息
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
byte[] buf = new byte[2048];
// UDP包封装
DatagramPacket recv = new DatagramPacket(buf, buf.length);
while (!multicastSocket.isClosed()) {
try {
// 接收UDP报文
multicastSocket.receive(recv);
String msg = new String(recv.getData()).trim();
int i = msg.indexOf('\n');
if (i > 0) {
msg = msg.substring(0, i).trim();
}
// 多播消息接收处理
MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
Arrays.fill(buf, (byte) 0);
} catch (Throwable e) {
if (!multicastSocket.isClosed()) {
logger.error(e.getMessage(), e);
}
}
}
}
}, "DubboMulticastRegistryReceiver");
thread.setDaemon(true);
thread.start();
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 这里利用定时调度线程池,定时清理received缓存中不可用socket;默认清理时间间隔60s;
this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
if (url.getParameter("clean", true)) {
this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
clean(); // Remove the expired
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at clean expired provider, cause: " + t.getMessage(), t);
}
}
}, cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS);
} else {
this.cleanFuture = null;
}
}
receive方法实现比较简单,根据msg类型(通过消息前缀判断)执行相应的注册、注销、订阅操作;重点关注registered、unregistered、multicast三个方法(MulticastRegistry的注册、注销、订阅逻辑均通过这三个方法实现)。
private void receive(String msg, InetSocketAddress remoteAddress) {
if (logger.isInfoEnabled()) {
logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
}
// 广播注册消息
if (msg.startsWith(Constants.REGISTER)) {
URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim());
registered(url);
// 广播注销消息
} else if (msg.startsWith(Constants.UNREGISTER)) {
URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim());
unregistered(url);
// 广播订阅消息
} else if (msg.startsWith(Constants.SUBSCRIBE)) {
URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim());
// 根据注册的地址,发送单播、多播消息
Set<URL> urls = getRegistered();
if (CollectionUtils.isNotEmpty(urls)) {
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String host = remoteAddress != null && remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : url.getIp();
// 发送单播,多播消息
if (url.getParameter("unicast", true) // Whether the consumer's machine has only one process
&& !NetUtils.getLocalHost().equals(host)) { // Multiple processes in the same machine cannot be unicast with unicast or there will be only one process receiving information
unicast(Constants.REGISTER + " " + u.toFullString(), host);
} else {
multicast(Constants.REGISTER + " " + u.toFullString());
}
}
}
}
}/* else if (msg.startsWith(UNSUBSCRIBE)) {
}*/
}
先来看registered方法,核心逻辑是将URL与subscried缓存中的URL进行匹配,匹配成功则加入received,同时同步给订阅者(通过NotifyListener的notify方法),然后通知当前listener(在subscribe时wait*)
protected void registered(URL url) {
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key, url)) {
Set<URL> urls = received.get(key);
if (urls == null) {
received.putIfAbsent(key, new ConcurrentHashSet<URL>());
urls = received.get(key);
}
urls.add(url);
List<URL> list = toList(urls);
for (NotifyListener listener : entry.getValue()) {
notify(key, listener, list);
synchronized (listener) {
listener.notify();
}
}
}
}
}
再来看unRegistered,逻辑上大体与registered类似,将多播消息中的URL与subscried中URL匹配,匹配成功则将该URL从received中剔除;若当前received中该URL对应URL列表为空,则重置该URL的protocol为empty;最后同步变更给订阅者。
protected void unregistered(URL url) {
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key, url)) {
Set<URL> urls = received.get(key);
if (urls != null) {
urls.remove(url);
}
// received中url对应URL列表为空,则直接重置该url协议为empty
if (urls == null || urls.isEmpty()) {
if (urls == null) {
urls = new ConcurrentHashSet<URL>();
}
URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
urls.add(empty);
}
List<URL> list = toList(urls);
// 同步变更消息到各订阅者
for (NotifyListener listener : entry.getValue()) {
notify(key, listener, list);
}
}
}
}
最后来看multicast方法,逻辑比较简单,即为发送多播消息到多播消息组(多播消息由构造方法中创建的线程异步处理)
private void multicast(String msg) {
if (logger.isInfoEnabled()) {
logger.info("Send multicast message: " + msg + " to " + multicastAddress + ":" + multicastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort);
multicastSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
2.3.2.2、其他逻辑
MulticastRegistry的其他逻辑包括doRegister、doUnregister、doSubscribe、doUnsubscribe、destroy,前面四个方法的实现方式完全一致,即拼接并发送多播消息到多播组,这里以doRegister为例,代码如下:
@Override
public void doRegister(URL url) {
multicast(Constants.REGISTER + " " + url.toFullString());
}
最后来看destroy,销毁需要处理多播组、关闭所有线程池、关闭所有socket,来看代码:
@Override
public void destroy() {
super.destroy();
try {
// 取消定时清理任务
ExecutorUtil.cancelScheduledFuture(cleanFuture);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
// 退出多播组,并关闭socket
multicastSocket.leaveGroup(multicastAddress);
multicastSocket.close();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
// 关闭定时清理线程池,这里关注下线程池的优雅关闭
ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod);
}
来看下dubbo线程池的优雅关闭:
public static void gracefulShutdown(Executor executor, int timeout) {
if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
return;
}
final ExecutorService es = (ExecutorService) executor;
try {
// 新任务不能再提交
es.shutdown();
} catch (SecurityException ex2) {
return;
} catch (NullPointerException ex2) {
return;
}
try {
// 等待队列内剩余任务结束,立即关闭线程池
if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
es.shutdownNow();
}
} catch (InterruptedException ex) {
es.shutdownNow();
Thread.currentThread().interrupt();
}
// 若线程池仍未关闭,则新建线程用于线程池的关闭
if (!isTerminated(es)) {
newThreadToCloseExecutor(es);
}
}
// 专门用于关闭线程池的线程池shutdownExecutor
private static void newThreadToCloseExecutor(final ExecutorService es) {
if (!isTerminated(es)) {
shutdownExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 1000; i++) {
es.shutdownNow();
if (es.awaitTermination(10, TimeUnit.MILLISECONDS)) {
break;
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
});
}
}
了解完MulticastRegistry的所有方法,我们来看整个MulticastRegistry中的数据流,执行注册操作(这里以register为例,其他操作类似)将数据多播至多播组,然后由deamon线程异步将数据同步至订阅者,如下图所示:
2.3.3、RedisRegistry
RedisRegistry,即使用Redis存储URL数据的注册中心,这里从初始化、数据流以及核心方法等几个方面进行解析。RedisRegisty除了使用Redis缓存之外,还使用了Redis的消息队列,用于doSubscribe过程中的URL数据变更消息处理。
2.3.3.1、初始化
先来看RedisRegistry的初始化,大致可以分为父类构造方法、注册中心参数初始化、RedisPool连接池创建、过期调度线程池启动。父类构造方法主要是AbstractRegistry构造方法的调用;参数初始化主要包括RedisPool连接池参数初始化、注册中心重连时间间隔初始化、过期线程池调度时间间隔初始化等;RedisPool的创建比较简单,即直接取上一步中的参数创建RedisPool连接池(这里需要注意,会对同一个URL的多个backup地址单独创建RedisPool,多个地址之间进行了隔离,保证URL的可用性);过期调度线程池的调度比较容易理解,也就是说在注册中心构造过程中已经启动了对过期URL数据的定时清理,默认调度间隔30s。
然后来看构造方法:
public RedisRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 借用对象池管理配置
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
config.setTestWhileIdle(url.getParameter("test.while.idle", false));
// config参数配置,略去
// 支持的redis集群模式,failover和replicate
String cluster = url.getParameter("cluster", "failover");
if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
}
replicate = "replicate".equals(cluster);
List<String> addresses = new ArrayList<>();
addresses.add(url.getAddress());
String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
if (ArrayUtils.isNotEmpty(backups)) {
addresses.addAll(Arrays.asList(backups));
}
for (String address : addresses) {
int i = address.indexOf(':');
String host;
int port;
if (i > 0) {
host = address.substring(0, i);
port = Integer.parseInt(address.substring(i + 1));
} else {
host = address;
port = DEFAULT_REDIS_PORT;
}
// 每个地址对应一个jedis连接池,URL的各地址之间互不影响,保证可用性
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
url.getParameter("db.index", 0)));
}
this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
if (!group.endsWith(Constants.PATH_SEPARATOR)) {
group = group + Constants.PATH_SEPARATOR;
}
// group = "/group/"
this.root = group;
// 配置过期定时调度
this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
try {
// 延迟过期
deferExpired(); // Extend the expiration time
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}
// 延迟过期
private void deferExpired() {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
String key = toCategoryPath(url);
// 延长缓存过期时间,并发布队列消息
if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
jedis.publish(key, Constants.REGISTER);
}
}
}
// 如果开启了强制清理开关;则直接清理redis中数据,并发布注销消息
if (admin) {
clean(jedis);
}
// 无需创建副本,则只写单台redis,直接break;
if (!replicate) {
break;// If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
}
再来看整个RedisRegistry中的数据流,可以发现,RedisRegistry通过Redis缓存与Redis消息队列,异步+同步的方式将数据同步至本地cache文件、内存缓存Properties以及具体的数据订阅者,如RegistryDirectory:
2.3.3.2、注册(doRegister)
了解完RedisRegistry中的数据流,再来看注册过程就比较容易理解了。主要包括两个核心操作:将当前URL缓存至Redis;然后将URL信息发布至Redis消息队列。直接来看代码:
public void doRegister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
boolean success = false;
RpcException exception = null;
// 当前URL的所有可用地址,遍历,存入redis,并发布注册消息
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
jedis.hset(key, value, expire);
jedis.publish(key, Constants.REGISTER);
success = true;
if (!replicate) {
break; // If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}
2.3.3.3、注销(doUnregister)
注销逻辑即注册逻辑的反向操作,先删除redis缓存,再发布注销消息:
public void doUnregister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
RpcException exception = null;
boolean success = false;
// 遍历当前URL的所有可用节点地址,遍历从redis中删除,并发布注销消息
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
jedis.hdel(key, value);
jedis.publish(key, Constants.UNREGISTER);
success = true;
if (!replicate) {
break; // If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}
2.3.3.4、订阅(doSubscribe)
订阅逻辑稍微复杂,支持同步和异步方式将URL信息同步至订阅者,如RegistryDirectory;同步逻辑比较简单,将被订阅的URL与redis中缓存URL进行交叉过滤,最终通过父类notify方法(AbstractRegistry的notify方法),完成URL数据同步;异步逻辑由Notifier线程实现,Notifier依照线性退避规则执行,执行逻辑除了同步URL信息之外,还会订阅redis消息队列并消费队列中消息,当然,消费的核心逻辑也是执行doNotify方法。
public void doSubscribe(final URL url, final NotifyListener listener) {
String service = toServicePath(url);
// 每个service对应一个notifier线程
Notifier notifier = notifiers.get(service);
// 异步方式,创建Notifier线程,并启动,线性回避执行,数据流 : redis缓存、redis消息队列 -> 订阅者
if (notifier == null) {
Notifier newNotifier = new Notifier(service);
notifiers.putIfAbsent(service, newNotifier);
notifier = notifiers.get(service);
// notifier线程创建后即启动
if (notifier == newNotifier) {
notifier.start();
}
}
boolean success = false;
RpcException exception = null;
// 同步方式订阅 数据流 redis缓存 -> 订阅者,如RegistryDirector
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
// 区分所有服务与单个服务。
if (service.endsWith(Constants.ANY_VALUE)) {
admin = true;
Set<String> keys = jedis.keys(service);
if (CollectionUtils.isNotEmpty(keys)) {
Map<String, Set<String>> serviceKeys = new HashMap<>();
for (String key : keys) {
String serviceKey = toServicePath(key);
Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
sk.add(key);
}
for (Set<String> sk : serviceKeys.values()) {
doNotify(jedis, sk, url, Collections.singletonList(listener));
}
}
} else {
doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Collections.singletonList(listener));
}
success = true;
break; // Just read one server's data
}
} catch (Throwable t) { // Try the next server
exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
// 异常处理逻辑略去
}
来看doNotify实现
private void doNotify(Jedis jedis, String key) {
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
}
}
private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
if (keys == null || keys.isEmpty()
|| listeners == null || listeners.isEmpty()) {
return;
}
long now = System.currentTimeMillis();
List<URL> result = new ArrayList<>();
List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));
String consumerService = url.getServiceInterface();
for (String key : keys) {
if (!Constants.ANY_VALUE.equals(consumerService)) {
String providerService = toServiceName(key);
if (!providerService.equals(consumerService)) {
continue;
}
}
String category = toCategoryName(key);
if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
continue;
}
List<URL> urls = new ArrayList<>();
// redis缓存URL信息与内存中被订阅的URL信息进行交叉过滤
Map<String, String> values = jedis.hgetAll(key);
if (CollectionUtils.isNotEmptyMap(values)) {
for (Map.Entry<String, String> entry : values.entrySet()) {
URL u = URL.valueOf(entry.getKey());
if (!u.getParameter(Constants.DYNAMIC_KEY, true)
|| Long.parseLong(entry.getValue()) >= now) {
if (UrlUtils.isMatch(url, u)) {
// 与url匹配的缓存URL放入待通知列表
urls.add(u);
}
}
}
}
// 若无有效URL,则填充urls地址为任意值*
if (urls.isEmpty()) {
urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)
.setAddress(Constants.ANYHOST_VALUE)
.setPath(toServiceName(key))
.addParameter(Constants.CATEGORY_KEY, category));
}
result.addAll(urls);
if (logger.isInfoEnabled()) {
logger.info("redis notify: " + key + " = " + urls);
}
}
if (CollectionUtils.isEmpty(result)) {
return;
}
// 由父类nofity方法完成最终URL数据的同步
for (NotifyListener listener : listeners) {
notify(url, listener, result);
}
}
再来看Notifier实现
private class Notifier extends Thread {
private final String service;
private final AtomicInteger connectSkip = new AtomicInteger();
private final AtomicInteger connectSkipped = new AtomicInteger();
private volatile Jedis jedis;
private volatile boolean first = true;
private volatile boolean running = true;
private volatile int connectRandom;
public Notifier(String service) {
super.setDaemon(true);
super.setName("DubboRedisSubscribe");
this.service = service;
}
private void resetSkip() {
connectSkip.set(0);
connectSkipped.set(0);
connectRandom = 0;
}
// 首次结果为false;线性退避算法
private boolean isSkip() {
// 初始值均为0
int skip = connectSkip.get(); // Growth of skipping times
if (skip >= 10) {
if (connectRandom == 0) {
connectRandom = ThreadLocalRandom.current().nextInt(10);
}
skip = 10 + connectRandom;
}
// 初始值 false,
// 第一次:false,0-1
// 第二次:true,1-1
// 第三次:false,0-2
// 第四次:true,1-2
// 第五次:true,2-2
// 第五次:false,0-3
if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
return true;
}
// skip 自增
connectSkip.incrementAndGet();
// skiped重置0
connectSkipped.set(0);
connectRandom = 0;
return false;
}
@Override
public void run() {
while (running) {
try {
// 线性回避,是否跳过
if (!isSkip()) {
try {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
jedis = jedisPool.getResource();
try {
// service是具体类型还是所有服务
if (service.endsWith(Constants.ANY_VALUE)) {
if (!first) {
first = false;
Set<String> keys = jedis.keys(service);
if (CollectionUtils.isNotEmpty(keys)) {
for (String s : keys) {
// 同步redis中缓存数据至各订阅者以及本地cache文件
doNotify(jedis, s);
}
}
// 同步完之后,重置skip
resetSkip();
}
// 处理Redis消息队列中所有与service匹配的消息
jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
} else {
if (!first) {
first = false;
// 同步redis中缓存数据至各订阅者以及本地cache文件
doNotify(jedis, service);
resetSkip();
}
// 处理Redis消息队列中所有与service匹配的消息,同样的同步至各数据订阅者以及本地cache文件。
jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking
}
break;
} finally {
jedis.close();
}
} catch (Throwable t) { // Retry another server
logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
// If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
public void shutdown() {
try {
running = false;
jedis.disconnect();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
这里顺便提一下NotifySub,继承自JedisPubSub,用于redis消息队列中消息的消费,核心逻辑在onMessage方法:
@Override
public void onMessage(String key, String msg) {
if (logger.isInfoEnabled()) {
logger.info("redis event: " + key + " = " + msg);
}
// 只处理注册、注销类消息
if (msg.equals(Constants.REGISTER)
|| msg.equals(Constants.UNREGISTER)) {
try {
Jedis jedis = jedisPool.getResource();
try {
doNotify(jedis, key);
} finally {
jedis.close();
}
} catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
logger.error(t.getMessage(), t);
}
}
}
2.3.4、ZookeeperRegistry
ZookeeperRegistry,即zk注册中心,也是dubbo官方默认采用的注册中心。先来看构造方法:
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 初始化zkClient
zkClient = zookeeperTransporter.connect(url);
// 若是重连状态,执行恢复逻辑,即将已注册过的URL,放入retry列表,重新注册;
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
构造方法里有一个参数,ZookeeperTransporter,先来看一下这个ZookeeperTransporter。
2.3.4.1、ZookeeperTransporter
ZookeeperTransporter是dubbo对zk客户端的适配接口,支持SPI(方法级),内部只有一个connect方法,返回ZookeeperClient实例。我们知道,zk客户端有常用的两个实现,分别是:
-
Curator,Netflix公司开源的一套zookeeper客户端框架,jar包gav如下:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency>
-
Zkclient,Github上一个开源的Zookeeper客户端,在Zookeeper原生 API接口之上进行了包装,是一个更加易用的Zookeeper客户端,jar包gav如下:
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>
dubbo中借助这两种客户端组件,ZookeeperClient的实现有CuratorZookeeperClient、ZkclientZookeeperClient,内部通过客户端组件完成zk的连接、监听等动作,具体逻辑这里不做详细解析。继续来看ZookeeperTransporter,基于接口的基类实现AbstractZookeeperTransporter,实现了connect方法,同时定义模板方法createZookeeperClient由具体的Transporter(CuratorZookeeperTransporter、ZkclientZookeeperTransporter,逻辑比较简单,省略)实现。直接来看基类的connect方法:
@Override public ZookeeperClient connect(URL url) { ZookeeperClient zookeeperClient; // backUrl解析 List<String> addressList = getURLBackupAddress(url); // The field define the zookeeper server , including protocol, host, port, username, password // fetchAndUpdateZookeeperClientCache 逻辑比较简单,从缓存里取,取不到则返回null; if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) { logger.info("find valid zookeeper client from the cache for address: " + url); return zookeeperClient; } // avoid creating too many connections, so add lock,加锁,防并发。 synchronized (zookeeperClientMap) { if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) { logger.info("find valid zookeeper client from the cache for address: " + url); return zookeeperClient; } // 缓存没取到,创建zkClient,并缓存到map zookeeperClient = createZookeeperClient(toClientURL(url)); logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url); writeToClientMap(addressList, zookeeperClient); } return zookeeperClient; }
2.3.4.2、doRegister方法
基类FailbackRegistry的模板方法实现,逻辑非常简单:
@Override
public void doRegister(URL url) {
try {
// 创建zk临时节点,节点目录类似 /dubbo/xxx/xxx/xxx.xxx.xxxService
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.3.4.3、doUnregister方法
基类FailbackRegistry的模板方法实现,逻辑同样非常简单:
@Override
public void doUnregister(URL url) {
try {
// 删除zk节点
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.3.4.4、doSubscribe方法
基类FailbackRegistry的模板方法实现,逻辑相对复杂,主要分为几部分1)zkListener的初始化,2)递归订阅url变更信息,创建zk永久节点,3)执行父类notify(逻辑参考AbstractRegistry解析部分),代码如下:
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 无指定服务
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
// 根节点/dubbo
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
// 初始化zkListener
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// 初始化anyServices 注册 ChildListener到listener内存缓存,订阅childChange变更,即当执行childChanged时,完成对所有child的订阅。
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
// 递归订阅url变更消息,最终会走到else,结束
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
zkListener = listeners.get(listener);
}
// 创建永久节点/dubbo
zkClient.create(root, false);
// 订阅URL变更信息
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
// 递归订阅url变更消息,最终会走到else,结束
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
//指定URL变更处理
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// childChange事件,只做notify到指定listener
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
// 监听该path
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 父类AbstractRegistry的notify逻辑,略去
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.3.4.5、doUnsubscribe方法
基类FailbackRegistry的模板方法实现,直接上代码:
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// 逻辑比较简单,即不再订阅对应的listener
zkClient.removeChildListener(root, zkListener);
} else {
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
2.3.4.6、lookup方法
基类AbstractRegistry的模板方法实现,核心逻辑是从zk中查询指定URL对应地址的所有可用URL(不同Category),直接来看代码:
@Override
public List<URL> lookup(URL url) {
if (url == null) {
throw new IllegalArgumentException("lookup url == null");
}
try {
List<String> providers = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
List<String> children = zkClient.getChildren(path);
if (children != null) {
providers.addAll(children);
}
}
return toUrlsWithoutEmpty(url, providers);
} catch (Throwable e) {
throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
小结
本文重点解析了dubbo中Registry的相关核心实现,从RegistryFactory到Registry,dubbo抽象了一系列注册中心,为用户自定义扩展提供了非常优良的入口;同时,dubbo为我们提供了基于缓存、多播、zk等的注册中心实现,非常方便,不得不感叹设计的非常好。
注:源码版本 2.7.1。8月以来,接连经历了迎接小生命的惊喜,跳槽后的适应期,中间耽误了挺长一段时间,后面我会努力持续更新的,come on。