目录
dubbo 拓展机制 SPI
dubbo 自适应拓展机制
dubbo 服务导出
dubbo 服务引用
dubbo 服务字典
dubbo 服务路由
dubbo 集群
dubbo 负载均衡
dubbo 服务调用过程
大致流程图:
1.服务调用方式
从dubbo源码提供的demo作为入口分析服务调用过程,反编译demo的代理类我不是使用的arthas,我使用的是jdk自带的工具HSDB。最终代理类如下:
public class proxy0 implements DC, EchoService, DemoService {
public static Method[] methods;
private InvocationHandler handler;
public proxy0(InvocationHandler var1) {
this.handler = var1;
}
public proxy0() {
}
public String sayHello(String var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[0], var2);
return (String)var3;
}
public Object $echo(Object var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[1], var2);
return (Object)var3;
}
}
其中InvocationHandler的实现为InvokerInvocationHandler,所以我们接着看它的invoker方法。(PS:服务引用的最后提到过默认通过JavassistProxyFactory生成代理类,从JavassistProxyFactory也可以看出在代理类实例化的时候传入的InvocationHandler就是InvokerInvocationHandler)
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
InvokerInvocationHandler 中的 invoker 成员变量类型为 MockClusterInvoker,MockClusterInvoker 内部封装了服务降级逻辑,这里的Invoker原本是来自于Cluster.join方法返回的invoker,但是MockClusterWrapper作为Cluster的包装类在通过Extension加载具体的Cluster实现类的时候会使用MockClusterWrapper包装一层。下面简单看一下:
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// 获取 mock 配置值
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//no mock
// 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法,
// 比如 FailoverClusterInvoker
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
// force:xxx 直接执行 mock 逻辑,不发起远程调用
result = doMockInvoke(invocation, null);
} else {
//fail-mock
// fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
try {
result = this.invoker.invoke(invocation);
//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
// 调用失败,执行 mock 逻辑
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}
其中doMockInvoke是在发生异常或者是mock 的配置为 force的时候调用,我们接下来看看doMockInvoke
private Result doMockInvoke(Invocation invocation, RpcException e) {
Result result = null;
Invoker<T> minvoker;
// 通过directory列举invokers
// ps:这里会通过MockInvokersSelector筛选出mock配置的本地调用
List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
// 如果没有列举的invoker,新建一个MockInvoker
if (CollectionUtils.isEmpty(mockInvokers)) {
minvoker = (Invoker<T>) new MockInvoker(directory.getUrl(), directory.getInterface());
} else {
// 获取第一个invoker
minvoker = mockInvokers.get(0);
}
try {
result = minvoker.invoke(invocation);
} catch (RpcException me) {
if (me.isBiz()) {
result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation);
} else {
throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
}
} catch (Throwable me) {
throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
}
return result;
}
代码比较简单,可以自行研究下。
回到刚才的invoker调用,继续看正常的远程调用。接着进入FailoverClusterInvoker进行集群调用,集群调用中会使用负载均衡从invokers(在服务引入中提到的RegistryDirectory通过监听Zookeeper节点数据动态变化的invoker列表)中选出一个调用。这个过程已经在前面分析过了,接下来会经过InvokerListener以及各种filter的调用,如果在InvokerListener和filter执行过程中都没有问题,接着调用的是AsyncToSyncInvoker,从名字我们知道是处理异步转同步的,也就是该次调用是异步还是同步的。
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = invoker.invoke(invocation);
try {
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else if (t instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
return asyncResult;
}
可以看到当调用方式是同步时,会通过Result.get的方式阻塞等待结果。
接着就是调用DubboInvoker,可以看到它的invoke方法在父类AbstractInvoker中,所以先看AbstractInvoker的invoke方法
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
if (destroyed.get()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
// 设置 Invoker
invocation.setInvoker(this);
if (CollectionUtils.isNotEmptyMap(attachment)) {
// 设置 attachment
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
* by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
* a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
*/
// 添加 contextAttachments 到 RpcInvocation#attachment 变量中
invocation.addAttachments(contextAttachments);
}
// 设置异步信息到 RpcInvocation#attachment 中
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
// 如果异步就设置invoke_id
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
}
} catch (RpcException e) {
if (e.isBiz()) {
return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
throw e;
}
} catch (Throwable e) {
return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
}
可以看到其中大部分代码用于添加信息到 RpcInvocation#attachment 变量中,添加完毕后,调用 doInvoke 执行后续的调用。doInvoke 是一个抽象方法,需要由子类实现,下面到 DubboInvoker 中看一下
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// 设置 path 和 version 到 attachment 中
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
// 从 clients 数组中获取 ExchangeClient
if (clients.length == 1) {
currentClient = clients[0];
} else {
// 轮询获取链接
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 获取 dubbo:method 中return的设置,默认为true
// true,则返回future,或回调onreturn等方法,如果设置为false,则请求发送成功后直接返回Null
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 获取方法超时配置
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// 发送请求
currentClient.send(inv, isSent);
// 直接返回 appResponse 空对象
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
// 异步设置结果到 asyncRpcResult 中,
// 如果方式为同步,将在后面的 AsyncToSyncInvoker 中调用 get 方法阻塞知道获取结果
asyncRpcResult.subscribeTo(responseFuture);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
// 兼容dubbo 2.6.x 版本,设置 future 到上下文中
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
可以看到这里就是选择一个client然后发起网络请求
2.服务消费方发送请求
接下来我们继续分析请求的发送过程,ExchangeClient类以及它的一些子类。先来看下ExchangeClient的类图。
2.1发送请求
上面的代码中currentClient为ReferenceCountExchangeClient,我们现在分析下ReferenceCountExchangeClient这个类。
final class ReferenceCountExchangeClient implements ExchangeClient {
private final URL url;
// 引用计数
private final AtomicInteger referenceCount = new AtomicInteger(0);
// 装饰对象
private ExchangeClient client;
public ReferenceCountExchangeClient(ExchangeClient client) {
this.client = client;
// 增加计数
referenceCount.incrementAndGet();
this.url = client.getUrl();
}
.......
}
ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。
ReferenceCountExchangeClient包装的对象是HeaderExchangeClient,HeaderExchangeClient在HeaderExchanger中使用connect方法生成。
public class HeaderExchangeClient implements ExchangeClient {
private final Client client;
private final ExchangeChannel channel;
// 执行器
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
// 心跳任务
private HeartbeatTimerTask heartBeatTimerTask;
// 连接任务
private ReconnectTimerTask reconnectTimerTask;
public HeaderExchangeClient(Client client, boolean startTimer) {
Assert.notNull(client, "Client can't be null");
this.client = client;
// 创建 HeaderExchangeChannel 对象
this.channel = new HeaderExchangeChannel(client);
if (startTimer) {
URL url = client.getUrl();
// 开始重新连接任务,当连接不可用或者是还在超时时间内
startReconnectTask(url);
// 开始心跳任务
startHeartBeatTask(url);
}
}
}
HeaderExchangeClient 中很多方法只有一行代码,即调用 HeaderExchangeChannel 对象的同签名方法。那 HeaderExchangeClient 有什么用处呢?答案是封装了一些关于心跳检测的逻辑以及重新建立连接。接下来继续看HeaderExchangeChannel
final class HeaderExchangeChannel implements ExchangeChannel {
// 无返回值
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
}
if (message instanceof Request
|| message instanceof Response
|| message instanceof String) {
channel.send(message, sent);
} else {
Request request = new Request();
request.setVersion(Version.getProtocolVersion());
// 设置双向通信标志
request.setTwoWay(false);
// 这里的 message 变量类型为 RpcInvocation
request.setData(message);
channel.send(request, sent);
}
}
// 有返回值
@Override
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
// 设置双向通信标志
req.setTwoWay(true);
// 这里的 request 变量类型为 RpcInvocation
req.setData(request);
// 创建 DefaultFuture 对象
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
// 调用 NettyClient 的 send 方法发送请求
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
}
可以看到send方法无返回值,request有一个future的返回。
接下来我们继续看channel.send方法,其中channel默认为NettyClient,NettyClient没有send方法,而是它的父类AbstractPeer实现了send方法。然后会继续调用NettyClient的父类AbstractClient的sent方法。
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
@Override
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (needReconnect && !isConnected()) {
connect();
}
// 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
channel.send(message, sent);
}
}
继续看NettyClient的getChannel方法
public class NettyClient extends AbstractClient {
// 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
private volatile Channel channel;
@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isConnected())
return null;
// 获取一个 NettyChannel 类型对象
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
}
final class NettyChannel extends AbstractChannel {
private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap =
new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
private final org.jboss.netty.channel.Channel channel;
/** 私有构造方法 */
private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
// 尝试从集合中获取 NettyChannel 实例
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
// 如果 ret = null,则创建一个新的 NettyChannel 实例
NettyChannel nc = new NettyChannel(ch, url, handler);
if (ch.isConnected()) {
// 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中
ret = channelMap.putIfAbsent(ch, nc);
}
if (ret == null) {
ret = nc;
}
}
return ret;
}
}
获取到NettyChannel的实例后,就开始调用NettyChannel的sent方法:
public void send(Object message, boolean sent) throws RemotingException {
// whether the channel is closed
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 发送消息(包含请求和响应消息)
ChannelFuture future = channel.writeAndFlush(message);
// sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
// 1. true: 等待消息发出,消息发送失败将抛出异常
// 2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
// 默认情况下 sent = false;
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// 等待消息发出,若在规定时间没能发出,success 会被置为 false
success = future.await(timeout);
}
Throwable cause = future.cause();
// 若 success 为 false,这里抛出异常
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
接下来看下sayHello方法的整个调用链
proxy0#sayHello(String)
-> InvokerInvocationHandler#invoke(Object, Method, Object[])
-> MockClusterInvoker#invoke(Invocation) //本地调用和错误时mock本地实现
-> AbstractClusterInvoker#invoke(Invocation) //列举可用服务以及初始化负载均衡对象
-> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) //实现了Failover模式的集群容错
-> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
-> ListenerInvokerWrapper#invoke(Invocation)
-> AsyncToSyncInvoker#invoker // 如果需要异步转同步
-> AbstractInvoker#invoke(Invocation) // 设置attachment和同步或异步
-> DubboInvoker#doInvoke(Invocation) // 设置客户端以及发送请求
-> ReferenceCountExchangeClient#request(Object, int) // 记录该client使用次数
-> HeaderExchangeClient#request(Object, int) // 执行心跳任务和连接超时重连任务
-> HeaderExchangeChannel#request(Object, int) // 设置请求参数
-> AbstractPeer#send(Object) // 检查chaneel是否关闭
-> AbstractClient#send(Object, boolean) // 配置netty,建立连接,获取chaneel重新连接
-> NettyChannel#send(Object, boolean) // 发送请求和捕获异常,向上抛出
-> NioClientSocketChannel#write(Object)
对于服务请求的编码以及接收请求的解码过程暂不分析
2.2 服务提供方接收请求
2.2.1 线程派发模型
默认使用的是all,接下来看看AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
/** 处理连接事件 */
@Override
public void connected(Channel channel) throws RemotingException {
// 获取线程池
ExecutorService executor = getExecutorService();
try {
// 将连接事件派发到线程池中处理
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
/** 处理断开事件 */
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}
/** 处理请求和响应消息,这里的 message 变量类型可能是 Request,也可能是 Response */
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
// 将请求和响应消息派发到线程池中处理
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
// 如果通信方式为双向通信,此时将 Server side ... threadpool is exhausted
// 错误信息封装到 Response 中,并返回给服务消费方。
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
// 返回包含错误信息的 Response 对象
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
/** 处理异常信息 */
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
}
继续看下构造函数中调用的父构造方法
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 默认为FixedThreadPool的实现
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
// 配置到satastore中,方便client或者是server关闭时同时关闭线程池
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
这里默认使用了一个最大线程数为200的线程池,具体线程池用于处理什么样的事件取决于使用怎样的线程派发模型。
2.2.2 调用服务
针对netty3和netty4接收channel数据的类和方法有所不同,以下以netty4为例,展示调用链,从NettyServerHandler进入是因为,在服务导出过程中向netty的bootstrap中注册了NettyServerHandler,根据不同的策略以下调用链中的具体的类可能有所不同,这里所列举的都是默认的处理类
NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent)
-> AbstractPeer#received
-> MultiMessageHandler#received // 多消息处理
-> HeartbeatHandler#received // 处理心跳请求,如果不是继续后续调用
-> AllChannelHandler#received // 将此次处理放入线程池执行
-> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑
-> DecodeHandler#received // 解码
-> HeaderExchangeHandler#received // 处理request或者response或者telnet
-> Dubboprotocol#reply // 获取服务导出过程中保存的对应的invoker
-> filter#invoker#invoke
-> invokerWrapper#invoke
-> AbstractProxyInvoker#invoke // 调用具体的方法,封装调用结果
-> 服务导出过程中生成的对应的代理类
这里线程池的execute执行的也就是ChannelEventRunnable,我们从ChannelEventRunnable开始分析
public class ChannelEventRunnable implements Runnable {
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
@Override
public void run() {
// 检测通道状态,对于请求或响应消息,此时 state = RECEIVED
if (state == ChannelState.RECEIVED) {
try {
// 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用
handler.received(channel, message);
} catch (Exception e) {
logger.warn("... operation error, channel is ... message is ...");
}
}
// 其他消息类型通过 switch 进行处理
else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("... operation error, channel is ...");
}
break;
case DISCONNECTED:
// ...
case SENT:
// ...
case CAUGHT:
// ...
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
}
ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler。
public class DecodeHandler extends AbstractChannelHandlerDelegate {
private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);
public DecodeHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
// 对 Decodeable 接口实现类对象进行解码
decode(message);
}
if (message instanceof Request) {
// 对 Request 的 data 字段进行解码
decode(((Request) message).getData());
}
if (message instanceof Response) {
// 对 Request 的 result 字段进行解码
decode(((Response) message).getResult());
}
// 执行后续逻辑
handler.received(channel, message);
}
private void decode(Object message) {
// Decodeable 接口目前有两个实现类,
// 分别为 DecodeableRpcInvocation 和 DecodeableRpcResult
if (message instanceof Decodeable) {
try {
// 执行解码逻辑
((Decodeable) message).decode();
if (log.isDebugEnabled()) {
log.debug("Decode decodeable message " + message.getClass().getName());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
}
} // ~ end of catch
} // ~ end of if
} // ~ end of method decode
}
DecodeHandler 存在的意义就是保证请求或响应对象可在线程池中被解码。解码完毕后,完全解码后的 Request 对象会继续向后传递,下一站是 HeaderExchangeHandler。
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
// 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
// 将调用结果返回给服务消费端
channel.send(res);
return;
}
// find handler by message class.
// 获取 data 字段值,也就是 RpcInvocation 对象
Object msg = req.getData();
try {
// 继续向下调用
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
// 设置 OK 状态码
res.setStatus(Response.OK);
// 设置调用结果
res.setResult(appResult);
} else {
// 若调用过程出现异常,则设置 SERVICE_ERROR,表示服务端异常
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 将调用结果返回给服务消费端
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
// 处理请求对象
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
// 处理事件
handlerEvent(channel, request);
} else {
// 双向通信
if (request.isTwoWay()) {
// 向后调用服务,并得到调用结果
handleRequest(exchangeChannel, request);
}
// 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
else {
handler.received(exchangeChannel, request.getData());
}
}
}
// 处理响应对象,服务消费方会执行此处逻辑,后面分析
else if (message instanceof Response) {
handleResponse(channel, (Response) message);
}
// telnet 相关,忽略
else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
}
对于双向通信,HeaderExchangeHandler 首先向后进行调用,得到调用结果。然后将调用结果封装到 Response 对象中,最后再将该对象返回给服务消费方。如果请求不合法,或者调用失败,则将错误信息封装到 Response 对象中,并返回给服务消费方。
其中handler.reply调用的是dubboProtocol的匿名类对象逻辑
public class DubboProtocol extends AbstractProtocol {
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
// 获取 Invoker 实例
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
// 参数回调 相关
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 通过 Invoker 调用具体的服务
Result result = invoker.invoke(inv);
// 返回入参
return result.completionFuture().thenApply(Function.identity());
}
}
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(PATH_KEY);
// if it's callback service on client side
// 本地存根
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
//callback
// 是否是客户端并且不是 本地存根
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
// 计算 service key,格式为 groupName/serviceName:serviceVersion:port。比如:
// dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
// 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象,
// 服务导出过程中会将 <serviceKey, DubboExporter> 映射关系存储到 exporterMap 集合中
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
// 获取 Invoker 对象,并返回
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
}
return exporter.getInvoker();
}
}
invoke方法逻辑在AbstractProxyInvoker中
public Result invoke(Invocation invocation) throws RpcException {
try {
// 调用 doInvoke 执行后续的调用,并将调用结果封装到 asyncRpcResult
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value, invocation);
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
future.whenComplete((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
asyncRpcResult.complete(result);
});
return asyncRpcResult;
} catch (InvocationTargetException e) {
if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory 创建的,创建逻辑如下
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// 为目标类创建 Wrapper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 创建匿名 Invoker 类对象,并实现 doInvoke 方法
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。
public class Wrapper1
extends Wrapper
implements ClassGenerator.DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
public String[] getPropertyNames() {
return pns;
}
public boolean hasProperty(String string) {
return pts.containsKey(string);
}
public Class getPropertyType(String string) {
return (Class)pts.get(string);
}
public String[] getMethodNames() {
return mns;
}
public String[] getDeclaredMethodNames() {
return dmns;
}
public void setPropertyValue(Object object, String string, Object object2) {
try {
DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
}
public Object getPropertyValue(Object object, String string) {
try {
DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
}
public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
DemoServiceImpl demoServiceImpl;
try {
demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
if ("sayHello".equals(string) && arrclass.length == 1) {
return demoServiceImpl.sayHello((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
}
}
2.2.3 服务提供方返回结果
执行完HeaderExchangeHandler#handleRequest中的handler.reply后,会调用channel.send将调用结果返回给服务消费者
HeaderExchangeHandler#handleRequest
-> HeaderExchangeChannel#send
-> NettyChannel#send
-> NioSocketChannel#writeAndFlush
2.2.4 服务消费方接收服务提供方返回
在服务引入过程中,client会在netty bootstrap中添加类型为NettyClientHandler的handler,当消费方收到服务提供方的返回后,就触发NettyClientHandler中的channelRead方法。
NettyClientHandler#channelRead
-> AbstractPeer#received
-> MultiMessageHandler#received
-> HeartbeatHandler#received
-> AllChannelHandler#received
-> ExecutorService#execute(Runnable)
-> DecodeHandler#received
-> HeaderExchangeHandler#received
感觉和服务提供者收到请求差不多。但是,从这里开始执行的是HeaderExchangeHandler#handleResponse。
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
public static void received(Channel channel, Response response) {
received(channel, response, false);
}
public static void received(Channel channel, Response response, boolean timeout) {
try {
// 在 HeaderExchangeChannel#request,中创建实例时添加到 FUTURES 中
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
// 没有超时则取消超时任务
if (!timeout) {
// decrease Time
t.cancel();
}
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
接下来会调用FutureAdapter
public FutureAdapter(CompletableFuture<AppResponse> future) {
this.appResponseFuture = future;
future.whenComplete((appResponse, t) -> {
if (t != null) {
if (t instanceof CompletionException) {
t = t.getCause();
}
this.completeExceptionally(t);
} else {
if (appResponse.hasException()) {
this.completeExceptionally(appResponse.getException());
} else {
#1
this.complete((V) appResponse.getValue());
}
}
});
}
从#1代码可以看出来,已经取出来了服务提供方返回的结果。
这里的this.complete就是层层调用complete,将结果不断的向上传递。
系列文章大致的介绍了一下dubbo的一些源码。但是依然有许多的内容没有介绍到,如:编码解码,返回结果的各种适配器,dubbo与spring的相关结合等等。文章中如有错误欢迎指出,一起讨论。