我们知道在dubbo底层使用了netty进行通信的。通过前期学习netty正好可以看一下优秀的框架是如何使用netty这个NIO框架的。在上一篇文章中我们已经介绍了客户端是如何创建代理类。创建代理类后客户端使用orderService调用方法实际是执行invoke方法。可以找到代理类的invoke方法看一下调用流程。
//com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getProxy
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
上面的InvokerInvocationHandler
就是代理类进入所有客户端调用代理类执行方法时都会执行这个类的invoke方法
//com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
在invoker.invoke()
方法之中有一个非常深的调用链我把主要的就调用链抽出来如下:
org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke
org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke
org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
org.apache.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient#request(java.lang.Object, int)
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
org.apache.dubbo.remoting.transport.AbstractPeer#send
org.apache.dubbo.remoting.transport.netty4.NettyChannel#send
在DubboInvoker里面用get方法阻塞调用
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#get(int)
有了上面的调用链大家可以很方便的跟踪调用流程。下面我们再来分析这个里面比较重要的调用步骤
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation); //方法名:createOrder
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient; //此处的currentClient是ReferenceCountExchangeClient ReferenceCountExchangeClient里面还有一个ExchangeClient是HeaderExchangeClient
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); //返回:false
boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); //返回false
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
//异步无返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
}
//异步有返回值
else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
// For compatibility
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (isAsyncFuture) {
// register resultCallback, sometimes we need the asyn result being processed by the filter chain.
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
}
//同步调用
else {
//走的是这个代码块 currentClient对象引用的是ReferenceCountExchangeClient
RpcContext.getContext().setFuture(null);
//ReferenceCountExchangeClient继续调用HeaderExchangeClient
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
我们在项目中一般采用同步调用,需要阻塞等待服务器调用完毕发送响应给客户端。所以一般都是走的同步调用注意这句:(Result) currentClient.request(inv, timeout).get();
分两部分执行
-
currentClient.request(inv, timeout)
是客户端发送请求 -
ResponseFuture.get()
是客户端阻塞等待服务器响应,这个地方就是设置响应超时间生效的地方
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
//这个是返回值,返回值里面已经持有NettyChannel
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
//此处先调用AbstractPeer.send 然后调用NettyChannel的send方法。
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
上面方法创建了请求的request对象,还new了响应对象DefaultFuture。然后调用send方法。最终这个send是调用NettyChannel的send方法
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
在响应对象DefaultFuture持有了NettyChannel还有一个重要的还有FUTURES的Map对象。这个Map在后面接受服务器响应时是需要使用的。
//org.apache.dubbo.remoting.transport.netty4.NettyChannel#send
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
ChannelFuture future = channel.writeAndFlush(message);
将message数据发送到服务器这里就已经是netty实现的。channel对象是在前面调用链中get到的。调用的方法也是这个类里面的getOrAddChannel
具体源码如下:
//org.apache.dubbo.remoting.transport.netty4.NettyChannel#getOrAddChannel
static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
if (ch.isActive()) {
ret = channelMap.putIfAbsent(ch, nettyChannel);
}
if (ret == null) {
ret = nettyChannel;
}
}
return ret;
}
上面的代码也很好理解就是创建了一个NettyChannel对象。需要注意的是ChannelHandle这个类。因为服务器调用完毕会会发送响应到客户端这个底层同样是考netty来实现的,用过netty客户端的朋友应该之后到如何来处理服务器发送过来的响应。下一篇我们再来聊一下客户端接收响应的过程。
总结一下
- 客户端通过
JavassistProxyFactory#getProxy
创建代理类并把这个代理对象(Proxy0)交给程序员使用在程序员看来这个Proxy0就是OrderService。- 程序员拿到代理对象(Proxy0)调用被代理对象(OrderService)的方法然后就进入了代理过程
- 通过调用链将用户发送的参数进过encode编码交给netty发送给服务端
- 客户端通过阻塞等待服务端给出响应