1.概述
Dubbo服务暴露原理一文已经详细介绍了服务暴露的原理以及过程,本文会对剩下的服务引用和调用的原理及其过程进行详细介绍。在服务消费方中,接口并不包含具体的实现逻辑,具体实现都放在服务提供方,但是当我们在调用接口的时候,却能做到与调用本地方法没有区别,原因在于调用方提供了一个代理类,在运行时与该接口绑定,当接口中的方法被调用时,实际是作用于该代理类上,代理类封装了远程调用的逻辑,把请求参数发送给远程服务提供方,获取结果后再返回。因此在调用具体方法之前,需要获取到这个代理对象,这便是服务引用的过程。
Dubbo
在服务引用的时候还会订阅zookeeper
以保证能时刻监听provider
节点新增或者减少的信息,实现动态感知,对应subscribe
流程。2.Dubbo服务引用
dubbo-demo-consumer
配置文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),
don't set it same as provider -->
<dubbo:application name="demo-consumer"/>
<!-- use multicast registry center to discover service -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- generate proxy for the remote service, then demoService can be used in the same way as the
local regular interface -->
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>
</beans>
Java代码示例如下:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.demo.consumer;
import com.alibaba.dubbo.demo.DemoService;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Consumer {
public static void main(String[] args) {
//Prevent to get IPV6 address,this way only work in debug mode
//But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not
System.setProperty("java.net.preferIPv4Stack", "true");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
context.start();
/*服务引用*/
DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
while (true) {
try {
Thread.sleep(1000);
/*服务调用*/
String hello = demoService.sayHello("world"); // call remote method
System.out.println(hello); // get result
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
}
}
整个服务引用的流程如下图所示:
可以概括为以下四个步骤:
- 创建invoker对象,并对其加工(在后续的服务调用中是以invoker对象为载体完成的)
- 建立与
zookeeper
的连接并订阅注册中心消息 - 初始化Netty Client并建立与服务提供方Netty Server的连接
- 返回代理对象以便服务调用时使用
2.1创建并加工invoker
ReferenceConfig
类中refprotocol
属性通过Dubbo SPI机制初始化,被赋值为Protocol$Adaptive
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
当调用refer
方法时,会执行如下逻辑,其中extension
为ProtocolListenerWrapper
。此处的设计与服务暴露中是一致的,因为ProtocolListenerWrapper
与ProtocolFilterWrapper
为Protocol
接口的包装类,在整个调用过程中会优先调用这两个类的refer
方法。
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
...
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
ProtocolListenerWrapper
类实现:
/**
* ListenerProtocol
*/
public class ProtocolListenerWrapper implements Protocol {
...
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
/*url.getProtocol() == registry*/
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
...
}
}
ProtocolFilterWrapper
类实现:
/**
* ListenerProtocol
*/
public class ProtocolFilterWrapper implements Protocol {
...
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
/*url.getProtocol() == registry*/
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
...
}
}
整个流程如下:
2.2建立zookeeper连接并订阅注册中心信息
registryFactory.getRegistry
方法会创建ZookeeperRegistry
对象,在ZookeeperRegistry
构造函数中会建立consumer与zookeeper的连接,directory.subscribe
会订阅注册中心消息,这里最终会调用到CuratorZookeeperClient#addTargetChildListener
实现,订阅地址为/dubbo/com.alibaba.dubbo.demo.DemoService/providers
即服务暴露时注册的地址。
/**
* RegistryProtocol
*
*/
public class RegistryProtocol implements Protocol {
...
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
/**设置registry属性值为dubbo**/
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
/**建立与注册中心的连接**/
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
/*订阅注册中心消息*/
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
...
}
在注册中心完成消息订阅后,会根据服务暴露时的url转换为invoker
对象
因为
protocol
具有两个包装类,ProtocolListenerWrapper
与ProtocolFilterWrapper
,因此整个调用过程会先经过这两个类
/**
* ListenerProtocol
*/
public class ProtocolListenerWrapper implements Protocol {
...
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
...
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
...
}
ListenerInvokerWrapper
类的设计与在服务暴露中分析的ListenerExporterWrapper
类的设计是非常相似的,在构造函数中遍历调用InvokerListener
的referred
方法,此处作为扩展点,开发者可以在接口的实现类中加入自己的逻辑也可以定制化实现类以达到监听器的效果,destroy
方法同理。
/**
* ListenerInvoker
*/
public class ListenerInvokerWrapper<T> implements Invoker<T> {
...
public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
if (invoker == null) {
throw new IllegalArgumentException("invoker == null");
}
this.invoker = invoker;
this.listeners = listeners;
if (listeners != null && !listeners.isEmpty()) {
for (InvokerListener listener : listeners) {
if (listener != null) {
try {
/*调用InvokerListener接口实现类的referred方法*/
listener.referred(invoker);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
}
}
...
@Override
public void destroy() {
try {
invoker.destroy();
} finally {
if (listeners != null && !listeners.isEmpty()) {
for (InvokerListener listener : listeners) {
if (listener != null) {
try {
/*调用InvokerListener接口实现类的destroyed方法*/
listener.destroyed(invoker);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
}
}
}
}
ProtocolFilterWrapper
作为过滤器,主要负责包装invoker
类,在调用invoke
方法的时候会触发Fliter
实现类的invoke
方法,开发者可以定义自己的Filter
以达到扩展框架的作用。
/**
* ListenerProtocol
*/
public class ProtocolFilterWrapper implements Protocol {
...
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
/*filter包装invoker对象*/
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
...
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
}
整体流程如图:
2.3初始化Netty Client并建立与服务提供方Netty Server的连接
该阶段目的为与服务提供方建立长连接,成功后会返回ExchangeClient
对象并作为Invoker
的属性保存,在服务调用阶段,实现接口的代理类直接选择invoker
中保存的clients
向服务提供者发送网络请求。其中Dubbo协议支持两种模式的网络连接,一种是共享client即所有调用共用一个连接,另一种是创建多个连接。
/**
* dubbo protocol support.
*/
public class DubboProtocol extends AbstractProtocol {
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
...
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
/**创建dubboInvoker对象,该对象会有ExchangeClient[] client属性存储与server端的长连接**/
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
/**获取与server端的连接**/
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean service_share_connect = false;
/**获取配置中的connections属性,不配置默认为0**/
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
/**connections属性不配置或者配置为0**/
if (service_share_connect) {
/**只创建一个公共的客户端**/
clients[i] = getSharedClient(url);
} else {
/**创建多个client,调用时轮流使用**/
clients[i] = initClient(url);
}
}
return clients;
}
...
}
共享连接的模式下,在一个消费者应用中,使用Dubbo协议进行通讯时会创建一个DubboProtocol
类对象,对于公共连接,该对象会使用referenceClientMap
这个属性进行维护,其中key为从配置中心获取到的服务提供方的ip:port
,value就为创建出来的client。对于每一个rpc接口类来说(在某个特定的协议下,这里我们使用的是Dubbo协议),都会生成一个对应的invoker
对象在调用时使用,同时invoker
对象中会有属性来保存DubboProtocol
提供的这个公共client以便在调用时直接使用,如下图所示:
在
DubboProtocol
类中的实现如下所示:
/**
* dubbo protocol support.
*/
public class DubboProtocol extends AbstractProtocol {
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
...
/**
* Get shared connection
*/
private ExchangeClient getSharedClient(URL url) {
/**key=ip:port**/
String key = url.getAddress();
/**从缓存中获取client**/
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
/**已经初始化过client,记录的连接数增加1(每关闭一个连接会减少1,直接为0时才能销毁)**/
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
if (referenceClientMap.containsKey(key)) {
return referenceClientMap.get(key);
}
ExchangeClient exchangeClient = initClient(url);
/**无client对象,初始化**/
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
/**放入map中缓存**/
referenceClientMap.put(key, client);
/**删除懒加载client保存的key,因为这时需要立即建立client与server的连接**/
ghostClientMap.remove(key);
locks.remove(key);
return client;
}
}
...
}
对于公共client,会使ReferenceCountExchangeClient
类包装,其中refenceCount
属性会记录连接数,每有一个invoker使用该client就会增加1,其目的是在断开连接时使用,每断开一个连接该值就会减少1,为0时说明无连接,这时client就能够被安全销毁,ReferenceCountExchangeClient
类如下所示:
/**
* dubbo protocol support class.
*/
@SuppressWarnings("deprecation")
final class ReferenceCountExchangeClient implements ExchangeClient {
...
private final URL url;
/**refenceCount变量记录连接数,每有一个invoker使用shared client就会增加一**/
private final AtomicInteger refenceCount = new AtomicInteger(0);
// private final ExchangeHandler handler;
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap;
private ExchangeClient client;
public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
this.client = client;
/**初始化时增加到1**/
refenceCount.incrementAndGet();
this.url = client.getUrl();
if (ghostClientMap == null) {
throw new IllegalStateException("ghostClientMap can not be null, url: " + url);
}
this.ghostClientMap = ghostClientMap;
}
...
}
在多个连接的模式下,消费端可以通过connections
属性配置,不配置该属性则默认共享一个client(详见:Dubbo连接控制),与sharedClient模式同理,在每个rpc接口类所对应的invoker
对象中,会有一个数组用于保存多个client,在调用时轮流使用,与共享模式不同的是DubboProtocol
只会负责帮助生成这些client,并不会保存,如下图所示:
在
DubboProtocol
类中的实现如下所示:
/**
* dubbo protocol support.
*/
public class DubboProtocol extends AbstractProtocol {
...
/**
* Create new connection
*/
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
/**懒加载属性lazy,默认为false**/
// connection should be lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
/**配置懒加载时,只有在调用时才能建立连接**/
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
/**与netty server建立连接**/
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
...
}
2.4返回代理对象以便服务调用时使用
ReferenceConfig
类中通过refprotocol.refer
获取到invoker
对象后,执行(T) proxyFactory.getProxy(invoker)
返回代理对象,这里以JDK的动态代理为例
/**
* JavaassistRpcProxyFactory
*/
public class JdkProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
/**InvokerInvocationHandler继承jdk自带的InvocationHandler实现动态代理**/
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
...
}
当调用rpc接口类的方法时,就会被拦截到InvokerInvocationHandler
中处理,执行invoker.invoke(new RpcInvocation(method, args)).recreate()
:
/**
* InvokerHandler
*/
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
...
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
3.Dubbo服务调用
整个服务调用可以划分为四步:
- 消费者封装方法、入参,选择远程服务提供方ip、port发起网络请求
- 生产者接收到消费者发送的请求,根据发送过来的调用方法、入参匹配到接口实现,执行调用并获取结果
- 生产者将返回结果回送给消费者
-
消费者接收到调用结果并将结果返回给业务代码
3.1消费者发起方法调用
消费者发起方法调用流程大致如下:
在服务引用阶段Dubbo为每个rpc接口返回了对应的代理类,服务调用时所有的方法都会被拦截到代理类中执行
invoker.invoke
,其中invoker
为MockClusterInvoker
,在服务引用阶段ReferenceConfig
类中调用cluster.join
返回的
/**
* InvokerHandler
*/
public class InvokerInvocationHandler implements InvocationHandler {
...
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
...
/**当业务代码调用接口的方法时触发,method为方法名,args为入参,args以数组的方式记录,多个入参时从左往右依次记录在数组中**/
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
在MockClusterInvoker
与FailoverClusterInvoker
这两个阶段会选择出执行调用的invoker
对象,整个阶段称为集群容错的过程,包括根据路由策略过滤invoker
,根据不同的负载均衡策略选择出要调用的invoker
,面对调用失败的场景进行特殊处理(例如:重试、快速失败等)。选择出要执行调用的invoker
对象后,会进入到Filter
的调用链,因为在服务引用阶段生成invoker
时,对该invoker
对象使用Filter
进行包装。
/**
* ListenerProtocol
*/
public class ProtocolFilterWrapper implements Protocol {
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
...
@Override
public Result invoke(Invocation invocation) throws RpcException {
/**依次调用filter的invoke方法**/
return filter.invoke(next, invocation);
}
...
};
}
}
return last;
}
}
完成Filter
过滤器链的调用后,最终到DubboInvoker
执行远程调用逻辑,在服务引用阶段invoker
对象中会保存初始化成功的client对象,在调用时直接使用以发送网络请求。其中index
属性主要用来在多连接的场景下计数,每次调用doInvoke
都会增加1,使用client时会根据index
值与连接数量取模来选取以达到每个client能被均衡使用到的目的。调用分为三类:
- 调用不需要服务提供者返回结果:直接返回一个空的
RpcResult
对象 - 异步调用返回结果:在请求上下文中放置一个
Future
对象后返回一个空的RpcResult
对象,用户使用RpcContext.getContext().getFuture就能获取Future
对象,使用Future
对象提供的get
方法就能阻塞当前线程直到结果返回 - 同步调用返回结果,框架主动调用
Future
对象提供的get
方法进行阻塞,直到服务提供方结果返回
同步与异步调用的不同之处在于是框架自身调用Future
对象提供的get
方法还是由业务代码控制调用,当由业务代码来控制阻塞的时候就能够在结果返回之前执行与本次rpc调用无关的代码逻辑以达到异步化的目的。
/**
* DubboInvoker
*/
public class DubboInvoker<T> extends AbstractInvoker<T> {
/**服务引用时生成的client保存在该属性中以便在调用时使用**/
private final ExchangeClient[] clients;
/**使用AtomicPositiveInteger类保证并发安全,每次调用doInvoke时增加1**/
private final AtomicPositiveInteger index = new AtomicPositiveInteger();
...
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
/**拥有多个client连接时,每次调用时index增加1取模client总数量获取client连接,达到每个client被轮流使用到的效果**/
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
/**消费者调用时不需要生产者返回**/
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
/**异步调用,直接返回RpcResult,由用户使用RpcContext.getContext().getFuture获取future对象来阻塞**/
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
/**同步调用使用get方法阻塞直到结果返回**/
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
...
}
3.2生产者接收网络请求,执行真正的接口调用
DubboProtocol
中的requestHandler
会处理消费者发送过来的RpcInvocation
对象,根据ip:port获取exporterMap
中的DubboExporter
对象(服务暴露阶段存储的),再根据exporter
对象获取到invoker
,对invoker
进行调用。
/**
* dubbo protocol support.
*/
public class DubboProtocol extends AbstractProtocol {
...
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
/**获取服务暴露阶段创建的invoker**/
Invoker<?> invoker = getInvoker(channel, inv);
...
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
};
...
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
...
/**生成serviceKey,ip:port**/
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
/**根据serviceKey获取服务暴露阶段存储的exporter**/
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
/**获取invoker**/
return exporter.getInvoker();
}
...
}
调用时序图如下所示:
DubboProtocol
调用invoker
提供的invoke
方法后会依次执行Filter
调用链中的invoke
方法,因为该invoker
在服务暴露阶段时被Filter
调用链所包装,完成Filter
中的调用后会进入之前生成好的JdkProxyFactory
代理工厂,完成最终的DemoServiceImpl
的调用并获取结果。
/**
* JavaassistRpcProxyFactory
*/
public class JdkProxyFactory extends AbstractProxyFactory {
...
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
/**调用DemoServiceImpl完成最终的调用**/
return method.invoke(proxy, arguments);
}
};
}
}
3.3生产者将返回结果回送给消费者
获取到DemoServiceImpl的执行结果后,在HeaderExchangeHandler
类中通过调用channel.send
将结果返回给consumer
/**
* ExchangeReceiver
*/
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
...
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
/**将获取到的结果返回给consumer**/
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
}
...
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
}
3.4消费者接收到调用结果并将结果返回给业务代码
该过程与向服务提供方发送请求时的顺序相反,时序图如下:
4.总结
Dubbo服务引用的过程就是为消费者生成代理类的过程,在服务调用阶段所有对方法的调用会直接作用于代理类,代理类帮助完成请求的封装、网络请求的发送、结果的接收等等,其中invoker
作为实现这些操作的桥梁,所有的调用最终都是作用于invoker
对象上的,接口类、协议与远程地址这三个维度共同决定一个invoker
对象。