有了前两章服务发布和服务引用的模型基础,调用将会把两部分串联起来。先来回顾下引用的模型(因为入口在这里)
然后我们先来看章大图,有个总体调用印象,下面将会一一讲解
我们已经知道,消费端引用的实际对象是生成的对接口的代理对象,所以最后进入InvokerInrocationHandler中
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
.....
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
引用模型发现此时invoker对象为MockClusterInvoker,显然这是一层mock配置的包装,看一下代码
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
}
return result;
}
的确是,在配置为force强制走mock或非业务异常时都会调用mock逻辑,此处不影响主流程,我们跳过,有机会单聊聊容错机制。主流程调用下一链invoker--->FailoverClusterInvoker的invoke方法,此方法在其爸爸抽象类AbstractClusterInvoker中
public Result invoke(final Invocation invocation) throws RpcException {
//检查invoke状态
checkWhetherDestroyed();
//负载算法载体
LoadBalance loadbalance;
//从目录类中找出此请求对应的所有invoker
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
//根据扩展点机制获取负载策略
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
//如果是异步操作添加相关标志
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//模版方式调用子类方法
return doInvoke(invocation, invokers, loadbalance);
}
通过这段代码,我们可以看到重点是生成了一个LoadBalance实体,然后调用各clusterInvoker子类实现的doInvoke方法,下面先分析下调用核心LoadBalance,先看下扩展获取到的适配类
ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE))
显然默认配置扩展点为DEFAULT_LOADBALANCE=random,看下配置文件
//随机,按权重设置随机概率
random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
//轮循,按公约后的权重设置轮循比率
roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
//最少活跃数
leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance
//一致型哈希算法
consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
dubbo一共实现了四种常见的分布负载算法,我们这里简单分析一下常用RandomLoadBalance实现原理,其他三种算法有机会单独细细分享
- RandomLoadBalance:随机,按权重设置随机概率
int length = invokers.size(); // 总个数
int totalWeight = 0; // 总权重
boolean sameWeight = true; // 权重是否都一样
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // 累计总权重
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false; // 计算所有权重是否一样
}
}
if (totalWeight > 0 && !sameWeight) {
// 如果权重不相同且权重大于0则按总权重数随机
int offset = random.nextInt(totalWeight);
// 并确定随机值落在哪个片断上
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果权重相同或权重为0则均等随机
return invokers.get(random.nextInt(length));
此时的invokers中的服务的顺序有6种情况,分别为ABC,BAC,ACB,CAB,BCA,CBA 此时的 totalWeight=1+2+3=6,此时offset=random.nextInt(totalWeight)=random.nextInt(6)=【0,1,2,3,4,5】
可以通过一个列表描述碰撞到A,B,C的情况:
随机数 | offset | offset | offset | offset | offset | offset |
---|---|---|---|---|---|---|
invoker排序 | 0 | 1 | 2 | 3 | 4 | 5 |
ABC | A | B | B | C | C | C |
BAC | B | B | A | C | C | C |
ACB | A | C | C | C | B | B |
CAB | C | C | C | A | B | B |
BCA | B | B | C | C | C | A |
CBA | C | C | C | B | B | A |
当ABC以不同顺序排列时,offset按0到5到顺序时概率保持1/6,1/3和1/2,在看下代码实现中offset到值
如果offset=0的时候,如果invokers=ABC 则调用的是A,因为0-1<0 ;
offset=1的时候,会经过几个循环
1、offset=1-1=0,此时offset=0,继续下一步
2、offset=0-1=-1>0,此时调用B
offset=3的时候,会经过几个循环
1、offset=3-1=2,此时offset=2,继续下一步
2、offset=2-2=0,继续下一步
3、offset=0-3=-3, 此时调用C
以此类推,每次减去ABC到权重值,offset重新赋值为与其趋紧值
RoundRobinLoadBalance 权重轮训调度算法:
1.根据服务器的不同处理能力,给每个服务器分配不同的权值,使其能够接受相应权值数的服务请求。LeastActiveLoadBalance最少活跃数
1、最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差;
2、使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。ConsistentHashLoadBalance 一致性hash算法:
1.一致性 Hash,相同参数的请求总是发到同一提供者。
2.当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
回归主题,根据模型图继续调用进入FailoverClusterInvoker的doInvoke方法,
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
.....
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
//重新检查一下
checkInvokers(copyinvokers, invocation);
}
//loadbance算法选择,invoked用来提出已经选择过的invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
.....
进行向下调用
Result result = invoker.invoke(invocation);
.....
}
}
发现FailoverClusterInvoker有个for循环进行轮询,当异常时会重试其他服务器。
根据模型继续向下,此时的invoker为RegistryDirectory$InvokerDelegete,外边还有层层的filters,此处跳过,InvokerDelegete中包装的就是我们的DubboInvoker,进入其doInvoker方法
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
//获取调用方法名
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
//获取通训客户端
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
//异步调用,通过回调通知方式
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
//只发送消息,不管返回结果直接结束方式
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
//普通调用
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
这时获取到了通讯的客户端,并且dubbo除了同步返回结果,还支持两种调用方式
- oneway 当方法或接口有此注解,则只发送消息,用在void方法上
- isAsync异步处理结果的方式
我主要分析正常同步调用
return (Result) currentClient.request(inv, timeout).get();
currentClient是我们上一章获取到到客户端,从引用模型图中可以看出这client是ReferenceCountExchangeChient,这只是只是个包装类,进入内部HeaderExchangeClient,然后是进入成员属性HeaderExchangeChannel中
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
//设置通道属性为双向
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
此处包装了两个重要的模型Request和DefaultFuture用做请求数据和返回数据的载体,其中DefaultFuture下面返回时还会介绍。
先看channel.send方法,这时channel就是我们的NettyClient了,根据上一章的内容,我们知道最终会从NettyChannel缓存中得到一个NettyChannel,然后调用NettyChannel的send方法
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.write(message);
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
这就没什么说的了,标准的netty通讯发送消息的写法。
发送完消息,回到我们的HeaderExchangeChannelrequest方法中
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
此方法返回的是刚才的生成的DefaultFuture类。层层返回,就回到了我们最初的DubboInvoker类的doInvoke方法中
return (Result) currentClient.request(inv, timeout).get();
因此currentClient.request(inv, timeout)返回的是一个DefaultFuture实体,然后调用其get()方法,获取返回结果。
这里先让大家想象一下,Netty是异步通信框架,发送和接受是两个线程,那dubbo是怎么接受到返回结果的呢?
哈哈,不知道大家想到没,常用的做法就是通过并发编程线程通信的方式,来做两个线程间的双向通信绑定
public Object get() throws RemotingException {
return get(timeout);
}
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
//判断是否返回了结果
while (!isDone()) {
//阻塞当前线程,等待返回结果,唤醒线程
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
简单的并发编程操作,通过get方法,请求线程进行了阻塞。然后我们在来想象一下线程通信控制时,必须两个线程持有同一个对象,那接收线程怎么找到这个引用这个对象的呢?
回顾下我们的应用模型图,NettyClient中作为信息接受体的是NettyClientHandler类,根据netty操作,我们知道接受信息进入channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
然后我们看到了handler.received(channel, msg)方法,我们根据模型图层层递进,进入到AllChannelHandler方法中
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO 临时解决线程池满后异常信息无法发送到对端的问题。待重构
//fix 线程池满了拒绝调用不返回,导致消费者一直等待超时
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
看到这里起了个异步线程cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))进行处理返回值
public void run() {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case RECEIVED:
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
看到RECEIVED分支下调用了传进来到 handler.received(channel, message)方法,那么你猜下这个handler是谁呢?对了正是DecodeHandler,根据上一章只是我们知道,引用服务时,有一段 new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);来生成包装后的client,此时new DecodeHandler(new HeaderExchangeHandler(handler))这个就被赋值到了AllChannelHandler中,根据模型图我们也可以看到这点。DecodeHandler是用来网络通信编码的,直接进入HeaderExchangeHandler方法
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
//处理返回结果
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
然后顺藤摸瓜,进入 handleResponse(channel, (Response) message)方法
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
看,我们又回到了DefaultFuture类中!
public static void received(Channel channel, Response response) {
try {
//根据id找到原有请求线程
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
在DefaultFuture的方法中我们找到 DefaultFuture future = FUTURES.remove(response.getId())这句话,原来response中有个id用来连接两个线程啊!我们回头找下
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
原来在DefaultFuture构造方法里就已经缓存了这个id和示例的对应关系,后边就不用说了,调用DefaultFuture的doReceived肯定解锁,获取返回结果了
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
//解锁,原有get方法激活
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
此时 get()接到通知就可以舒舒服服去拿到返回结果,给调用方了。
到此,调用过程我们已经分析完上半部分了------客户端发送和接收返回结果。
下章我们介绍服务端接收请求,处理请求,返回结果。
下一篇 dubbo调用源码之服务端
首页 dubbo源码欣赏简介