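Dubbo branch: 3.0
This post walks through how a service consumer sends a request and how the provider returns the response, including the proxy and transport layers. Most of this was already covered in part 2, so treat this post as a supplement.
As an example, take a consumer calling HelloService.sayHello(String name) over the dubbo protocol. The process has four stages:
- Dubbo generates and injects a proxy object on the consumer side;
- The consumer sends the network request;
- The provider receives the request, executes the call, and returns the response;
- The consumer receives the response, processes it, and returns it to the upper-level caller.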
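Dubbo generates and injects a proxy object on the consumer side
Using a Spring MVC setup, let's read the source code to see what the proxy actually contains.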
- Provider side: define the service interface API, HelloService
```java
public interface HelloService {
    String sayHello(String name);
}
```
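- Consumer side: the Controller defined in the business code, HelloController
```java
@Controller
public class HelloController {
    @Autowired
    private HelloService helloService;

    @GetMapping("/sayHello")
    @ResponseBody // return the string as the response body, not as a view name
    public String sayHello() {
        return helloService.sayHello("张三");
    }
}
```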
- Consumer side: the HelloService implementation that the Spring container injects into HelloController is generated dynamically by the Dubbo framework. Its source is:
```java
public class proxy1 implements ClassGenerator.DC, Destroyable, EchoService, HelloService {
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy1() {
    }

    public proxy1(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }

    public String sayHello(String string) {
        Object[] objectArray = new Object[] { string };
        Object object = this.handler.invoke(this, methods[0], objectArray);
        return (String) object;
    }

    public Object $echo(Object object) {
        Object[] objectArray = new Object[] { object };
        Object object2 = this.handler.invoke(this, methods[1], objectArray);
        return object2;
    }

    public void $destroy() {
        Object[] objectArray = new Object[] {};
        Object object = this.handler.invoke(this, methods[2], objectArray);
    }
}
```
Consumer side: from this we can see that the controller actually calls proxy1.sayHello(), and sayHello in turn calls InvocationHandler.invoke(...). Every time a proxy object is created, the parameterized constructor is used and an InvocationHandler object is passed in.
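To make the proxy-to-handler delegation concrete, here is a minimal sketch. It is an assumption-laden stand-in: it uses the JDK's java.lang.reflect.Proxy rather than Dubbo's generated proxy1 class, and the names ProxySketch, newProxy and callThroughProxy are made up for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical stand-in: a JDK dynamic proxy plays the role of Dubbo's
// generated proxy class. All names in this class are illustrative.
class ProxySketch {

    interface HelloService {
        String sayHello(String name);
    }

    // Builds a proxy whose every method call is forwarded to the handler,
    // the same shape as proxy1.sayHello() -> handler.invoke(...).
    static HelloService newProxy(InvocationHandler handler) {
        return (HelloService) Proxy.newProxyInstance(
                HelloService.class.getClassLoader(),
                new Class<?>[] {HelloService.class},
                handler);
    }

    // The handler here answers locally; a real handler would pass the
    // method and arguments on to an Invoker.
    static String callThroughProxy(String name) {
        InvocationHandler handler = (proxy, method, args) ->
                method.getName() + "(" + args[0] + ") handled by InvocationHandler";
        return newProxy(handler).sayHello(name);
    }

    public static void main(String[] args) {
        System.out.println(callThroughProxy("Zhang San"));
    }
}
```

The caller only ever sees the HelloService interface; everything behind it is decided by whichever handler was passed to the constructor.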
- Consumer side: source of the InvocationHandler
```java
public class InvokerInvocationHandler implements InvocationHandler {
    private final Invoker<?> invoker;
    private URL url;
    private String protocolServiceKey;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
        this.url = invoker.getUrl();
        this.protocolServiceKey = this.url.getProtocolServiceKey();
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // Methods inherited from Object are answered locally, without a remote call
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        // Everything else is wrapped in an RpcInvocation and handed to the invoker
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
        RpcContext.setRpcContext(url);
        return invoker.invoke(rpcInvocation).recreate();
    }
}
```
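The handler answers toString, hashCode and equals locally and only forwards business methods. The following sketch illustrates that split with a JDK proxy. The class LocalMethodsSketch, its Handler and the remoteCalls list are hypothetical, and the "remote" call is simulated in-process.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: Object methods are handled locally,
// only business methods count as "remote" calls.
class LocalMethodsSketch {

    interface HelloService {
        String sayHello(String name);
    }

    static class Handler implements InvocationHandler {
        // Names of the methods that would have gone over the network.
        final List<String> remoteCalls = new ArrayList<>();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            String name = method.getName();
            int paramCount = method.getParameterCount();
            if (paramCount == 0 && "toString".equals(name)) {
                return "proxy-of-HelloService";
            }
            if (paramCount == 0 && "hashCode".equals(name)) {
                return System.identityHashCode(this);
            }
            if (paramCount == 1 && "equals".equals(name)) {
                return proxy == args[0];
            }
            remoteCalls.add(name);          // simulated remote invocation
            return "Hello, " + args[0];
        }
    }

    static String demo() {
        Handler handler = new Handler();
        HelloService proxy = (HelloService) Proxy.newProxyInstance(
                HelloService.class.getClassLoader(),
                new Class<?>[] {HelloService.class},
                handler);
        String text = proxy.toString();              // answered locally
        proxy.hashCode();                            // answered locally
        boolean same = proxy.equals(proxy);          // answered locally
        String reply = proxy.sayHello("Zhang San");  // the only "remote" call
        return text + " | " + same + " | " + reply + " | " + handler.remoteCalls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without this special-casing, something as innocent as logging the proxy would trigger a network round trip.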
- Consumer side: when InvokerInvocationHandler is constructed, it receives an Invoker. This Invoker is actually a ClusterInvoker, which represents the cluster of all provider invokers for HelloService. ClusterInvoker has several implementations, such as FailoverClusterInvoker, FailfastClusterInvoker and BroadcastClusterInvoker. Suppose it is a FailfastClusterInvoker here: the call then goes to FailfastClusterInvoker.invoke(), which is inherited from the parent class AbstractClusterInvoker.
```java
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        // omitted...
        // Use the routers to pick, from all invokers stored in ClusterInvoker.directory,
        // the subset that matches the routing rules.
        List<Invoker<T>> invokers = list(invocation);
        // Routing has narrowed the candidates; load balancing now has to pick one of them.
        // Initialize the LoadBalance here.
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // Use the load balancer to select one invoker, make the call, and return the response.
        return doInvoke(invocation, invokers, loadbalance);
    }

    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        // Get the Directory and call its list method
        return getDirectory().list(invocation);
    }
}
```
- Consumer side: suppose the directory inside FailfastClusterInvoker is a StaticDirectory, so the method actually called is StaticDirectory.list
```java
public class StaticDirectory<T> extends AbstractDirectory<T> {
    private final List<Invoker<T>> invokers;

    public StaticDirectory(URL url, List<Invoker<T>> invokers, RouterChain<T> routerChain) {
        // omitted...
        this.invokers = invokers;
    }

    // AbstractDirectory.list ends up calling the subclass's doList
    @Override
    protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
        List<Invoker<T>> finalInvokers = invokers;
        if (routerChain != null) {
            try {
                // The router chain applies each router in turn and keeps
                // only the invokers that satisfy every routing rule
                finalInvokers = routerChain.route(getConsumerUrl(), invocation);
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
        return finalInvokers == null ? Collections.emptyList() : finalInvokers;
    }
}
```
Consumer side: once the invokers that match the routing rules have been filtered out, the LoadBalance is built and FailfastClusterInvoker.doInvoke is called.
```java
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // The load balancer selects one invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            // Set up the invocation context and make the call
            return invokeWithContext(invoker, invocation);
        } catch (Throwable e) {
            // omitted...
        }
    }
}
```
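The three steps on the consumer side (route, load balance, invoke once) can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Dubbo's implementation: the Invoker interface here is a hypothetical two-method stand-in, routing is a Predicate, and load balancing is a plain round robin on a caller-supplied counter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical, simplified model of route -> select -> fail-fast invoke.
class ClusterSketch {

    interface Invoker {
        String address();
        String invoke(String arg);
    }

    static Invoker invoker(final String address) {
        return new Invoker() {
            public String address() { return address; }
            public String invoke(String arg) { return "Hello, " + arg + " from " + address; }
        };
    }

    // Step 1: routing keeps only the invokers that satisfy the rule.
    static List<Invoker> route(List<Invoker> all, Predicate<Invoker> rule) {
        return all.stream().filter(rule).collect(Collectors.toList());
    }

    // Step 2: load balancing picks one candidate (round robin on the counter).
    static Invoker select(List<Invoker> candidates, int counter) {
        return candidates.get(Math.floorMod(counter, candidates.size()));
    }

    // Step 3: fail-fast means a single attempt; any exception propagates at once.
    static String failfastInvoke(List<Invoker> all, Predicate<Invoker> rule, int counter, String arg) {
        List<Invoker> routed = route(all, rule);
        if (routed.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        return select(routed, counter).invoke(arg);
    }

    static String demo() {
        List<Invoker> all = Arrays.asList(
                invoker("10.0.0.1:20880"), invoker("10.0.0.2:20880"), invoker("10.1.0.1:20880"));
        // Route to the 10.0.x.x providers only, then take candidate number 1.
        return failfastInvoke(all, i -> i.address().startsWith("10.0."), 1, "Zhang San");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A failover strategy would differ only in step 3, where a failure would trigger another select and a retry instead of an immediate exception.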
- Consumer side: invokeWithContext(invoker, invocation) calls the AbstractInvoker.invoke method
```java
public abstract class AbstractInvoker<T> implements Invoker<T> {
    @Override
    public Result invoke(Invocation inv) throws RpcException {
        RpcInvocation invocation = (RpcInvocation) inv;
        // For an asynchronous call, generate and attach an invocation id
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // Every call is wrapped as an asynchronous result
        AsyncRpcResult asyncResult;
        try {
            asyncResult = (AsyncRpcResult) doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            // omitted
        } catch (RpcException e) {
            // omitted
        } catch (Throwable e) {
            // omitted
        }
        // For a synchronous call, block until the response arrives
        waitForResultIfSync(asyncResult, invocation);
        return asyncResult;
    }
}
```
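The "always asynchronous, block only for synchronous callers" idea in AbstractInvoker.invoke can be illustrated with a plain CompletableFuture. This is a sketch under assumptions: SyncOverAsyncSketch and its methods are hypothetical, the transport is simulated with a sleeping task, and the timeout values are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of wrapping every call in a future and
// blocking on it only when the caller is synchronous.
class SyncOverAsyncSketch {

    // Simulated transport call: always asynchronous, completes after delayMillis.
    static CompletableFuture<String> doInvoke(String arg, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Hello, " + arg;
        });
    }

    // Always returns the future. A synchronous caller first blocks on it,
    // for at most timeoutMillis, so the result is ready on return.
    static CompletableFuture<String> invoke(String arg, boolean sync, long delayMillis, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<String> future = doInvoke(arg, delayMillis);
        if (sync) {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        return future;
    }

    static String demo() {
        try {
            CompletableFuture<String> syncResult = invoke("Zhang San", true, 20, 5000);
            CompletableFuture<String> asyncResult = invoke("Li Si", false, 100, 5000);
            // The sync result is already complete; the async one is awaited explicitly.
            return syncResult.getNow("pending") + " | " + asyncResult.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Returning the same future type in both modes keeps the layers above uniform: they never need to know whether the call was synchronous.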
- Consumer side: doInvoke is an abstract method implemented by concrete subclasses. Because HelloService is exported over the dubbo protocol, the matching Invoker subclass is DubboInvoker, so the call goes to DubboInvoker.doInvoke
```java
public class DubboInvoker<T> extends AbstractInvoker<T> {
    // Clients, similar to a connection pool
    private final ExchangeClient[] clients;
    // Other fields (such as the round-robin counter index) omitted...

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            // With several clients, pick one in round-robin order
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // One-way: no response from the provider is needed.
            // Roughly speaking, the caller does not use a return value.
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // Work out the timeout
            int timeout = calculateTimeout(invocation, methodName);
            invocation.put(TIMEOUT_KEY, timeout);
            if (isOneway) {
                // One-way: sending the request is all that is needed
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                // Two-way: the provider has to return a response.
                // Get the executor for callbacks
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                // Create the CompletableFuture, send the request, and cast the response type
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            // omitted...
        } catch (RemotingException e) {
            // omitted...
        }
    }
}
```
Consumer side: this shows that the actual network transfer happens inside currentClient.send() and currentClient.request().
The consumer sends the network request
- Consumer side: under the hood, currentClient.send() and currentClient.request() use a Netty 4 client to send the request. The provider side also uses this same method, NettyChannel.send, to send the response back to the consumer side.
```java
final class NettyChannel extends AbstractChannel {
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        boolean success = true;
        int timeout = 0;
        try {
            // The Netty channel writes the request data to the provider side
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                // Wait until the data has been written out, up to the timeout
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            // omitted...
        }
        if (!success) {
            throw new RemotingException("xxx");
        }
    }
}
```
- Consumer side: when currentClient.request() sends a request, it calls DefaultFuture.newFuture to create a DefaultFuture. The constructor stores the request.id together with the new DefaultFuture in DefaultFuture.FUTURES. When the response arrives, response.id is used to find the matching DefaultFuture and set the result on it.
```java
public class DefaultFuture extends CompletableFuture<Object> {
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
    // Instance fields omitted...

    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
}
```
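The request/response matching described above boils down to a map from request id to pending future. Here is a minimal sketch of that pattern. PendingRequestsSketch and its methods are hypothetical names, and timeouts are left out entirely.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of correlating responses to requests by id.
class PendingRequestsSketch {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    // On send: allocate a request id and park a future under it.
    long newRequest() {
        long id = nextId.incrementAndGet();
        futures.put(id, new CompletableFuture<>());
        return id;
    }

    CompletableFuture<Object> futureOf(long id) {
        return futures.get(id);
    }

    // On response: remove the future registered under the id and complete it.
    // Returns false when no request is waiting (unknown id or already answered).
    boolean onResponse(long id, Object result) {
        CompletableFuture<Object> future = futures.remove(id);
        return future != null && future.complete(result);
    }

    static String demo() {
        PendingRequestsSketch pending = new PendingRequestsSketch();
        long first = pending.newRequest();
        long second = pending.newRequest();
        CompletableFuture<Object> firstFuture = pending.futureOf(first);
        CompletableFuture<Object> secondFuture = pending.futureOf(second);
        pending.onResponse(second, "reply-2");   // responses may arrive out of order
        pending.onResponse(first, "reply-1");
        boolean late = pending.onResponse(first, "duplicate"); // nothing is waiting any more
        return firstFuture.join() + "," + secondFuture.join() + "," + late;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because responses are matched by id rather than by arrival order, many in-flight requests can share one connection.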
The provider receives the request, executes the call, and returns the response
- Provider side: Dubbo uses Netty for communication by default. Following the way Netty is normally used, the place to look to find out what happens when a request arrives is the initialization of the Netty server.
```java
public class NettyServer extends AbstractServer implements RemotingServer {
    // Cache of worker channels that are still alive. <ip:port, dubbo channel>
    private Map<String, Channel> channels;
    private ServerBootstrap bootstrap;
    private io.netty.channel.Channel channel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");
        // The IO handler used by the Netty server
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        // SSL code omitted
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }
    // omitted...
}
```
- Provider side: the IO handler used by the Netty server is NettyServerHandler. The actual business logic is handled by the handler passed to its constructor. In the code above that object is the NettyServer itself, which delegates to the handler supplied when the service was exported; which handler that is depends on the protocol being exported. For the dubbo protocol it is DubboProtocol.requestHandler
```java
@io.netty.channel.ChannelHandler.Sharable
public class NettyServerHandler extends ChannelDuplexHandler {
    // <ip:port, dubbo channel>
    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();
    private final URL url;
    private final ChannelHandler handler;

    /**
     * Netty server handler
     * @param url url
     * @param handler the protocol decides which handler ultimately does the work;
     *                for the dubbo protocol it is DubboProtocol.requestHandler.
     *                Every callback in this class delegates the business logic to it.
     */
    public NettyServerHandler(URL url, ChannelHandler handler) {
        this.url = url;
        this.handler = handler;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        handler.received(channel, msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        handler.sent(channel, msg);
    }
}
```
- Provider side: source of DubboProtocol.requestHandler
```java
public class DubboProtocol extends AbstractProtocol {
    // IO request handler for the dubbo protocol
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            // message is a DecodeableRpcInvocation
            Invocation inv = (Invocation) message;
            // Look up the Exporter for the exported service, then take its Invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            // The invoker is really a filter chain (chain of responsibility).
            // The innermost invoker is an anonymous subclass of AbstractProxyInvoker,
            // which calls the Wrapper object of serviceImpl, which finally calls
            // the serviceImpl object to produce the result.
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }

        // The call that starts in NettyServerHandler ends up in this method
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }
    };

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        // Construction of serviceKey omitted...
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        return exporter.getInvoker();
    }
}
```
- Provider side: where the Wrapper class for HelloServiceImpl is created (JavassistProxyFactory)
```java
public class JavassistProxyFactory extends AbstractProxyFactory {
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // For the source of the dynamic proxy class generated by Proxy.getProxy(interfaces),
        // see the header comment of the Proxy class.
        // The proxy object is created through the parameterized constructor.
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // Generate a generic wrapper object for proxy (HelloServiceImpl).
        // For the source of the Wrapper class, see the comments on Wrapper.makeWrapper.
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // Creating this Invoker looks a lot like a JDK dynamic proxy.
        // One invoker is created per protocol type, for example for URLs
        // starting with registry://xxx or dubbo://xxx.
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                // Call invokeMethod on the wrapper of serviceImpl
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}
```
- Provider side: source of the Wrapper. The object passed in is the HelloServiceImpl instance created by Spring.
```java
public class Wrapper1 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public String[] getPropertyNames() {
        return pns;
    }

    public boolean hasProperty(String string) {
        return pts.containsKey(string);
    }

    public Class getPropertyType(String string) {
        return (Class) pts.get(string);
    }

    public String[] getMethodNames() {
        return mns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public void setPropertyValue(Object object, String string, Object object2) {
        try {
            HelloServiceImpl helloServiceImpl = (HelloServiceImpl) object;
        } catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"")
                .append(string)
                .append("\" field or setter method in class org.apache.dubbo.demo.provider.HelloServiceImpl.")
                .toString());
    }

    public Object getPropertyValue(Object object, String string) {
        try {
            HelloServiceImpl helloServiceImpl = (HelloServiceImpl) object;
        } catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"")
                .append(string)
                .append("\" field or getter method in class org.apache.dubbo.demo.provider.HelloServiceImpl.")
                .toString());
    }

    public Object invokeMethod(Object object, String string, Class[] classArray, Object[] objectArray)
            throws InvocationTargetException {
        HelloServiceImpl helloServiceImpl;
        try {
            helloServiceImpl = (HelloServiceImpl) object;
        } catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // sayHello(String name) takes exactly one parameter
            if ("sayHello".equals(string) && classArray.length == 1) {
                return helloServiceImpl.sayHello((String) objectArray[0]);
            }
        } catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"")
                .append(string)
                .append("\" in class org.apache.dubbo.demo.provider.HelloServiceImpl.")
                .toString());
    }
}
```
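The point of the generated Wrapper is that it dispatches on the method name with ordinary Java calls instead of java.lang.reflect.Method.invoke. A hand-written sketch of the same idea follows. WrapperSketch is a hypothetical class, and the exception type is simplified to IllegalArgumentException.

```java
// Hypothetical hand-written equivalent of a generated Wrapper:
// dispatch by method name and parameter count, then call the target directly.
class WrapperSketch {

    static class HelloServiceImpl {
        public String sayHello(String name) {
            return "Hello, " + name;
        }
    }

    static Object invokeMethod(Object target, String methodName, Class<?>[] parameterTypes, Object[] args) {
        HelloServiceImpl impl = (HelloServiceImpl) target;
        if ("sayHello".equals(methodName) && parameterTypes.length == 1) {
            // A direct, statically compiled call; no reflection involved
            return impl.sayHello((String) args[0]);
        }
        throw new IllegalArgumentException("Not found method \"" + methodName + "\"");
    }

    public static void main(String[] args) {
        Object result = invokeMethod(new HelloServiceImpl(), "sayHello",
                new Class<?>[] {String.class}, new Object[] {"Zhang San"});
        System.out.println(result);
    }
}
```

Each additional service method becomes one more branch in invokeMethod, which is why the class is generated rather than written by hand.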
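At this point the provider side has the result, and it calls the NettyChannel.send() method described earlier to send the response back to the consumer side.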
The consumer receives the response, processes it, and returns it to the upper-level caller
- Consumer side: when a response arrives, response.id is used to look up the matching DefaultFuture in DefaultFuture.FUTURES. CompletableFuture.complete is then called to set the result.
```java
public class DefaultFuture extends CompletableFuture<Object> {
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
    // Instance fields omitted...

    public static void sent(Channel channel, Request request) {
        DefaultFuture future = FUTURES.get(request.getId());
        if (future != null) {
            future.doSent();
        }
    }

    public static void received(Channel channel, Response response) {
        received(channel, response, false);
    }

    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // decrease Time
                    t.cancel();
                }
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response status is " + response.getStatus()
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress())
                        + ", please check provider side for detailed result.");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }
        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting"
                        + " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }
}
```
That covers the overall call and response flow.