客户端服务接口所持有的对象为RefererInvocationHandler,jdk的代理实现,invoke调用,先封装DefaultRequest,遍历clusters,如果存在降级的则跳过,默认选第一个非降级的cluster
- clusters处理
for (Cluster<T> cluster : clusters) {
String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();
Switcher switcher = switcherService.getSwitcher(protocolSwitcher);
// 跳过降级的cluster
if (switcher != null && !switcher.isOn()) {
continue;
}
// request设置各种数据
// 最后通过cluster.call调用
response = cluster.call(request);
}
- 高可用策略处理
// cluster.call,通过配置的HA策略 来实现调用(默认配置:FailoverHaStrategy)
public Response call(Request request) {
if (available.get()) {
try {
// loadBalance将在haStrategy中处理
return haStrategy.call(request, loadBalance);
} catch (Exception e) {
return callFalse(request, e);
}
}
return callFalse(request, new MotanServiceException(MotanErrorMsgConstant.SERVICE_UNFOUND));
}
- 负载均衡处理
// 选择Refer,通过loadBalance.selectToHolder(request, referers);选择
List<Referer<T>> referers = selectReferers(request, loadBalance);
URL refUrl = referers.get(0).getUrl();
//
// 从第一个url中获取重试次数
int tryCount = refUrl.getMethodParameter(request.getMethodName(),
request.getParamtersDesc(), URLParamType.retries.getName(),
URLParamType.retries.getIntValue());
// 如果有问题,则设置为不重试
if (tryCount < 0) {
tryCount = 0;
}
for (int i = 0; i <= tryCount; i++) {
// 选择一个服务
Referer<T> refer = referers.get(i % referers.size());
try {
// 设置重试次数
request.setRetries(i);
// 开始远程调用
return refer.call(request);
} catch (RuntimeException e) {
// 对于业务异常,直接抛出
if (ExceptionUtil.isBizException(e)) {
throw e;
} else if (i >= tryCount) {
throw e;
}
LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage()));
}
// selectToHolder的处理(选择refers)
public void selectToHolder(Request request, List<Referer<T>> refersHolder) {
List<Referer<T>> referers = this.referers;
if (referers == null) {
throw new MotanServiceException(this.getClass().getSimpleName() + " No available referers for call : referers_size= 0 "
+ MotanFrameworkUtil.toString(request));
}
if (referers.size() > 1) {
// 多于一个,继续选(这里就根据配置的负载均衡策略来处理,
// 默认的是RoundRobinLoadBalance),但也会返回多个refer
doSelectToHolder(request, refersHolder);
} else if (referers.size() == 1 && referers.get(0).isAvailable()) {
// 只有一个,只记录当前的
refersHolder.add(referers.get(0));
}
if (refersHolder.isEmpty()) {
throw new MotanServiceException(this.getClass().getSimpleName() + " No available referers for call : referers_size="
+ referers.size() + " " + MotanFrameworkUtil.toString(request));
}
}
- refer.call最后在DefaultRpcProtocol的DefaultRpcReferer中通过NettyClient发出请求
protected Response doCall(Request request) {
try {
// 为了能够实现跨group请求,需要使用server端的group。
request.setAttachment(URLParamType.group.getName(), serviceUrl.getGroup());
return client.request(request);
} catch (TransportException exception) {
throw new MotanServiceException("DefaultRpcReferer call Error: url=" + url.getUri(), exception);
}
}
- 最终通过netty将消息发出
ChannelFuture writeFuture = this.channel.write(request);
// 注册一个回调,用来处理调用情况
response.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
// 成功的调用
nettyClient.resetErrorCount();
} else {
// 失败的调用
nettyClient.incrErrorCount();
}
}
});
服务器端通过Netty接收客户端的请求,入口为NettyChannelHandler的messageReceived
- netty 处理
if (message instanceof Request) {
processRequest(ctx, e); // 处理客户端发来的请求
} else if (message instanceof Response) {
processResponse(ctx, e);
}
//
// 使用线程池处理调用请求
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try{
RpcContext.init(request);
processRequest(ctx, request, processStartTime);
}finally{
RpcContext.destroy();
}
}
});
- ProviderProtectedMessageRouter处理
// key:motan-demo-rpc/com.weibo.motan.demo.service.MotanDemoService/1.0
String serviceKey = MotanFrameworkUtil.getServiceKey(request);
// 根据key获取Provider(DefaultProvider)
Provider<?> provider = providers.get(serviceKey);
// 通过反射调用
Method method = lookup(request);
Object value = method.invoke(proxyImpl, request.getArguments());
- 然后进入service层代码处理,后续就是结果返回处理了。
大概的示意图: