整个的服务调用的流程说起来也非常的清晰,只是流程比较长,中间又利用SPI不太好调试而已。
- 前期准备工作,结合spring配置好bean信息以及各种协议
- 连接并注册到注册中心
- 订阅消息,获取到服务提供方IP:PROT列表(具体得看服务提供方有多少)
- 根据IP:PROT 生成对应的Invoker(Invoker是调用的实体,其中包含了所具备的信息)
- 动态代理执行invoke反射执行方法
- 是否会进行mock测试
- 负载均衡选择合适的某一具体服务提供方
- 加入重试机制,如果出现类似timeout等情况会进行重试操作(有一点需要注意,biz异常是不再进行重试,而直接上抛异常)
- 服务异步调用或者有无返回值
接下来就具体的总结归纳下上面的9点
1、Dubbo & Spring
dubbo 作为一个基于java并且依赖spring开发的rpc框架,dubbo的bean肯定是必须被Spring IOC 容器所管理的,所实现的各种方法,也是利用spring的接口。
1.1、服务提供方
dubbo的服务提供方是利用了在spring完成对bean的生成后,在finishRefresh方法中利用ContextRefreshedEvent事件去完成服务暴露的操作
看看在服务暴露的UML序列图
ServiceBean 类
public void onApplicationEvent(ApplicationEvent event) {
if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
if (isDelay() && ! isExported() && ! isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export(); // 服务暴露
}
}
}
结合上面的序列图以及ServiceBean的onApplicationEvent函数,非常清楚的展示了dubbo是如何利用spring的功能实现服务暴露
需要注意到onApplicationEvent的方法是来自于ApplicationListener接口,在自定义使用该功能的时候,同样要实现ApplicationListener接口,后续spring通过getApplicationListeners方法就扫描到自定义的监听器bean。
TODO 写一个小demo完成自己的小实践,加强理解 2018年05月19日18:29:49
1.2、服务调用方
服务调用方不是利用上述的事件机制去完成服务调用,而是利用FactoryBean。看下面两个类实现的接口就已经能够说明问题了。
public class ServiceBean<T> extends ServiceConfig<T>
implements InitializingBean,
DisposableBean,
ApplicationContextAware,
ApplicationListener,
BeanNameAware{}
public class ReferenceBean<T> extends ReferenceConfig<T>
implements FactoryBean,
ApplicationContextAware,
InitializingBean,
DisposableBean {}
在spring中bean包含了三种类型
- 普通bean:一般意义上的普通bean,通过getBean(XXX)就可以获取到
- 工厂bean:通过工厂方法自定义生成bean,其实就是对外暴露可以修改bean对应的真正实体对象,通过getBean(XXX)就可以获取到自定义后的bean实体,如需要获取bean本身,则需要通过getBean("&" + XXX )
- bean工厂:其实不是bean,是Spring IOC 容器
如果对上面三点有什么不理解的,可以看看spring bean工厂和工厂bean 源码分析
public Object getObject() throws Exception {
return get();
}
public synchronized T get() {
if (destroyed){
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
2、连接并且注册中心
2.1、连接注册中心
连接到注册中心的前提条件肯定是获取注册中心的配置信息,要不然如何连接呢?
<dubbo:registry protocol="zookeeper" address="${dubbo.zk.servers}" client="zkclient" group="${dubbo.zk.group}"
/>
如上xml配置信息,使用的是zookeeper协议,有设置注册地址,分组等信息。
那么第一步通过loadRegistries方法拼接出来的注册中心url是类似于registry://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService?application=dubbo-consume&client=zkclient&dubbo=2.5.3&group=dubbo-demo&owner=jwfy&pid=1527®istry=zookeeper×tamp=1525943117155
然后来到了RegistryProtocol类,第一步是替换协议url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
,结果就是协议变成zookeeper://XXXXX
,第二步是获取注册中心Registry registry = registryFactory.getRegistry(url);
这句话就是获取注册中心,registryFactory本身是一个动态生成的代码,最后指向的类肯定就是ZookeeperRegistryFactory了
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
// 锁定注册中心获取过程,保证注册中心单一实例
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// 释放锁
LOCK.unlock();
}
}
上述代码中有几个关键点
toServiceString():这本身是个url的方法,深入进去看,会发现得到的key是类似于
zookeeper://127.0.0.1:2182/dubbo-demo/com.alibaba.dubbo.registry.RegistryService
的数据,包含的内容是注册中心的协议,端口,ip,以及分组,版本(由于未设置版本,故本demo无此数据),参数信息。REGISTRIES :线程安全的map,是<key,Registry>结构,如果下一次再来了同一个key的注册中心获取,那就不需要再次生成,直接从map获取即可
-
createRegistry(url):注册全新的注册中心,如下图,其中包含了zkClient,可以进行和注册中心的交互操作
通过上述getRegistry调用就完成了获取注册中心的操作,也就可以认为是连接上了注册中心,不过其实这里并没有真正的连接注册中心,只是完成了连接注册中心的配置,理解上可以这样认为罢了,真正的连接是在后面的注册操作中。
2.2、注册到注册中心
注册到注册中心,什么注册到注册中心呢?肯定是服务调用方的使用信息注册到注册中心
具体的可以看看Dubbo 服务调用 源码学习(上)(六)
归纳出来就是上面的url转变为consumer协议的subscribeUrl参数,然后调用ZookeeperRegistry的register操作,之后在zk中心可以观察到注册好的服务调用方。
3、获取服务提供方(订阅)& Invoker生成
3.1、获取服务提供方(订阅)
获取服务提供方的操作是从zk注册中心获取到相对应的服务提供方的信息,在dubbo中称为订阅。真正使用的也是ZookeeperRegistry的subscribe方法。先进入到FailbackRegistry,再进入到ZookeeperRegistry
- FailbackRegistry:当获取提供方出现问题,要么直接抛出异常,要么添加到失败列表,由定时线程去主动重试
其实看名字也能大概猜到这是个出现失败如何操作的抽象类
- ZookeeperRegistry:zookeeper协议的注册中心对象,实现具体协议的数据注册和订阅
具体类的关系,如下图所示
继续回到ZookeeperRegistry的doSubscribe方法,其中有个集合非常关键,ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners
,doSubscribe传入url和listener,就可以获取最后的zk链接的ChildListener
后面的zkClient.addChildListener
返回值List<String>也就是对应目录的服务提供方zk配置
获取了服务提供方的zk属性呢?一方面存储到本地的cache文件,另一方面交由RegistryDirectory的notify方法去生成invoke,之前没有很仔细的介绍notify方法。
进入到FailbackRegistry类
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// 将失败的通知请求记录到失败列表,定时重试
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
如上述代码可以知道FailbackRegistry类主要的功能就是出现问题如何处理,一般情况下都是交由定时线程去处理,具体如下FailbackRegistry类的失败集合
// 定时任务执行器
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
// 失败重试定时器,定时检查是否有请求失败,如有,无限次重试
private final ScheduledFuture<?> retryFuture;
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); // 注册到注册中心失败的url
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); // 取消注册到注册中心的url
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // 订阅失败的url以及监听器
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // 取消订阅的url以及监听器
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>(); // 通知失败
后续的notify操作在文章Dubbo 服务调用 源码学习(上)也已经说明了
3.2、Invoke 生成
其实真正的invoker已经在上面的notify方法的refreshInvoker方法中刷新了服务提供方具体的invoker对象。那么此处说明的invoke又是什么呢?是具体的通过接口生成的代理对象proxyFactory.getProxy(invoker)
,这个参数里面的invoker是MockClusterInvoker,包含了上文说的RegistryDirectory,而在RegistryDirectory包含了所有的具体的invoker对象信息。
后续真正的生成是在JavassistProxyFactory类完成对spring 接口动态生成的,是InvokerInvocationHandler类。
4、Mock & 负载均衡 & 重试
4.1、Mock
在经过整个的服务调用过程中mock本身是作为mock测试而使用的,我们应该理解mock测试是在假设服务提供方并没有真正的可以被调用,而为了确保开发进度不被阻塞的一种手段,主需要约定好服务提供方和服务调用方的接口即可。
在3.2小节已经说了通过动态代理生成的对象的invoker就是MockClusterInvoker类
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
// 看url中是否包含了mock或者name + mock的键值对
if (value.length() == 0 || value.equalsIgnoreCase("false")){
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
// 如果值不为false而且是force开头
// doMockInvoke 会筛选出可以进行mock的invokerlist
// 然后选择第一个invoker进行调用操作
// 压根没有任何均衡负责的操作,也不会跨网络调用
result = doMockInvoke(invocation, null);
}
...
}
具体的mock操作需要设置mock类型的服务提供方和可以进行mock测试的服务调用方,这个可以分为一篇文章去学习。
4.2、负载均衡
在4、mock一小节说了mock测试不会进行均衡负责,直接选择第一个invoker,那么在正常调用中肯定就有负载均衡了。负载均衡并没有立马选择出合适的invoker,而是先选择出可用的均衡负载类对象,然后进入到了FailoverClusterInvoker类中进行doInvoke方法调用选择
主要的原因是在调用过程中,服务提供方会随时变化,而一开始调用就提前选择好均衡负责的invoker,则可能会出现在后面的具体invoker操作出现不可用的情况(被选择好的invoker不可用)
4.3、重试操作
在上文的获取注册中心中使用了FailbackRegistry进行重试操作,同样的在具体的invoker调用更容易出现各种异常从而出现重试操作,这就使用到了FailoverClusterInvoker类了。
他是被MockClusterInvoker包含的invoker对象
5、调用方式
调用方法分为三类
- 不关心返回值的调用:不会设置future
- 异步调用:设置future,回调返回
- 同步调用:设置future了,只是再调用get方法,从而认为是同步调用(其实不是严格的同步调用)
在设置调用的时候可以通过return = "true" 以及async = "true"完成不关系返回值的调用和异步调用,正常的调用默认都是false值