Spring Cloud Ribbon(后面简称Ribbon)是一个基于Netfix Ribbon实现的用于HTTP与TCP的客户端负载均衡工具。经过Spring Cloud的封装后,可以轻松的通过RestTemplate来实现服务的客户端负载均衡的调用。
一:概览
在使用Ribbon的时候,我们只需要经过简单的几个步骤即可使用【本章节基于Spring Cloud的Hoxton.RELEASE版本进行分析】
1,配置依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
2,将RestTemplate实例使用@LoadBalanced注解标注
@Bean // 初始化 Bean
@LoadBalanced // 实现负载均衡
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
return restTemplate ;
}
3,在配置文件中添加必要的配置
########指定客户端配置【覆盖全局配置】########
<clientName>.ribbon.listOfServers=ip1:port,ip2:port #服务所拥有的访问地址
<clientName>.ribbon.ConnectTimeout=250 #请求连接的超时时间
<clientName>.ribbon.ReadTimeout=1000 #请求处理的超时时间
<clientName>.ribbon.OkToRetryOnAllOperations=false #对所有的操作请求都进行重试[只对GET请求重试]
<clientName>.ribbon.MaxAutoRetriesNextServer=2 #切换实例的重试次数
<clientName>.ribbon.MaxAutoRetries=1 #对当前实例的重试次数
spring.cloud.loadbalancer.retry.enable=true #是否开启重试机制【默认开启】
########全局配置(缺少clientName)[适用于所有客户端]########
ribbon.ConnectTimeout=2000 #请求连接的超时时间,默认2000
ribbon.ReadTimeout=5000 #请求处理的超时时间,默认5000
ribbon.MaxAutoRetries=0 #对当前实例的重试次数,默认0
ribbon.MaxAutoRetriesNextServer=1 #切换实例的重试次数,默认1
ribbon.OkToRetryOnAllOperations=false #对所有的操作请求都进行重试[为false时只对GET请求重试],默认false
ribbon.retryableStatusCodes=500,501 #进行重试判断的响应状态码,多个逗号分隔【默认为空】
ribbon.listOfServers=ip3:port,ip4:port #服务所拥有的访问地址
########Ribbon 客户端行为配置########
ribbon.eager-load.enabled=flase #是否开启饥饿加载,即初始化就创建Ribon客户端而非等到第一次访问
ribbon.eager-load.clients=<clientName-1>,<clientName-2> #开启饥饿加载的客户端名,多个逗号分隔
其中的<clientName>需要替换成真正的服务名称,listOfServers后面的值为该服务所对应的访问地址【实例清单】,多个用英文逗号分隔。完整的格式含义:
<clientName>.<nameSpace>.<propertyName>=<value>
其nameSpace的默认值为ribbon。
当环境中引入了spring-retry包的时候会自动构建具备重试功能的拦截器(否则为不具备重试功能的拦截器),然后通过参数spring.cloud.loadbalancer.retry.enable可以控制是否开启重试功能【这个都是在comons中定义的,如果要集成类似Ribbon,只需要自定义一个LoadBalancedRetryFactory类型的Bean即可,然后就可以自定义自己使用的各种参数了】。
引入spring-retry从而配置具备重试功能的RestTemplate拦截器
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.2.5.RELEASE</version>
</dependency>
更多的默认值请参考:com.netflix.client.config.DefaultClientConfigImpl.java
4,使用RestTemplate实例发起服务的调用
restTemplate.getForObject("http://server-name/path/to/query",
String.class, new HashMap<String,String>()) ;
二:源码解析
接下来我们通过源码来进一步分析其实现原理,首先我们需要关注的是依赖的spring-cloud-commons-2.2.0.RELEASE.jar
1,spring-cloud-commons-2.2.0.RELEASE
Spring Cloud Commons是一组抽象和公共类,用于不同的Spring Cloud实现(比如Spring Cloud Netflix和Spring Cloud Consul)。
在该包的/META-INF/spring.factories中配置了一个关于负载均衡的启动类,如下所示
# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.CommonsClientAutoConfiguration,\
org.springframework.cloud.client.ReactiveCommonsClientAutoConfiguration,\
#配置ApacheHttpClient(默认)或者OkHttpClient
org.springframework.cloud.commons.httpclient.HttpClientConfiguration,\
org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerBeanPostProcessorAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancerAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.reactive.ReactorLoadBalancerClientAutoConfiguration
我们从非响应式的配置类LoadBalancerAutoConfiguration入手分析其启动过程(对于异步的AsyncLoadBalancerAutoConfiguration与此类似)
@Configuration(proxyBeanMethods = false)
//RestTemplate类位于类路径上时满足条件
@ConditionalOnClass(RestTemplate.class)
//LoadBalancerClient类型的Bean包含在BeanFactory中时满足条件
@ConditionalOnBean(LoadBalancerClient.class)
//读取spring.cloud.loadbalancer.retry.enable配置的值
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
//注入当前IOC环境中所有被@LoadBalanced注解标注的RestTemplate实例对象
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
//应用启动后,收集当前IOC中所有的RestTemplateCustomizer对象,并使用该对象对RestTemplate进行定制操作
//通过这种方式给RestTemplate设置拦截器等
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
/**
* 如果当前环境中没有LoadBalancerRequestFactory对象则创建
*
* 注意:这对象并不是RestTemplate里面使用的ClientHttpRequestFactory类型的对象,
* 而只是一个拥有LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
* final byte[] body,final ClientHttpRequestExecution execution)
* 方法的普通对象。 可看成一个工具类而已。
*/
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(
LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
////////////以下是配置不具备重试功能的拦截器//////////////
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
//配置拦截器
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
//通过该ClientHttpRequestInterceptor类型的拦截器来实现客户端负载均衡功能
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
//给RestTemplate设置拦截器
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
////////////以下是配置具备重试功能的拦截器//////////////
/**
* 如果在当前环境中能找到RetryTemplate类,则使用以下方式进行配置[目的是让其支持重试功能]
* 需要引入spring-retry包
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RetryTemplate.class)
public static class RetryAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public LoadBalancedRetryFactory loadBalancedRetryFactory() {
return new LoadBalancedRetryFactory() {
};
}
}
/**
* 如果在当前环境中能找到RetryTemplate类,则使用以下方式进行配置拦截器[目的是让其支持重试功能]
* 需要引入spring-retry包
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RetryTemplate.class)
public static class RetryInterceptorAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RetryLoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRetryProperties properties,
LoadBalancerRequestFactory requestFactory,
LoadBalancedRetryFactory loadBalancedRetryFactory) {
return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
requestFactory, loadBalancedRetryFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}
从上面的源码可知,只要当前环境中具备LoadBalancerClient类型的Bean,那么就会自动给RestTemplate添加上拦截器,从而达到负载均衡的目的。而具体怎么做负载均衡控制,都是在LoadBalancerClient的实现类中完成。为了进一步探究其执行过程,我们先来分析下LoadBalancerInterceptor对象,该对象的继承关系如下所示
LoadBalancerInterceptor的源码:
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
//该方法在RestTemplate中的InterceptingClientHttpRequest对象中被调用,可参考前面章节的介绍
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
//取出URI中的host作为服务名
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
//执行LoadBalancerClient实现类的execute方法并返回结果
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
}
可见,其LoadBalancerInterceptor 拦截器的作用就是为了调用LoadBalancerClient实例所提供的execute方法而已。
其中,LoadBalancerRequestFactory源码如下
public class LoadBalancerRequestFactory {
private LoadBalancerClient loadBalancer;
private List<LoadBalancerRequestTransformer> transformers;
public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
List<LoadBalancerRequestTransformer> transformers) {
this.loadBalancer = loadBalancer;
this.transformers = transformers;
}
public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
this.loadBalancer = loadBalancer;
}
//创建一个LoadBalancerRequest<T>类型的对象,返回后供LoadBalancerClient 使用
public LoadBalancerRequest<ClientHttpResponse> createRequest(
final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
//创建一个匿名的LoadBalancerRequest类型的实例对象
//这里的instance为ServiceInstance类型的对象(即被选择的服务对象)
return instance -> {
//通过包装器来对请求进行处理
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest,
instance);
}
}
//继续执行其他的拦截器
return execution.execute(serviceRequest, body);
};
}
}
重点:LoadBalancerClient 的execute方法需要接收一个LoadBalancerRequest类型的实例对象,至于该对象在何处创建并不重要。
ServiceRequestWrapper源码如下。这里主要是对getURI重写,通过LoadBalancer来解析得出具体的url地址
public class ServiceRequestWrapper extends HttpRequestWrapper {
private final ServiceInstance instance;
private final LoadBalancerClient loadBalancer;
public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
LoadBalancerClient loadBalancer) {
super(request);
this.instance = instance;
this.loadBalancer = loadBalancer;
}
@Override
public URI getURI() {
URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
return uri;
}
}
其中的LoadBalancerRequest源码如下
public interface LoadBalancerRequest<T> {
T apply(ServiceInstance instance) throws Exception;
}
到此为止就是spring-cloud-commons-2.2.0.RELEASE为我们定义的使用RestTemplate做客户端负载均衡的处理逻辑,我们只需要在IOC中具备LoadBalancerClient实现类即可。接着我们看下LoadBalancerClient的继承关系
public interface ServiceInstanceChooser {
/**
* Chooses a ServiceInstance from the LoadBalancer for the specified service.
* @param serviceId The service ID to look up the LoadBalancer.
* @return A ServiceInstance that matches the serviceId.
*/
ServiceInstance choose(String serviceId);
}
public interface LoadBalancerClient extends ServiceInstanceChooser {
/**
* 在RestTemplate的拦截器中调用的方法
* Executes request using a ServiceInstance from the LoadBalancer for the specified
* service.
* @param serviceId The service ID to look up the LoadBalancer.
* @param request Allows implementations to execute pre and post actions, such as
* incrementing metrics.
* @param <T> type of the response
* @throws IOException in case of IO issues.
* @return The result of the LoadBalancerRequest callback on the selected
* ServiceInstance.
*/
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
/**
* Executes request using a ServiceInstance from the LoadBalancer for the specified
* service.
* @param serviceId The service ID to look up the LoadBalancer.
* @param serviceInstance The service to execute the request to.
* @param request Allows implementations to execute pre and post actions, such as
* incrementing metrics.
* @param <T> type of the response
* @throws IOException in case of IO issues.
* @return The result of the LoadBalancerRequest callback on the selected
* ServiceInstance.
*/
<T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException;
/**
* Creates a proper URI with a real host and port for systems to utilize. Some systems
* use a URI with the logical service name as the host, such as
* http://myservice/path/to/service. This will replace the service name with the
* host:port from the ServiceInstance.
* @param instance service instance to reconstruct the URI
* @param original A URI with the host as a logical service name.
* @return A reconstructed URI.
*/
URI reconstructURI(ServiceInstance instance, URI original);
}
小结
在这里总结下上面的整个流程
收集所有被@LoadBalanced注解标注的RestTemplate对象
使用LoadBalancerClient类型的对象作为构造参数来创建LoadBalancerRequestFactory实例(是个普通的类,其createRequest方法返回LoadBalancerRequest类型实例对象)
public LoadBalancerRequest<ClientHttpResponse> createRequest(
final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest,
instance);
}
}
return execution.execute(serviceRequest, body);
};
}
情景一:没有引入RetryTemplate的情况
使用LoadBalancerClient与LoadBalancerRequestFactory一起作为构造参数创建LoadBalancerInterceptor实例
构建RestTemplateCustomizer对象,将LoadBalancerInterceptor添加到RestTemplate的Interceptors中
情景二:引入RetryTemplate的情况
如果当前环境中没有LoadBalancedRetryFactory类型的Bean,则实例化LoadBalancedRetryFactory对象【在Ribbon中会创建LoadBalancedRetryFactory实例对象】
创建RetryLoadBalancerInterceptor实例对象【LoadBalancerClient, LoadBalancerRetryProperties, LoadBalancerRequestFactory,LoadBalancedRetryFactory作为构造参数】
构建RestTemplateCustomizer对象,将RetryLoadBalancerInterceptor添加到RestTemplate的Interceptors中
通过上面的步骤完成了对RestTemplate拦截器的创建与添加工作,最后当执行RestTemplate的方法进行HTTP请求时,其添加拦截器的intercept方法将会被调用【ClientHttpResponse intercept(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution) 】。
如果是LoadBalancerInterceptor拦截器
1)取出URI中的Host作为服务名(serviceName)
2)调用requestFactory的createRequest方法得到LoadBalancerRequest类型对象
3)使用serviceName和LoadBalancerRequest做为参数调用LoadBalancerClient的execute方法并将结果返回
如果是RetryLoadBalancerInterceptor拦截器
1)取出URI中的Host作为服务名(serviceName)
2)创建重试策略-LoadBalancedRetryPolicy
3)创建RetryTemplate对象
4)然后执行RetryTemplate的execute方法并将结果返回
这里主要介绍了spring-cloud-commons包中给我们定义的使用RestTemplate进行负载均衡访问的方法,我们只需要自定义一个LoadBalancerClient的实现类,然后在实现类中完成负载均衡算法即可。
在后面的章节将继续解释Ribbon与RestTemplate的整合过程。
自定义LoadBalancerClient 以及使用举例:
public class CustomLoadBalancerClient implements LoadBalancerClient {
public CustomLoadBalancerClient () {
System.out.print("我被实例化了:" + CustomLoadBalancerClient.class.getName());
}
public static class CustomServiceInstance implements ServiceInstance {
private final String serviceId;
private String host;
private int port = 80;
private String scheme;
private String instanceId;
private Map<String, String> metadata;
public CustomServiceInstance(String serviceId, String instanceId, String host, int port, String scheme,
Map<String, String> metadata) {
this.serviceId = serviceId;
this.instanceId = instanceId;
this.host = host;
this.port = port;
this.scheme = scheme;
this.metadata = metadata;
}
@Override
public String getInstanceId() {
return this.instanceId;
}
@Override
public String getServiceId() {
return this.serviceId;
}
@Override
public String getHost() {
return this.host;
}
@Override
public int getPort() {
return this.port;
}
@Override
public URI getUri() {
return DefaultServiceInstance.getUri(this);
}
@Override
public Map<String, String> getMetadata() {
return this.metadata;
}
@Override
public String getScheme() {
return this.scheme;
}
@Override
public boolean isSecure() {
return false;
}
}
/**
* 重写方法,进行服务的筛选
* */
@Override
public ServiceInstance choose(String serviceId) {
System.out.println("需要进行选择的服务ID为:" + serviceId);
String instanceId = "instance-1001";
String host = "192.168.30.161";
int port = 8000;
String scheme = "http";
Map<String, String> metadata = null ;
return new CustomServiceInstance(serviceId, instanceId, host, port, scheme, metadata);
}
/**
* 被LoadBalancerInterceptor拦截器所调用的方法
* */
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
System.out.println("我被执行了哦:" + this.getClass().getName() + " -- > execute");
ServiceInstance serviceInstance = this.choose(serviceId) ;
return execute(serviceId, serviceInstance, request);
}
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
throws IOException {
try {
T returnVal = request.apply(serviceInstance);
return returnVal;
} catch (IOException ex) {
throw ex;
} catch (Exception ex) {
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
/**
* @param instance 经过筛选后要访问的服务实例信息
* @param original 原始访问路径
*/
@Override
public URI reconstructURI(ServiceInstance instance, URI original) {
Assert.notNull(instance, "instance can not be null");
String scheme = instance.getScheme();
String host = instance.getHost();
int port = instance.getPort();
return reconstructURIWithServer(scheme, host, port, original);
}
public URI reconstructURIWithServer(String scheme, String host, int port, URI original) {
if (host.equals(original.getHost()) && port == original.getPort() && scheme == original.getScheme()) {
return original;
}
if (scheme == null) {
scheme = original.getScheme();
}
if (scheme == null) {
scheme = deriveSchemeAndPortFromPartialUri(original).first();
}
try {
StringBuilder sb = new StringBuilder();
sb.append(scheme).append("://");
if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
sb.append(original.getRawUserInfo()).append("@");
}
sb.append(host);
if (port >= 0) {
sb.append(":").append(port);
}
sb.append(original.getRawPath());
if (!Strings.isNullOrEmpty(original.getRawQuery())) {
sb.append("?").append(original.getRawQuery());
}
if (!Strings.isNullOrEmpty(original.getRawFragment())) {
sb.append("#").append(original.getRawFragment());
}
URI newURI = new URI(sb.toString());
return newURI;
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
protected Pair<String, Integer> deriveSchemeAndPortFromPartialUri(URI uri) {
boolean isSecure = false;
String scheme = uri.getScheme();
if (scheme != null) {
isSecure = scheme.equalsIgnoreCase("https");
}
int port = uri.getPort();
if (port < 0 && !isSecure) {
port = 80;
} else if (port < 0 && isSecure) {
port = 443;
}
if (scheme == null) {
if (isSecure) {
scheme = "https";
} else {
scheme = "http";
}
}
return new Pair<String, Integer>(scheme, port);
}
public class Pair<E1, E2> implements Serializable {
private static final long serialVersionUID = 2L;
private E1 mFirst;
private E2 mSecond;
public Pair(E1 first, E2 second) {
mFirst = first;
mSecond = second;
}
public E1 first() {
return mFirst;
}
public E2 second() {
return mSecond;
}
public void setFirst(E1 first) {
mFirst = first;
}
public void setSecond(E2 second) {
mSecond = second;
}
}
}
然后添加配置
@Configuration
@AutoConfigureBefore({RibbonAutoConfiguration.class })
public class CustomLoadBalancerClientConfig {
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
return restTemplate ;
}
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new CustomLoadBalancerClient();
}
}
这时使用RestTemplate进行访问就可以看到打印的日志了。