说起负载均衡一般都会想到服务端的负载均衡,常用产品包括LBS硬件或云服务、Nginx等,都是耳熟能详的产品。
而Spring Cloud提供了让服务调用端具备负载均衡能力的Ribbon,通过和Eureka的紧密结合,不用在服务集群内再架设负载均衡服务,很大程度简化了服务集群内的架构。
具体也不想多写虚的介绍,反正哪里都能看得到相关的介绍。
直接开撸代码,通过代码来看Ribbon是如何实现的。
配置
详解:
1.RibbonAutoConfiguration配置生成RibbonLoadBalancerClient实例。
代码位置:
spring-cloud-netflix-core-1.3.5.RELEASE.jar
org.springframework.cloud.netflix.ribbon
RibbonAutoConfiguration.class
按 Ctrl+C 复制代码
按 Ctrl+C 复制代码
先看配置条件项,RibbonAutoConfiguration配置必须在LoadBalancerAutoConfiguration配置前执行,因为在LoadBalancerAutoConfiguration配置中会使用RibbonLoadBalancerClient实例。
RibbonLoadBalancerClient继承自LoadBalancerClient接口,是负载均衡客户端,也是负载均衡策略的调用方。
2.LoadBalancerInterceptorConfig配置生成:
1).负载均衡拦截器LoadBalancerInterceptor实例
包含:
LoadBalancerClient实现类的RibbonLoadBalancerClient实例
负载均衡的请求创建工厂LoadBalancerRequestFactory:实例
2).RestTemplate自定义的RestTemplateCustomizer实例
代码位置:
spring-cloud-commons-1.2.4.RELEASE.jar
org.springframework.cloud.client.loadbalancer
LoadBalancerAutoConfiguration.class
按 Ctrl+C 复制代码
按 Ctrl+C 复制代码
先看配置条件项:
要求在项目环境中必须要有RestTemplate类。
要求必须要有LoadBalancerClient接口的实现类的实例,也就是上一步生成的RibbonLoadBalancerClient。
3.通过上面一步创建的RestTemplateCustomizer配置所有RestTemplate实例,就是将负载均衡拦截器设置给RestTemplate实例。
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)publicclass LoadBalancerAutoConfiguration {
// 略 @Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
finalList customizers) {
returnnew SmartInitializingSingleton() {
@Override
publicvoid afterSingletonsInstantiated() {
for(RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for(RestTemplateCustomizer customizer : customizers) {customizer.customize(restTemplate);}} } }; }// 略 @Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
staticclass LoadBalancerInterceptorConfig {
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
returnnew LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
returnnew RestTemplateCustomizer() {
@Override
publicvoid customize(RestTemplate restTemplate) {List list =newArrayList<>(restTemplate.getInterceptors());list.add(loadBalancerInterceptor);restTemplate.setInterceptors(list);} }; } }// 略}
restTemplate.setInterceptors(list)这个地方就是注入负载均衡拦截器的地方LoadBalancerInterceptor。
从这个地方实际上也可以猜出来,RestTemplate可以通过注入的拦截器来构建相应的请求实现负载均衡。
也能看出来可以自定义拦截器实现其他目的。
4.RibbonClientConfiguration配置生成ZoneAwareLoadBalancer实例
代码位置:
spring-cloud-netflix-core-1.3.5.RELEASE.jar
org.springframework.cloud.netflix.ribbon
RibbonClientConfiguration.class
@SuppressWarnings("deprecation")
@Configuration
@EnableConfigurationProperties//Order is important here, last should be the default, first should be optional// see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653@Import({OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})publicclass RibbonClientConfiguration {
// 略 @Bean
@ConditionalOnMissingBean
public ILoadBalancerribbonLoadBalancer(IClientConfig config, ServerList serverList, ServerListFilter serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if(this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
returnthis.propertiesFactory.get(ILoadBalancer.class, config, name);
}
returnnewZoneAwareLoadBalancer<>(config, rule, ping, serverList,serverListFilter, serverListUpdater);}// 略}
ZoneAwareLoadBalancer继承自ILoadBalancer接口,该接口有一个方法:
/** * Choose a server from load balancer.
*
* @param key An object that the load balancer may use to determine which server to return. null if
* the load balancer does not use this parameter.
* @return server chosen
*/publicServer chooseServer(Object key);
ZoneAwareLoadBalancer就是一个具体的负载均衡实现类,也是默认的负载均衡类,通过对chooseServer方法的实现选取某个服务实例。
拦截&请求
1.使用RestTemplate进行Get、Post等各种请求,都是通过doExecute方法实现
代码位置:spring-web-4.3.12.RELEASE.jar
org.springframework.web.client
RestTemplate.class
publicclassRestTemplateextendsInterceptingHttpAccessorimplements RestOperations {
// 略protectedTdoExecute(URI url, HttpMethod method, RequestCallback requestCallback, ResponseExtractor responseExtractor)throws RestClientException {
Assert.notNull(url, "'url' must not be null");
Assert.notNull(method, "'method' must not be null");
ClientHttpResponse response =null;
try{ClientHttpRequest request = createRequest(url, method);if(requestCallback !=null) { requestCallback.doWithRequest(request); }response = request.execute();handleResponse(url, method, response);if(responseExtractor !=null) {
return responseExtractor.extractData(response);
}
else {
returnnull;
}
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query !=null? resource.substring(0, resource.indexOf('?')) : resource);
thrownewResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + ex.getMessage(), ex);
}
finally {
if(response !=null) {
response.close();
}
}
}
// 略}
支持的各种http请求方法最终都是调用doExecute方法,该方法内调用创建方法创建请求实例,并执行请求得到响应对象。
2.生成请求实例创建工厂
上一步代码中,调用createRequest方法创建请求实例,这个方法是定义在父类中。
先整理出主要的继承关系:
createRequest方法实际是定义在HttpAccessor抽象类中。
publicabstractclass HttpAccessor {
privateClientHttpRequestFactory requestFactory =new SimpleClientHttpRequestFactory();publicvoid setRequestFactory(ClientHttpRequestFactory requestFactory) {
Assert.notNull(requestFactory, "ClientHttpRequestFactory must not be null");
this.requestFactory = requestFactory;
}
public ClientHttpRequestFactory getRequestFactory() {
returnthis.requestFactory;
}
protectedClientHttpRequestcreateRequest(URI url, HttpMethod method)throws IOException {
ClientHttpRequest request = getRequestFactory().createRequest(url, method);if (logger.isDebugEnabled()) {
logger.debug("Created " + method.name() + " request for \"" + url + "\"");
}
return request;
}
}
在createRequest方法中调用getRequestFactory方法获得请求实例创建工厂,实际上getRequestFactory并不是当前HttpAccessor类中定义的,而是在子类InterceptingHttpAccessor中定义的。
publicabstractclassInterceptingHttpAccessorextends HttpAccessor {
privateList interceptors =newArrayList();
publicvoidsetInterceptors(List interceptors) {
this.interceptors = interceptors;
}
publicList getInterceptors() {
return interceptors;
}
@Override
publicClientHttpRequestFactorygetRequestFactory() {ClientHttpRequestFactory delegate =super.getRequestFactory();if(!CollectionUtils.isEmpty(getInterceptors())) {returnnew InterceptingClientHttpRequestFactory(delegate, getInterceptors()); }else {
return delegate;
}
}
}
在这里做了个小动作,首先还是通过HttpAccessor类创建并获得SimpleClientHttpRequestFactory工厂,这个工厂主要就是在没有拦截器的时候创建基本请求实例。
其次,在有拦截器注入的情况下,创建InterceptingClientHttpRequestFactory工厂,该工厂就是创建带拦截器的请求实例,因为注入了负载均衡拦截器,所以这里就从InterceptingClientHttpRequestFactory工厂创建。
3.通过工厂创建请求实例
创建实例就看工厂的createRequest方法。
publicclassInterceptingClientHttpRequestFactoryextends AbstractClientHttpRequestFactoryWrapper {
privatefinalList interceptors;
public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
List interceptors) {
super(requestFactory);
this.interceptors = (interceptors !=null? interceptors : Collections.emptyList());
}
@Override
protectedClientHttpRequestcreateRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {returnnewInterceptingClientHttpRequest(requestFactory,this.interceptors, uri, httpMethod);
}
}
就是new了个InterceptingClientHttpRequest实例,并且把拦截器、基本请求实例创建工厂注进去。
4.请求实例调用配置阶段注入的负载均衡拦截器的拦截方法intercept
可从第1步看出,创建完请求实例后,通过执行请求实例的execute方法执行请求。
ClientHttpRequest request = createRequest(url, method);if(requestCallback !=null) {
requestCallback.doWithRequest(request);
}
response =request.execute();
实际请求实例是InterceptingClientHttpRequest,execute实际是在它的父类中。
类定义位置:
spring-web-4.3.12.RELEASE.jar
org.springframework.http.client
InterceptingClientHttpRequest.class
看一下它们的继承关系。
在execute方法中实际调用了子类实现的executeInternal方法。
publicabstractclassAbstractClientHttpRequestimplements ClientHttpRequest {
privatefinalHttpHeaders headers =new HttpHeaders();
privatebooleanexecuted =false;
@Override
publicfinal HttpHeaders getHeaders() {
return(this.executed ? HttpHeaders.readOnlyHttpHeaders(this.headers) :this.headers);
}
@Override
publicfinalOutputStream getBody()throws IOException {
assertNotExecuted();
returngetBodyInternal(this.headers);
}
@Override
publicfinalClientHttpResponseexecute()throws IOException {
assertNotExecuted();
ClientHttpResponse result =executeInternal(this.headers);
this.executed =true;
return result;
}
protectedvoid assertNotExecuted() {
Assert.state(!this.executed, "ClientHttpRequest already executed");
}
protectedabstractOutputStream getBodyInternal(HttpHeaders headers)throws IOException;
protectedabstractClientHttpResponse executeInternal(HttpHeaders headers)throws IOException;
}
其实就是InterceptingClientHttpRequest类的executeInternal方法,其中,又调用了一个执行器InterceptingRequestExecution的execute,通关判断如果有拦截器注入进来过,就调用拦截器的intercept方法。
这里的拦截器实际上就是在配置阶段注入进RestTemplate实例的负载均衡拦截器LoadBalancerInterceptor实例,可参考上面配置阶段的第2步。
classInterceptingClientHttpRequestextends AbstractBufferingClientHttpRequest {
// 略 @Override
protectedfinalClientHttpResponseexecuteInternal(HttpHeaders headers,byte[] bufferedOutput)throws IOException {
InterceptingRequestExecution requestExecution =new InterceptingRequestExecution();
returnrequestExecution.execute(this, bufferedOutput);
}
privateclassInterceptingRequestExecutionimplements ClientHttpRequestExecution {
privatefinalIterator iterator;
public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}
@Override
publicClientHttpResponse execute(HttpRequest request,byte[] body)throws IOException {
if(this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor =this.iterator.next();
returnnextInterceptor.intercept(request, body,this); }else {
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
for(Map.Entry> entry : request.getHeaders().entrySet()) {
List values = entry.getValue();
for (String value : values) {
delegate.getHeaders().add(entry.getKey(), value);
}
}
if(body.length > 0) {
StreamUtils.copy(body, delegate.getBody());
}
return delegate.execute();
}
}
}
}
5.负载均衡拦截器调用负载均衡客户端
在负载均衡拦截器LoadBalancerInterceptor类的intercept方法中,又调用了负载均衡客户端LoadBalancerClient实现类的execute方法。
publicclassLoadBalancerInterceptorimplements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibilitythis(loadBalancer,new LoadBalancerRequestFactory(loadBalancer));
}
@Override
publicClientHttpResponseintercept(finalHttpRequest request,finalbyte[] body,
finalClientHttpRequestExecution execution)throws IOException {
finalURI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName !=null, "Request URI does not contain a valid hostname: " + originalUri);
returnthis.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); }}
在配置阶段的第1步,可以看到实现类是RibbonLoadBalancerClient。
6.负载均衡客户端调用负载均衡策略选取目标服务实例并发起请求
在RibbonLoadBalancerClient的第一个execute方法以及getServer方法中可以看到,实际上是通过ILoadBalancer的负载均衡器实现类作的chooseServer方法选取一个服务,交给接下来的请求对象发起一个请求。
这里的负载均衡实现类默认是ZoneAwareLoadBalancer区域感知负载均衡器实例,其内部通过均衡策略选择一个服务。
ZoneAwareLoadBalancer的创建可以参考配置阶段的第4步。
publicclassRibbonLoadBalancerClientimplements LoadBalancerClient { @Override
public T execute(String serviceId, LoadBalancerRequest request)throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server= getServer(loadBalancer);if(server ==null) {
thrownewIllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer =new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request); } @Overridepublic T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request)throws IOException {
Server server =null;
if(serviceInstanceinstanceof RibbonServer) {
server = ((RibbonServer)serviceInstance).getServer();
}
if(server ==null) {
thrownewIllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context =this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder =new RibbonStatsRecorder(context, server);
try{T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal);return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctlycatch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
returnnull;
}
// 略 protected Server getServer(ILoadBalancer loadBalancer) {
if(loadBalancer ==null) {
returnnull;
}
returnloadBalancer.chooseServer("default");// TODO: better handling of key }
protected ILoadBalancer getLoadBalancer(String serviceId) {
returnthis.clientFactory.getLoadBalancer(serviceId);
}
publicstaticclassRibbonServerimplements ServiceInstance {
privatefinal String serviceId;
privatefinal Server server;
privatefinalboolean secure;
privateMap metadata;
public RibbonServer(String serviceId, Server server) {
this(serviceId, server,false, Collections. emptyMap());
}
publicRibbonServer(String serviceId, Server server,boolean secure,
Map metadata) {
this.serviceId = serviceId;
this.server = server;
this.secure = secure;
this.metadata = metadata;
}
// 略 }
}
代码撸完,总结下。
普通使用RestTemplate请求其他服务时,内部使用的就是常规的http请求实例发送请求。
为RestTemplate增加了@LoanBalanced 注解后,实际上通过配置,为RestTemplate注入负载均衡拦截器,让负载均衡器选择根据其对应的策略选择合适的服务后,再发送请求。
End
注:喜欢的小伙伴可以点赞关注,一起学习进步