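This article walks through the source code of Ribbon working together with Nacos. The Nacos version is 2.2.8. The Ribbon and Gateway source code is relatively simple.
The previous article covered the @LoadBalanced annotation, but it did not show how to use a RestTemplate annotated with @LoadBalanced: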
String url = "http://user-system/order/findOrderByUserId/"+id;
R result = restTemplate.getForObject(url,R.class);
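Here user-system is the name of the microservice. If the restTemplate is not annotated with @LoadBalanced, the call fails with an error.
The key point is that @LoadBalanced adds an interceptor (LoadBalancerInterceptor) to the restTemplate. This article analyzes how a restTemplate with that interceptor resolves the service name into an IP address, including the routing rule that ships with Nacos.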
R result = restTemplate.getForObject(url,R.class);
Call chain
--org.springframework.web.client.RestTemplate#getForObject
--org.springframework.web.client.RestTemplate#execute
--org.springframework.web.client.RestTemplate#doExecute
Excerpt from doExecute (the catch/finally part is omitted)
try {
    // Create the ClientHttpRequest
    ClientHttpRequest request = createRequest(url, method);
    if (requestCallback != null) {
        requestCallback.doWithRequest(request);
    }
    response = request.execute();
    handleResponse(url, method, response);
    return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
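1. Return value of createRequest(url, method): because the restTemplate has interceptors, the returned request is an InterceptingClientHttpRequest, as the call chain under 2 shows
2. The response = request.execute(); call
Call chain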
--org.springframework.http.client.AbstractBufferingClientHttpRequest#executeInternal(HttpHeaders)
--org.springframework.http.client.InterceptingClientHttpRequest#executeInternal
--org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution#execute
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    // this.iterator = interceptors.iterator() is the iterator over the interceptors.
    // The if (this.iterator.hasNext()) check shows that this execute method is called
    // recursively: each interceptor calls back into it.
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        // Call the interceptor's intercept method; here the interceptor is LoadBalancerInterceptor
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}
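The re-entrant iterator pattern in execute can be hard to follow in the Spring source, so here is a minimal plain-Java sketch of the same shape. The Interceptor and Execution interfaces, the ChainExecution class, and the sample address 10.0.0.1:8080 are hypothetical stand-ins for illustration, not Spring types; requests and responses are reduced to strings.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class InterceptorChainSketch {

    /** Hypothetical stand-in for ClientHttpRequestInterceptor. */
    interface Interceptor {
        String intercept(String request, Execution execution);
    }

    /** Hypothetical stand-in for ClientHttpRequestExecution. */
    interface Execution {
        String execute(String request);
    }

    /** One iterator is shared by every re-entrant call to execute(). */
    static class ChainExecution implements Execution {
        private final Iterator<Interceptor> iterator;
        private final List<String> trace;

        ChainExecution(List<Interceptor> interceptors, List<String> trace) {
            this.iterator = interceptors.iterator();
            this.trace = trace;
        }

        @Override
        public String execute(String request) {
            if (iterator.hasNext()) {
                // Hand over to the next interceptor; it calls back into execute().
                return iterator.next().intercept(request, this);
            }
            // Chain exhausted: the real HTTP call would happen here.
            trace.add("send " + request);
            return "response for " + request;
        }
    }

    /** Runs a two-interceptor chain and returns the order of the steps. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.add((req, exec) -> {
            trace.add("logging");
            return exec.execute(req);
        });
        interceptors.add((req, exec) -> {
            trace.add("resolve");
            // Stand-in for the load-balancing step: swap the service name for an address.
            return exec.execute(req.replace("user-system", "10.0.0.1:8080"));
        });
        new ChainExecution(interceptors, trace).execute("http://user-system/ping");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each interceptor decides whether and with which request to continue, which is why a load-balancing interceptor can replace the request before the final branch sends it.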
2.1 The LoadBalancerInterceptor#intercept method
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) throws IOException {
    final URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null,
            "Request URI does not contain a valid hostname: " + originalUri);
    // This is where the service name is resolved to an IP address
    return this.loadBalancer.execute(serviceName,
            this.requestFactory.createRequest(request, body, execution));
}
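The service name in intercept comes straight from URI#getHost, so it helps to see what java.net.URI returns for a given URL. The sketch below uses only the JDK; the class and method names are hypothetical, and the second URL (with an underscore) is an assumed example showing when the Assert.state check above would fire.

```java
import java.net.URI;

class ServiceNameFromUri {

    /** Returns the host part of the URL, or null if java.net.URI cannot parse one. */
    static String serviceName(String url) {
        return URI.create(url).getHost();
    }

    public static void main(String[] args) {
        // A hyphenated service name is a valid hostname.
        System.out.println(serviceName("http://user-system/order/findOrderByUserId/1"));
        // An underscore is not allowed in a hostname, so getHost() yields null.
        System.out.println(serviceName("http://user_system/order/findOrderByUserId/1"));
    }
}
```

This is one reason to prefer hyphens over underscores in service names that are used as the host part of a URL.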
2.1.1 Analysis of return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
Start with this.requestFactory.createRequest(request, body, execution). It returns a functional interface (LoadBalancerRequest), so the body of the lambda is not executed right away.
public LoadBalancerRequest<ClientHttpResponse> createRequest(
        final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) {
    return instance -> {
        HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
                this.loadBalancer);
        if (this.transformers != null) {
            for (LoadBalancerRequestTransformer transformer : this.transformers) {
                serviceRequest = transformer.transformRequest(serviceRequest,
                        instance);
            }
        }
        // Recall the if (this.iterator.hasNext()) check in
        // org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution#execute.
        // This is the recursive call back into that method.
        return execution.execute(serviceRequest, body);
    };
}
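To make the deferred execution concrete, here is a minimal sketch under simplifying assumptions: the request is reduced to a URI, the chosen instance to a host and port, and LoadBalancerRequest to a java.util.function.Function. The class names, the reconstruct helper, and the address 192.168.1.10:8081 are hypothetical. The URI rewrite only illustrates the idea of swapping the service name for an address; it is not the Ribbon implementation.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.function.Function;

class DeferredRequestSketch {

    /** Hypothetical, simplified service instance: just a host and a port. */
    static final class Instance {
        final String host;
        final int port;

        Instance(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Rebuilds the URI with the instance's host and port, keeping path and query. */
    static URI reconstruct(Instance instance, URI original) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), instance.host,
                    instance.port, original.getPath(), original.getQuery(),
                    original.getFragment());
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** Only builds the function; its body runs later, once an instance is chosen. */
    static Function<Instance, String> createRequest(URI original, StringBuilder log) {
        log.append("created;");
        return instance -> {
            log.append("executed;");
            return reconstruct(instance, original).toString();
        };
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        Function<Instance, String> request = createRequest(
                URI.create("http://user-system/order/findOrderByUserId/1"), log);
        System.out.println(log); // only "created;" so far
        System.out.println(request.apply(new Instance("192.168.1.10", 8081)));
        System.out.println(log);
    }
}
```

Deferring the body this way lets the load balancer pick the instance first and pass it in afterwards.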
2.1.2 The execute method in this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#execute
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
        throws IOException {
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    // Key line: this ends up calling rule.choose(key); rule is the NacosRule registered in the container
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
            isSecure(server, serviceId),
            serverIntrospector(serviceId).getMetadata(server));
    // The functional interface (LoadBalancerRequest) is invoked inside this call
    return execute(serviceId, ribbonServer, request);
}
2.1.2.1 getServer(loadBalancer, hint); eventually calls the choose method of NacosRule
2.1.2.2 NacosRule#choose
@Override
public Server choose(Object key) {
    try {
        String clusterName = this.nacosDiscoveryProperties.getClusterName();
        String group = this.nacosDiscoveryProperties.getGroup();
        DynamicServerListLoadBalancer loadBalancer = (DynamicServerListLoadBalancer) getLoadBalancer();
        String name = loadBalancer.getName();
        NamingService namingService = nacosServiceManager
                .getNamingService(nacosDiscoveryProperties.getNacosProperties());
        // Fetch the healthy instances for the service name; each one carries the IP, port and other information
        List<Instance> instances = namingService.selectInstances(name, group, true);
        if (CollectionUtils.isEmpty(instances)) {
            LOGGER.warn("no instance in service {}", name);
            return null;
        }
        List<Instance> instancesToChoose = instances;
        // This is where Nacos uses the clusterName to prefer nearby instances.
        // Instances in the same cluster are called first; if there are none,
        // instances from other clusters are used.
        if (StringUtils.isNotBlank(clusterName)) {
            List<Instance> sameClusterInstances = instances.stream()
                    .filter(instance -> Objects.equals(clusterName,
                            instance.getClusterName()))
                    .collect(Collectors.toList());
            if (!CollectionUtils.isEmpty(sameClusterInstances)) {
                instancesToChoose = sameClusterInstances;
            }
            else {
                LOGGER.warn(
                        "A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}",
                        name, clusterName, instances);
            }
        }
        // Weighted random selection
        Instance instance = ExtendBalancer.getHostByRandomWeight2(instancesToChoose);
        return new NacosServer(instance);
    }
    catch (Exception e) {
        LOGGER.warn("NacosRule error", e);
        return null;
    }
}
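The two decisions in choose (prefer the same cluster, then pick by weight) can be sketched without any Nacos dependency. Everything below is a hypothetical simplification: the Instance class has only an IP, a cluster name and a weight, and pickByWeight is a plain cumulative-weight draw written for illustration, which should not be read as the algorithm behind ExtendBalancer.getHostByRandomWeight2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

class ClusterAwareChooserSketch {

    /** Hypothetical, simplified instance. */
    static final class Instance {
        final String ip;
        final String clusterName;
        final double weight;

        Instance(String ip, String clusterName, double weight) {
            this.ip = ip;
            this.clusterName = clusterName;
            this.weight = weight;
        }
    }

    /** Keeps same-cluster instances if there are any, otherwise falls back to all of them. */
    static List<Instance> preferSameCluster(List<Instance> all, String clusterName) {
        if (clusterName == null || clusterName.trim().isEmpty()) {
            return all;
        }
        List<Instance> same = new ArrayList<>();
        for (Instance instance : all) {
            if (clusterName.equals(instance.clusterName)) {
                same.add(instance);
            }
        }
        return same.isEmpty() ? all : same;
    }

    /** Picks an instance with probability proportional to its weight; null if no weight is positive. */
    static Instance pickByWeight(List<Instance> candidates, Random random) {
        double total = 0;
        for (Instance instance : candidates) {
            total += instance.weight;
        }
        if (total <= 0) {
            return null;
        }
        double point = random.nextDouble() * total;
        for (Instance instance : candidates) {
            point -= instance.weight;
            if (point < 0) {
                return instance;
            }
        }
        // Guard against floating-point rounding: fall back to the last positive-weight instance.
        for (int i = candidates.size() - 1; i >= 0; i--) {
            if (candidates.get(i).weight > 0) {
                return candidates.get(i);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Instance> all = Arrays.asList(
                new Instance("10.0.0.1", "SH", 1.0),
                new Instance("10.0.0.2", "SH", 0.0),
                new Instance("10.0.1.1", "BJ", 1.0));
        List<Instance> candidates = preferSameCluster(all, "SH");
        System.out.println(pickByWeight(candidates, new Random()).ip);
    }
}
```

With these sample values a caller in cluster SH only ever reaches 10.0.0.1, because 10.0.0.2 has weight 0 and the BJ instance is filtered out; a caller in a cluster with no instances falls back to the full list.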
Back to step 2
Once the interceptor chain has finished, execution enters the else branch and calls the downstream service
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    // this.iterator = interceptors.iterator() is the iterator over the interceptors.
    // The if (this.iterator.hasNext()) check shows that this execute method is called
    // recursively: each interceptor calls back into it.
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        // Call the interceptor's intercept method; here the interceptor is LoadBalancerInterceptor
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        // Reached once the interceptor chain is exhausted
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        // Call the downstream service
        return delegate.execute();
    }
}