上周基本都在研究代码,所以没有及时更新服务的启动过程,大概看了一遍之后现在开始更新一下服务的引用过程。
服务的引用过程还是分两部分来看,第一部分是订阅服务(从zk中获得服务提供者的URL),第二部分是真正引用服务。
服务订阅
因为在服务引用的过程中会牵扯到很多要考虑的内容,比如*的考虑,cluster的考虑,容错等等。如果把所有的情况都讲清楚要花费很大的篇章,所以在这里只涉及重要的通用逻辑。
服务引用的主要步骤是是ReferenceConfig的get方法开始,所以我们就从这个方法开始往下看:
public synchronized T get() {
if (destroyed){
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
//这是消费端进行初始化调用的初始方法
private void init() {
//设置consumer的参数
//设置reference的参数
//判断是不是通用(generic)服务
//根据配置初始化interfaceClass
//处理点对点服务
// 设置ReferenceConfig中的application,module,registries,monitor属性
checkApplication();
//检查stub和mock的配置,这两个都是降级使用的一些配置,具体使用见用户指南
checkStubAndMock(interfaceClass);
// 将参数设置到map中供以后调用
//设置methods参数
//根据层级设置参数,优先级依次为:reference-》consumer-》module-》application
String prifix = StringUtils.getServiceKey(map); //三要素
//设置方法级别的retry参数
//attributes通过系统context进行存储.
StaticContext.getSystemContext().putAll(attributes);
//这里的套路和ServiceConfig里面的套路差不多,都是通过配置文件然后读取一系列属性最终放到一个Map里面
//然后重点就是创建这个代理对象
ref = createProxy(map);
}
private T createProxy(Map<String, String> map) {
//根据Map构造URL
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
//默认情况下如果本地有服务暴露,则引用本地服务.(比如我自己引用了自己暴露的服务)
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
//如果是本地服务引用的话
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
//因为这个时候调用的是本地服务,所以这里的Invoker就是一个简单Invoker,并没有涉及集群的内容
//这时候的refprotocol直接就是InjvmProtocol,鉴于这种情况并不多,所以就不多讲
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
// 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
if (url != null && url.length() > 0) {
//注册中心可能是多个
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
//如果用户指定的URL是注册中心的话
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
//合并provider和consumer的配置
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 如果用户没有指定的话就通过注册中心配置拼装URL(这也是最通用的情况)
//得到所有注册中心的地址
// registry://host:ip/com.alibaba.dubbo.registry.RegistryService?key=value&...
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
//添加监控中心的URL
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 添加服务引用的URL
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
//如果注册中心的地址只有一个的话,这时候的URL大致是这样子:
// registry://username@:password@127.0.0.1:20880/com.alibaba.dubbo.registry.RegistryService?refer=.(经过编码的key,value键值队集合).&protocol=remote&owner=lvyanfeng&...
if (urls.size() == 1) {//只有一个注册中心的话
//这个invoker并不是简单的DubboInvoker,而是由RegistryProtocol构建基于目录服务的集群策略Invoker,
//这个invoker可以通过目录服务list出真正可调用的远程服务invoker
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
//这里的invoker和上面提到的一样
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
// check参数检查
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
从上面看,这里的核心其实就是refprotocol.refer,不管是多协议的refer还是单个协议的refer,最终都是回到了这个方法,这个方法的内容也非常之多,下面我们慢慢看。
因为这时候的protocol是registry,所以应该进入的方法就是:RegistryProtocol.refer();
//type:实际引用的服务
//url:registry://username@:password@127.0.0.1:20880/com.alibaba.dubbo.registry.RegistryService?refer=.(经过编码的key,value键值队集合).&protocol=remote&owner=lvyanfeng&...
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//设置好protocol后从URL中移除registry对应的值
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 创建zookeeperRegistry
Registry registry = registryFactory.getRegistry(url);
//判断引用是否是注册中心RegistryService服务,如果是的话就跳过注册中心
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 如果不是的话说明是普通服务
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0 ) {
//如果group配置了*或者group中的内容不止一个分组的话
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
|| "*".equals( group ) ) {
return doRefer( getMergeableCluster(), registry, type, url );
}
}
//url:registry://username@:password@127.0.0.1:20880/com.alibaba.dubbo.registry.RegistryService?refer=.(经过编码的key,value键值队集合).&protocol=remote&owner=lvyanfeng&...
return doRefer(cluster, registry, type, url);
}
// 重点关注doRefer方法
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//初始化RegistryDirectory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);//ZookeeperRegistry
directory.setProtocol(protocol);//DubboProtocol
//comsumer://127.0.0.1:0/com.netease.kaola.GoodsCompose/+ 之前referUrl的参数
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
//如果url的ServiceInterface不等于*并且register=true的话
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//如果register设置为true的话表示将consumer也注册到注册中心,主要是用于服务治理
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//这里通过directory订阅服务,不过本质上还是通过registry来订阅
//在subscribeUrl上增加键值对&category=providers,configurators,routers
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
//使用集群策略去包装一下directory
return cluster.join(directory);
}
dubbo会将接口调用方的信息也注册到zk上,正常情况下的结构式这样子:dubbo-》interface-》consumer。
register方法已经将consumer对应的URL映射到zk的结构目录中了,那么订阅操作subscribe主要是做什么的?
订阅服务服务主要是用于监听提供信息变化的,如果变化的话就推送给服务Consumer。
针对监听服务来说,首先要明白zk的监听机制,下面以上图为例来分析一下dubbo是服务监听变化的:
1.作为consumer我们监听了/dubbo/com.foo.BaseService/providers目录,今后如果providers下面的信息有变更,比如说IP地址变更了,那么zk会首先在Watcher(监听器)中触发到变更信息,这样Dubbo就能借助于zk的推送变更消息来作出相应的响应来。
2.Dubbo为来与zk解偶,需要定义自己的监听器,这样的话如果底层zk切换为redis或者其他结构的时候不会对上层造成影响。
下面先宏观的看一下这些监听的机制(以zk为例):
我们知道在RegistryProtocol.refer的时候创建来zookeeperRegistry,我们来看具体的创建代码:
//一些通用判断逻辑我就不再列出,最后创建的Registry最终落到来ZookeeperRegistry的构造方式。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (! group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
//因为zk有两种实现,一种是官方的zkClient(不等于下面行的zkClient),另外一种是Curator,这里的zkClient是对上面两种方式的抽象包装,屏蔽了底层不一致的逻辑。
zkClient = zookeeperTransporter.connect(url);
//主要处理Session失效之后重连的逻辑
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
//关于zk的注册监听器的逻辑都在zkClient的生成中,具体如下:
public CuratorZookeeperClient(URL url) {
super(url);
try {
Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
client = builder.build();
//处理Session问题,所有的client状态变更都会触发ConnectionStateListener
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {//Session断开
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {//Session连上
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {//Session重连
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
client.start();
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
//目前dubbo只是在重连的时候就触发recover动作,代码如下:
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
//这个操作就是恢复客户端与zk的连接
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
})
上面的介绍都是在说客户端与zk的连接出现变更的情况,在这种情况下dubbo会触发客户端的重连。但是变更的话还有另外一种情况,就是服务者信息变更,这种状态变更的话具体是怎么推送呢?
我们先来看一下这个调用链是怎样的:
RegistryDirectory.subscribe ->FailbackRegistry.subscribe - >
AbstractRegistry.subscribe -> ZookeeperRegistry.doSubscribe
// RegistryDirectory.subscribe
public void subscribe(URL url) {
setConsumerUrl(url);
// comsumer://127.0.0.1:0/com.netease.kaola.GoodsCompose/+ 之前referUrl的参数 + category=providers,configurators, routers
registry.subscribe(url, this);
}
//FailbackRegistry.subscribe
// comsumer://127.0.0.1:0/com.netease.kaola.GoodsCompose/+ 之前referUrl的参数 + category=providers,configurators, routers
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// 向服务器端发送订阅请求
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
//注意这里的一个小技巧,如果订阅失败的话(可能是注册中心挂了)可以尝试从缓存中取出服务提供者的列表
List<URL> urls = getCacheUrls(url);
if (urls != null && urls.size() > 0) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// 如果开启了启动时检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if(skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// 将失败的订阅请求记录到失败列表,定时重试
addFailedSubscribed(url, listener);
}
}
//AbstractRegistry.subscribe
//在URL已经对应的监听器集合中添加一个新的listener
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()){
logger.info("Subscribe: " + url);
}
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
listeners.add(listener);
}
// ZookeeperRegistry.doSubscribe,这里的listener就是RegistryDirectory
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
//处理sreviceInterface为*的情况
} else {
List<URL> urls = new ArrayList<URL>();
//url: omsumer://127.0.0.1:0/com.netease.kaola.compose.ic.service.goods.PublishGoodsQueryCompose/+ 之前referUrl的参数 + category=providers,configurators,routers
//dubbo/com.netease.kaola.compose.ic.service.goods.PublishGoodsQueryCompose/providers
//dubbo/com.netease.kaola.compose.ic.service.goods.PublishGoodsQueryCompose/configurators
//dubbo/com.netease.kaola.compose.ic.service.goods.PublishGoodsQueryCompose/routers
//toCategoriesPath 将URL分成上面三组(providers,configurators,routers),内容如上
for (String path : toCategoriesPath(url)) {
// NotifyListener:主要是同来通知变更操作的,是核心变更逻辑的载体类
// ChildListener:主要是解偶zk事件的监听器
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//订阅的时候如果对应的provider还没有创建的话,这里也会直接将对应的providers的path创建好
zkClient.create(path, false);
//在path下面添加指定的监听器,这个是添加监听器的入口逻辑
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//在订阅完成后会触发一次notify,这里的notify操作跟ZookeeperRegistry.this.notify是同一个内容,上面的notify主要用于监听变化,这里主要是用来在订阅成功之后手动触发一次
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
所以现在的核心就是分析一下这个notify操作到底是怎么进行的,又是通过哪些调用链
//FailbackRegistry.notify -> FailbackRegistry.doNotify -> AbstractRegistry.notify
//FailbackRegistry.notif
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
//参数检查
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
//FailbackRegistry.doNotify
doNotify(url, listener, urls);
} catch (Exception t) {
// 将失败的通知请求记录到失败列表,定时重试
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
//FailbackRegistry.doNotify 没有其他内容,直接调用父类的notify
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
// AbstractRegistry.notify 这个是真正通知操作的核心方法
//通知所有匹配的providerUrl
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
// 核心参数有效性判断
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
//如果(consumerUrl)url和(providerUrl)u匹配的话,通俗来讲就是你调用端希望调用的接口与接口的提供者提供的接口一致
if (UrlUtils.isMatch(url, u)) {
//取到providerUrl的category,正常情况下这里是有三组category的,分别是providers,configurators,routers
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
//根据category区分不同的url
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
//后面操作的前提都是result中有内容,也就是说能找到url中匹配的providerUrl
//存储已经notified过的URL以及对应的providerUrl列表
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
//将url对应的providerUrl列表在本地作为文件你存储起来,这么做的好处就是如果zk挂了,本地还是有一份备份数据可以取的,不会导致应用直接不可用
saveProperties(url);
//然后看到这里最终调用了通知逻辑
listener.notify(categoryList);
}
}
private void saveProperties(URL url) {
if (file == null) {
return;
}
try {
StringBuilder buf = new StringBuilder();
//在上个操作的时候已经将对应的数据存储到notified中去了
Map<String, List<URL>> categoryNotified = notified.get(url);
//将所有对应的URL组合成字符串
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
buf.append(URL_SEPARATOR);
}
buf.append(u.toFullString());
}
}
}
//将一个接口对应的所有的category全部存入buf,多个对应数据的话以空格区分,然后写入properties文件
properties.setProperty(url.getServiceKey(), buf.toString());
long version = lastCacheChanged.incrementAndGet();
//是否同步保存文件
if (syncSaveFile) {
doSaveProperties(version);
} else {
registryCacheExecutor.execute(new SaveProperties(version));
}//这个doSaveProperties方法的内容就是把刚才的properties文件内容再次写入到file缓存文件去
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
通过上面的层层逻辑我们看到,其实subscribe的核心作用是两个:
- 将consumerUrl映射到zk的具体路径上,并且监听具体的provider路径信息变化。
- 将与consumerUrl匹配的providerUrl列表存储到notified属性中,之后再真正开始进行服务调用的时候直接到notified中取出就好。
包括之前我提到的url也说到category有三种类型,一种是providers,另外两种是configurators和routers,providers我相信大家都熟悉,就是普通注册服务时候的category。configurators主要是用于这种场景:当我们的服务已经在线上部署了,但是我们想动态修改某个配置,比如某个接口的时间,这时候我们可以在不改变代码(不用重新发布服务)的前提下直接像注册中心写上覆盖规则,大致像下面这样:
RegistryFactoryregistryFactory=ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registryregistry=registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181")); registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService? category=configurators&dynamic=false&application=foo&timeout=1000"));
这样的话其实就可以覆盖现有服务的某种规则;而routers主要是用于改变某个服务的路由规则:
RegistryFactoryregistryFactory=ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registryregistry=registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181")); registry.register(URL.valueOf("condition://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule="+ URL.encode("host=10.20.153.10=>host=10.20.153.11")+"));
因为规则修改之后都需要重新变更服务的引用,所以会在notify的时候一并更新
明白了上面的作用之后我们再来看看具体的通知操作都做了哪些事情:
public synchronized void notify(List<URL> urls) {
//urls : 是多种类型的混合列表
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
//根据category区分不同类型的URL
for (URL url : urls) {
String protocol = url.getProtocol();//协议
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);//category类型
//如果是路由类型
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
//如果是配置类型
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
//如果是服务提供者的类型
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// 将配置的URL列表转换为configurators
// 将URL列表都转换为Router对象
// 合并override参数(这些流程这次都认为是非核心流程,以后单独讲解,所以本次就略过)
// providers
refreshInvoker(invokerUrls);
}
/**
* 根据invokerURL列表转换为invoker列表。转换规则如下:
* 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
* 2.如果传入的invoker列表不为空,则表示最新的invoker列表
* 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
* @param invokerUrls 传入的参数不能为null
*/
private void refreshInvoker(List<URL> invokerUrls){
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止访问
this.methodInvokerMap = null; // 置空列表
destroyAllInvokers(); // 关闭所有Invoker
} else {
this.forbidden = false; // 允许访问
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
}
if (invokerUrls.size() ==0 ){
return;
}
//核心方法
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
// state change
//如果计算错误,则不进行处理.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
return ;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try{
destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker
}catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
/**
* 将urls转成invokers,如果url已经被refer过,不再重新引用。
*
* @param urls
* @return invokers
*/
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if(urls == null || urls.size() == 0){
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
//可能有多个协议
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
//如果reference端配置了protocol,则只选择匹配的protocol
if (queryProtocols != null && queryProtocols.length() >0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
//如果提供者的服务协议与可接受的服务协议一致的话
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
//如果服务提供者的协议为空的话
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
+ ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
//合并消费者参数
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // URL参数是排序的
if (keys.contains(key)) { // 重复URL(不需要重复引用)
continue;
}
keys.add(key);
// 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
//每个URL都对应一个key值,然后这个key值对应一个Invoker,这种关系保存在缓存Map中
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // 缓存中没有,重新refer,缓存命中的话就代表已经refer过了。因为同一个工程里面可以多次引用同一个服务,所以cache还是很有用的
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = ! url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
======================================================
//奶奶的,终于看到refer调用了(这里是获得Invoker的过程,下个篇章讲解)
if (enabled) {
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
======================================================
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
}
if (invoker != null) { // 将新的引用放入缓存
newUrlInvokerMap.put(key, invoker);
}
}else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();//促进垃圾回收
return newUrlInvokerMap;
}
/**
* 将invokers列表转换为methodName-》List<Invoker>的map存储起来
* @param invokersMap Invoker列表
* @return Invoker与方法的映射关系
*/
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
// 本质上就是将组成method->List<Invoker>组合
List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
if (invokersMap != null && invokersMap.size() > 0) {
for (Invoker<T> invoker : invokersMap.values()) {
String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
if (parameter != null && parameter.length() > 0) {
String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
if (methods != null && methods.length > 0) {
for (String method : methods) {
if (method != null && method.length() > 0
&& ! Constants.ANY_VALUE.equals(method)) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null) {
methodInvokers = new ArrayList<Invoker<T>>();
newMethodInvokerMap.put(method, methodInvokers);
}
methodInvokers.add(invoker);
}
}
}
}
invokersList.add(invoker);
}
}
// 处理*逻辑,*意味这所有方法
newMethodInvokerMap.put(Constants.ANY_VALUE, invokersList);
if (serviceMethods != null && serviceMethods.length > 0) {
for (String method : serviceMethods) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null || methodInvokers.size() == 0) {
methodInvokers = invokersList;
}
//在映射的时候直接处理路由逻辑
newMethodInvokerMap.put(method, route(methodInvokers, method));
}
}
//将一个method下对应的Invoker进行排序
for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
Collections.sort(methodInvokers, InvokerComparator.getComparator());
newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
}
return Collections.unmodifiableMap(newMethodInvokerMap);
}
基本到了这里的话所有的关于服务引用并且监听的逻辑都差不多讲完了,我们总结一下服务引用整体都做了哪些核心工作:
- 根据zk的结构,由consumerUrl得到对应的可用的providerUrl列表,存储在AbstractRegistry的notified属性中。
- 在RegistryDirectory的urlInvokerMap中存储了providerUrl和List<Invoker>的映射。
从这里我们就知道之后在真正引用服务的时候大致步骤了:先根据consumerUrl得到对应的providerUrl列表,然后根据具体的路有规则又得到具体的Invoker,拿到具体的Invoker之后就可以随意嗨了。
所有的服务引用逻辑,在这里都讲完了。下一篇会讲拿到Invoker之后的操作了。