注:文章中的坑出现在2.5.4版本之前,这个坑在2.5.4版本已经得到修复。
一、问题描述

问题描述
场景描述,如上图所示:
客户端远程异步调用服务A,服务A在处理客户端请求的过程中需要远程同步调用服务B,服务A从服务B的响应中取数据时,得到的是 null !!!
二、原因分析

RPC请求响应参数传递过程
2.1 Client的请求发送过程
1)Client在发起RPC调用请求前,将请求参数构建成 RpcInvocation ;
2)Client在发起RPC调用请求前,会经过Filter处理:
-
ConsumerContextFilter会将请求信息,如invoker、invocation、Address等,写入RpcContext;
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0)
.setRemoteAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
return invoker.invoke(invocation);
} finally {
RpcContext.getContext().clearAttachments();
}
}
}
3)Client在发起RPC调用请求前,会经过AbstractInvoker:
-
AbstractInvoker会将RpcContext中的attachments内容写入到RpcInvocation,以实现附加参数的传递;
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
}
-
AbstractInvoker会从RPC请求参数URL中ASYNC_KEY的值,并设置到RpcInvocation的attachment中;
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
4)Client在发起RPC调用请求时,会经过DubboInvoker:
- DubboInvoker会优先从
RpcInvocation的attachment中获取并判断ASYNC_KEY是否为true,以实现消费端的异步调用;
public static boolean isAsync(URL url, Invocation inv) {
boolean isAsync;
//如果Java代码中设置优先.
if (Boolean.TRUE.toString().equals(inv.getAttachment(Constants.ASYNC_KEY))) {
isAsync = true;
} else {
isAsync = url.getMethodParameter(getMethodName(inv), Constants.ASYNC_KEY, false);
}
return isAsync;
}
5)Client在发起RPC调用请求时,会将RpcInvocation作为调用参数传递给服务提供方:
-
RpcInvocation中的扩展属性attachments,实现了请求调用扩展信息传递的功能;
2.2 服务A的请求接收和执行过程
1)服务端在接收到RPC请求,调用真正实现接口前,会经过ContextFilter。
-
ContextFilter会将请求信息,如invoker、invocation、Address等,写入RpcContext; -
ContextFilter会将请求参数RpcInvocation的attachments扩展信息取出,过滤掉某些特定KEY之后,将其余扩展属性设置到当前RpcContext的attachments中;
Map<String, String> attachments = invocation.getAttachments();
if (attachments != null) {
attachments = new HashMap<String, String>(attachments);
attachments.remove(Constants.PATH_KEY);
attachments.remove(Constants.GROUP_KEY);
attachments.remove(Constants.VERSION_KEY);
attachments.remove(Constants.DUBBO_VERSION_KEY);
attachments.remove(Constants.TOKEN_KEY);
attachments.remove(Constants.TIMEOUT_KEY);
attachments.remove(Constants.ASYNC_KEY);//清空消费端的异步参数,2.5.4版本才新加进去的
}
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setAttachments(attachments)
.setLocalAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
其中attachments.remove(Constants.ASYNC_KEY);//清空消费端的异步参数 这行代码是在dubbo的2.5.4版本才加进去的,也就是之前的版本中并没有这行代码。
2)在2.5.4版本之前,对于Client发来的异步调用请求,其RpcInvocation参数中包含了ASYNC=true的attachment扩展信息:
- 此时
ASYNC=true的这个扩展信息就会被设置到服务A的RpcContext的扩展属性中; - 在
服务A处理RPC调用,执行实际接口实现类的逻辑时,因为依赖的服务B,所以会继续发送RPC调用请求给服务B; -
服务A调用服务B时,服务A的RpcContext的扩展属性会被写入到A -> B的RpcInvocation参数中,这就导致ASYNC=true的扩展属性参数被误传到A -> B的RpcInvocation参数中,进而导致在服务A发起RPC请求调用时触发了错误的异步调用逻辑; - 此时
服务A获取到的RPC执行结果RpcResult的内容当然是个空;
else if (isAsync) { //1. 异步,有返回值
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else { //3. 异步->同步(默认的通信方式)
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
以上就是这个坑的产生原因
三、解决方法
自己写了个Filter,添加到Dubbo服务提供方接收请求后、实际处理请求前的Filter执行链中。
从请求参数URL中解析出ASYNC扩展参数标识,而不依赖RpcInvocation中的值。
@Activate(group = {Constants.PROVIDER}, order = -999)
public class DubboSyncFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//避免RpcContext透传,使用配置文件的async
boolean isAsync = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false);
RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, String.valueOf(isAsync));
return invoker.invoke(invocation);
}
}