注:文章中使用的dubbo源码版本为2.5.4
零、文章目录
- Dubbo的三种RPC调用方式
- 关键类介绍
- DefaultFuture实现
- 调用入口流程
- 服务引用方请求响应模型总结
一、Dubbo的三种RPC调用方式
1.1 异步&无返回值
a)服务引用配置如下:
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
<dubbo:method name="sayHello" async="true" return="false"/>
</dubbo:reference>
-
async="true"
; -
return="false"
;
b)服务调用方式如下:
String hello = demoService.sayHello("world" + i);
- 因为使用了异步无返回值的模式,所以hello的值一直为null;
1.2 异步&有返回值
a)服务引用配置如下:
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
<dubbo:method name="sayHello" async="true"/>
</dubbo:reference>
-
async="true"
;
b)服务调用方式如下:
demoService.sayHello("world" + i);
Future<String> temp = RpcContext.getContext().getFuture();
String hello = temp.get();
- 因为使用了异步模式,
demoService.sayHello()
被调用后立即返回(此时RPC调用结果还未生成,RPC的执行过程不阻塞业务请求线程); - 服务引用方业务请求线程可以在合适的时候执行
RpcContext.getContext().getFuture()
获取RPC调用结果;
1.3 异步变同步
a)服务引用配置如下:
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
<dubbo:method name="sayHello" async="false"/>
</dubbo:reference>
-
async="false"
,默认值;
b)服务调用方式如下:
String hello = demoService.sayHello("world" + i);
- 因为使用了同步模式,
demoService.sayHello()
被调用后直到收到RPC响应才返回,sayHello()
方法返回时得到hello
的值,期间业务请求线程被阻塞。
二、关键类介绍
2.1 RPC请求消息封装(Request)
dubbo的交换层定义了RPC请求的封装类Request
,它包含了一个RPC请求所具备的关键信息。
public class Request {
//请求ID生成器,AtomicLong.inc实现
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
//RPC调用的请求ID,在单个Client端内全局唯一
private final long mId;
//RPC请求响应消息协议版本
private String mVersion;
//是否为双向请求响应
private boolean mTwoWay = true;
//实际RPC调用的请求数据,对应了Invocation类,调用参数都封装在这里了
private Object mData;
//Request初始化时,生成请求ID
public Request() {
mId = newId();
}
//请求ID生成方法
private static long newId() {
// getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID
return INVOKE_ID.getAndIncrement();
}
}
2.2 RPC响应消息封装(Response)
dubbo的交换层定义了RPC响应的封装类Response
,它包含了一个RPC响应所具备的关键信息。
public class Response {
/**
* ok.
*/
public static final byte OK = 20;
/**
* clien side timeout.
*/
public static final byte CLIENT_TIMEOUT = 30;
/**
* server side timeout.
*/
public static final byte SERVER_TIMEOUT = 31;
// ...省略一些状态码...
//RPC调用的请求ID,默认为0,从Request中获取
private long mId = 0;
//RPC请求响应消息协议版本
private String mVersion;
//响应状态码,默认OK,出现异常时重新设置
private byte mStatus = OK;
//响应错误信息
private String mErrorMsg;
//实际RPC调用的响应数据,对应实际实现类的方法执行结果
private Object mResult;
}
2.3 RPC调用Future接口(ResponseFuture)
dubbo的交换层定义了RPC调用的响应Future接口ResponseFuture
,它封装了请求响应模式,例如提供了将异步网络通信转换成同步RPC调用的关键方法Object get(int timeoutInMillis)
。
public interface ResponseFuture {
/**
* 获取RPC远程执行结果,异步IO转同步RPC的关键方法
*/
Object get() throws RemotingException;
/**
* 获取RPC远程执行结果,异步IO转同步RPC的关键方法
*/
Object get(int timeoutInMillis) throws RemotingException;
/**
* set callback. 响应回调模式
*/
void setCallback(ResponseCallback callback);
/**
* RPC调用是否完成
*/
boolean isDone();
}
三、DefaultFuture实现
DefaultFuture
是ResponseFuture
接口的实现类,具体实现了接口定义的方法。
3.1 请求响应信息的承载
-
DefaultFuture
提供构造方法,在HeaderExchangeChannel.request()
中被调用,用于构建RPC调用Future并返回给调用方使用; -
DefaultFuture
中包含以下关键属性,用于承载请求响应信息;
public class DefaultFuture implements ResponseFuture {
//<请求ID,消息通道> 的映射关系
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
//<请求ID,未完成状态的RPC请求> 的映射关系
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
//RPC调用的请求ID,构造器中从Request获取
private final long id;
//消息通道,构造器传入
private final Channel channel;
//RPC请求消息,构造器传入
private final Request request;
//RPC响应消息
private volatile Response response;
//RPC响应回调器
private volatile ResponseCallback callback;
//构造器
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
}
3.2 RPC执行超时检测
-
DefaultFuture
定义了RPC执行超时时间timeout
和RPC执行开始时间start
,他们都在DefaultFuture
构建时被初始化; -
DefaultFuture
中启动了一个后台守护线程,用于周期性执行 “RPC超时检测任务RemotingInvocationTimeoutScan
”。该任务不断检测 “未完成状态的RPC请求FUTURE” 中哪些已经超时(通过比较System.currentTimeMillis() - start
与timeout
) - 对已超时的RPC请求,构建相应的
超时响应Response
并触发received()
方法。
public class DefaultFuture implements ResponseFuture {
//RPC超时轮询线程,不断轮询超时状态的FUTURES,主动移除并返回超时结果
static {
Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
th.setDaemon(true);
th.start();
}
//RPC执行超时时间,构造器传入,默认值1s
private final int timeout;
//DefaultFuture构建时间
private final long start = System.currentTimeMillis();
}
3.3 异步网络通信转同步RPC调用
-
DefaultFuture
实现了ResponseFuture
接口的重要方法get(int timeout)
,用于同步等待RPC执行结果的返回(成功/异常/超时); -
DefaultFuture
提供了静态方法received(Channel channel, Response response)
,用于对客户端收到的RPC执行响应Response
进行处理。处理逻辑就是讲响应结果放入response
并通知在done
上组织等待的业务线程,同时通知本次RPC请求绑定的回调器Callback
;
public class DefaultFuture implements ResponseFuture {
//响应消息处理互斥锁,get()、doReceived()、setCallback()方法中使用
private final Lock lock = new ReentrantLock();
//请求响应模式Condition,通过get()中的await和doReceived()中的signal完成IO异步转RPC同步
private final Condition done = lock.newCondition();
//RPC响应消息接收方法
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
//将响应结果放入response,通知在done上等待的业务线程,并执行invokeCallback方法触发所有绑定的Callbask执行
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
//RPC执行结果同步获取方法,RPC的同步请求模式就依赖此方法完成,依赖done.await同步等待RPC执行结果
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
}
3.4 异步网络通信转同步RPC调用的两个关键点
a)发起RPC调用请求的业务线程,是如何同步阻塞等待直到RPC响应返回的?
- 业务请求线程调用
HeaderExchangeClient.request()
方法发送RPC请求消息到网络,然后直接调用DefaultFuture.get()
方法阻塞等待RPC执行结果; -
get()
阻塞等待的本质:循环检测Response结果是否被设置成功,如果不成功使用Condition.await()
阻塞直到结果返回; -
NettyClient
接收到RPC响应消息时,会调用DefaultFuture.received()
方法,该方法中触发了Condition.signal()
通知业务请求线程解除阻塞等待状态;
b)对于全双工的网络通信,在多线程并发请求响应的情况下,如果找到RPC响应Response
对应的RPC请求Request
?
- 对于不同的服务消费者客户端,请求响应自然与其网络通道
Channel
绑定,不会存在消费者A接收到消费者B的RPC响应的情况; - 对于同一服务消费者客户端,在RPC请求
Request
构建时生成并携带全局唯一自增ID,RPC响应Response
会携带该ID返回。消费者客户端只需维护 “唯一ID与RPC请求的关系Map<Long, DefaultFuture> FUTURES
”即可定位RPC响应对应的RPC调用上下文;
四、调用入口流程
在 dubbo剖析:五 请求发送与接收 中讲到,服务引用方调用dubbo的代理对象发起RPC请求时,最终会执行到DubboInvoker.doInvoke()
方法:
protected Result doInvoke(final Invocation invocation) throws Throwable {
//入参构建及获取ExchangeClient
RpcInvocation inv = (RpcInvocation) invocation;
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
//case1. 异步,无返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
}
//case2. 异步,有返回值
else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
}
//case3. 异步转同步(默认的通信方式)
else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
- case1.异步&无返回值:直接调用
ExchangeClient.send()
方法,不需要请求响应模式的封装; - case2.异步&有返回值:调用
ExchangeClient.request()
方法后,将ResponseFuture
放入RPC请求上下文中,直接返回; - case3.异步转同步:调用
ExchangeClient.request()
方法后,直接继续调用ResponseFuture.get()
同步等待获取RPC执行结果;
五、服务引用方请求响应模型总结
- 蓝色代表“发送RPC请求”过程,由业务请求线程执行,通过
NettyChannel
将请求数据放入Netty
的IO任务队列后,构建ResponseFuture
并返回。此时RPC请求发送及响应接收并未真正完成; - 紫色是基于
Netty
的网络消息收发过程,通过当前网络通道绑定的NioEventLoop线程
轮询完成; - 橙色代表“接收RPC响应”过程,该过程在 Dubbo业务线程池 中执行,处理RPC响应消息并交由
ResponseFuture
触发接收响应的逻辑; - 绿色代表“获取RPC调用结果”过程,由业务请求线程执行,阻塞直到从
ResponseFuture
中获取到RPC响应结果;