Dependency versions
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-spring-jaeger-cloud-starter</artifactId>
<version>3.3.1</version>
</dependency>
<!-- feign -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>3.0.6</version>
</dependency>
<dependency>
<groupId>io.github.openfeign.opentracing</groupId>
<artifactId>feign-opentracing</artifactId>
<version>0.4.0</version>
</dependency>
Using Feign as the HTTP client
The core of Feign's retry mechanism lives in SynchronousMethodHandler.java:
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Options options = findOptions(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template, options);
} catch (RetryableException e) {
try {
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
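When a call throws a RetryableException, invoke() calls the retryer's continueOrPropagate to decide whether to retry: if that method throws, the call is not retried; if it returns normally, the loop runs again.
Retryer ships with two implementations. NEVER_RETRY's continueOrPropagate rethrows the exception immediately, which means it never retries. Source: Retryer.java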
public interface Retryer extends Cloneable {
/**
* if retry is permitted, return (possibly after sleeping). Otherwise propagate the exception.
*/
void continueOrPropagate(RetryableException e);
Retryer clone();
class Default implements Retryer {...}
Retryer NEVER_RETRY = new Retryer() {
@Override
public void continueOrPropagate(RetryableException e) {
throw e;
}
@Override
public Retryer clone() {
return this;
}
};
With Spring Cloud OpenFeign, Feign does not retry by default. Source from FeignClientsConfiguration.java:
@Bean
@ConditionalOnMissingBean
public Retryer feignRetryer() {
return Retryer.NEVER_RETRY;
}
Retryer.Default, by contrast, retries until the maximum number of attempts is reached and then rethrows the exception. Source:
public void continueOrPropagate(RetryableException e) {
if (attempt++ >= maxAttempts) {
throw e;
}
long interval;
if (e.retryAfter() != null) {
interval = e.retryAfter().getTime() - currentTimeMillis();
if (interval > maxPeriod) {
interval = maxPeriod;
}
if (interval < 0) {
return;
}
} else {
interval = nextMaxInterval();
}
try {
Thread.sleep(interval);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
throw e;
}
sleptForMillis += interval;
}
// The wait grows with each retry: the interval is multiplied by 1.5 per attempt and capped at maxPeriod
long nextMaxInterval() {
long interval = (long) (period * Math.pow(1.5, attempt - 1));
return interval > maxPeriod ? maxPeriod : interval;
}
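To make the backoff concrete, here is a small plain-Java sketch that replays the arithmetic of continueOrPropagate and nextMaxInterval for the case where every call fails and no Retry-After value is present. The class name BackoffSchedule is hypothetical, and the sketch assumes the attempt counter starts at 1, as it does in the Feign versions I am aware of.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {
    /** Same formula as nextMaxInterval() in the excerpt above. */
    static long nextMaxInterval(long period, long maxPeriod, int attempt) {
        long interval = (long) (period * Math.pow(1.5, attempt - 1));
        return Math.min(interval, maxPeriod);
    }

    /** Sleep intervals (ms) between attempts when every call fails. */
    static List<Long> sleeps(long period, long maxPeriod, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        int attempt = 1; // assumption: the counter starts at 1
        while (true) {
            if (attempt++ >= maxAttempts) {
                return result; // the real retryer rethrows the exception here
            }
            result.add(nextMaxInterval(period, maxPeriod, attempt));
        }
    }

    public static void main(String[] args) {
        System.out.println(sleeps(100, 1000, 3)); // [150, 225]
        System.out.println(sleeps(100, 1000, 5)); // [150, 225, 337, 506]
    }
}
```

Under these assumptions, maxAttempts = 3 means three calls in total (two retries), and the first wait is already 1.5 times the configured period because the counter is incremented before the interval is computed.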
So when is a RetryableException actually thrown? Let's look at the source of executeAndDecode:
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 12
response = response.toBuilder()
.request(request)
.requestTemplate(template)
.build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (decoder != null)
return decoder.decode(response, metadata.returnType());
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
metadata.returnType(),
elapsedTime);
try {
if (!resultFuture.isDone())
throw new IllegalStateException("Response handling not done");
return resultFuture.join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause != null)
throw cause;
throw e;
}
}
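Note the targetRequest method here. It runs Feign's interceptor chain, and because executeAndDecode is called once per attempt, every retry runs the whole interceptor chain again on the same RequestTemplate. If an interceptor adds a header, the header it adds on a retry has the same name as the one added on the previous attempt and overwrites it. Source: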
Request targetRequest(RequestTemplate template) {
for (RequestInterceptor interceptor : requestInterceptors) {
interceptor.apply(template);
}
return target.apply(template);
}
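The effect can be sketched without Feign on the classpath. The following plain-Java simulation uses a hypothetical Template class as a stand-in for RequestTemplate and a Consumer as a stand-in for RequestInterceptor; it only counts how often an interceptor touches the template. Whether a repeated header call replaces or appends to the existing value depends on the RequestTemplate implementation, so the sketch deliberately does not model that part.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class InterceptorReplayDemo {
    /** Hypothetical stand-in for RequestTemplate: records every header call. */
    static class Template {
        final List<String> headerCalls = new ArrayList<>();

        void header(String name, String value) {
            headerCalls.add(name + "=" + value);
        }
    }

    /** Replays the invoke() loop: the template is built once, interceptors run per attempt. */
    static Template run(int failuresBeforeSuccess, List<Consumer<Template>> interceptors) {
        Template template = new Template(); // created outside the retry loop
        int attempt = 0;
        while (true) {
            // targetRequest(): the full interceptor chain is applied on every attempt
            for (Consumer<Template> interceptor : interceptors) {
                interceptor.accept(template);
            }
            if (attempt++ >= failuresBeforeSuccess) {
                return template; // simulated successful response
            }
            // otherwise: simulated RetryableException, go around again
        }
    }

    public static void main(String[] args) {
        Consumer<Template> addToken = t -> t.header("X-Token", "abc");
        Template result = run(2, Collections.singletonList(addToken));
        System.out.println(result.headerCalls.size()); // 3: one call per attempt
    }
}
```

An interceptor that is safe to run several times against the same template (idempotent) avoids surprises here.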
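Request tracing happens at client.execute(request, options). The client here is TracingClient.java from the opentracing package. By the same logic, every retry creates a new span: when a request is retried several times, each attempt has the same trace ID but a different span ID. Source: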
public Response execute(Request request, Request.Options options) throws IOException {
Span span = tracer.buildSpan(request.method())
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.start();
for (FeignSpanDecorator spanDecorator: spanDecorators) {
try {
spanDecorator.onRequest(request, options, span);
} catch (Exception ex) {
log.log(Level.SEVERE, "Exception during decorating span", ex);
}
}
request = inject(span.context(), request);
try (Scope scope = tracer.activateSpan(span)) {
Response response = delegate.execute(request, options);
for (FeignSpanDecorator spanDecorator : spanDecorators) {
try {
spanDecorator.onResponse(response, options, span);
} catch (Exception ex) {
log.log(Level.SEVERE, "Exception during decorating span", ex);
}
}
return response;
} catch (Exception ex) {
for (FeignSpanDecorator spanDecorator: spanDecorators) {
try {
spanDecorator.onError(ex, request, span);
} catch (Exception exDecorator) {
log.log(Level.SEVERE, "Exception during decorating span", exDecorator);
}
}
throw ex;
} finally {
span.finish();
}
}
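Back in executeAndDecode, an IOException from client.execute is wrapped by errorExecuting, which produces a RetryableException. For responses that do arrive, decoder == null by default, so the method to focus on is asyncResponseHandler.handleResponse. It shows that errorDecoder.decode(configKey, response) is what turns a response into an Exception (decode404 defaults to false). Source: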
void handleResponse(CompletableFuture<Object> resultFuture,
String configKey,
Response response,
Type returnType,
long elapsedTime) {
// copied fairly liberally from SynchronousMethodHandler
boolean shouldClose = true;
try {
if (logLevel != Level.NONE) {
response = logger.logAndRebufferResponse(configKey, logLevel, response,
elapsedTime);
}
if (Response.class == returnType) {
if (response.body() == null) {
resultFuture.complete(response);
} else if (response.body().length() == null
|| response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
shouldClose = false;
resultFuture.complete(response);
} else {
// Ensure the response body is disconnected
final byte[] bodyData = Util.toByteArray(response.body().asInputStream());
resultFuture.complete(response.toBuilder().body(bodyData).build());
}
} else if (response.status() >= 200 && response.status() < 300) {
if (isVoidType(returnType)) {
resultFuture.complete(null);
} else {
final Object result = decode(response, returnType);
shouldClose = closeAfterDecode;
resultFuture.complete(result);
}
} else if (decode404 && response.status() == 404 && !isVoidType(returnType)) {
final Object result = decode(response, returnType);
shouldClose = closeAfterDecode;
resultFuture.complete(result);
} else {
resultFuture.completeExceptionally(errorDecoder.decode(configKey, response));
}
} catch (final IOException e) {
if (logLevel != Level.NONE) {
logger.logIOException(configKey, logLevel, e, elapsedTime);
}
resultFuture.completeExceptionally(errorReading(response.request(), response, e));
} catch (final Exception e) {
resultFuture.completeExceptionally(e);
} finally {
if (shouldClose) {
ensureClosed(response.body());
}
}
}
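The default ErrorDecoder looks at the Retry-After header of the response: it returns a RetryableException only when that header yields a value, and that value determines the retry interval. Source: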
public interface ErrorDecoder {
public Exception decode(String methodKey, Response response);
public static class Default implements ErrorDecoder {
private final RetryAfterDecoder retryAfterDecoder = new RetryAfterDecoder();
@Override
public Exception decode(String methodKey, Response response) {
FeignException exception = errorStatus(methodKey, response);
Date retryAfter = retryAfterDecoder.apply(firstOrNull(response.headers(), RETRY_AFTER));
if (retryAfter != null) {
return new RetryableException(
response.status(),
exception.getMessage(),
response.request().httpMethod(),
exception,
retryAfter,
response.request());
}
return exception;
}
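For reference, a Retry-After value is either a number of seconds or an HTTP date. The sketch below shows one way to turn both forms into a point in time using only the JDK. It is an illustration of the header format, not Feign's RetryAfterDecoder; the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Optional;

public class RetryAfterParser {
    /** Parses a Retry-After value given as delay seconds or as an HTTP date. */
    static Optional<Instant> parse(String headerValue, Instant now) {
        if (headerValue == null || headerValue.trim().isEmpty()) {
            return Optional.empty(); // no header: nothing to schedule
        }
        String value = headerValue.trim();
        if (value.matches("\\d{1,9}")) {
            // delay form, e.g. "120" means 120 seconds from now
            return Optional.of(now.plusSeconds(Long.parseLong(value)));
        }
        try {
            // date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
            ZonedDateTime date = ZonedDateTime.parse(value, DateTimeFormatter.RFC_1123_DATE_TIME);
            return Optional.of(date.toInstant());
        } catch (DateTimeParseException e) {
            return Optional.empty(); // unparseable value: treat as absent
        }
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(parse("120", now));
        System.out.println(parse("Wed, 21 Oct 2015 07:28:00 GMT", now));
    }
}
```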
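So configuring retries in Feign comes down to two things:
1. Create and register a Retryer to turn retrying on.
2. Register a custom ErrorDecoder that decides from the response when a retry is needed (returning a RetryableException triggers the retry). The example below retries whenever the status is not 2xx (methods whose return type is Response are excluded).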
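import static feign.FeignException.errorStatus;
import static java.util.concurrent.TimeUnit.SECONDS;

import feign.FeignException;
import feign.Response;
import feign.RetryableException;
import feign.Retryer;
import feign.codec.ErrorDecoder;
import org.springframework.context.annotation.Bean;

public class FeignRetryConfig extends FeignConfiguration {
    @Bean
    public Retryer getRetryer() {
        // period 100 ms, maxPeriod 1 s, maxAttempts 3
        return new Retryer.Default(100, SECONDS.toMillis(1), 3);
    }

    @Bean
    public ErrorDecoder getErrorDecoder() {
        return new CustomErrorDecoder();
    }

    public static class CustomErrorDecoder implements ErrorDecoder {
        @Override
        public Exception decode(String methodKey, Response response) {
            FeignException exception = errorStatus(methodKey, response);
            // Always wrap in RetryableException so that every error response is retried
            return new RetryableException(
                    response.status(),
                    exception.getMessage(),
                    response.request().httpMethod(),
                    exception,
                    null,
                    response.request());
        }
    }
}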
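Retrying on every non-2xx status also retries responses such as 400 or 404, which a second attempt is unlikely to fix. One possible refinement is a narrower rule. The sketch below is plain Java; the class RetryRule and its list of statuses are hypothetical choices for illustration, not something Feign provides.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RetryRule {
    // Hypothetical selection of statuses that usually signal a transient failure
    private static final Set<Integer> TRANSIENT_STATUSES =
            new HashSet<>(Arrays.asList(429, 502, 503, 504));

    /** Returns true when a response with this status is worth another attempt. */
    static boolean isRetryable(int status) {
        return TRANSIENT_STATUSES.contains(status);
    }

    public static void main(String[] args) {
        System.out.println(isRetryable(503)); // true
        System.out.println(isRetryable(404)); // false
    }
}
```

Inside a custom ErrorDecoder, one could return the RetryableException only when isRetryable(response.status()) is true and return the plain FeignException otherwise. Requests that are not idempotent, such as most POST calls, may need extra care because a retry can repeat their side effects.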
Base Feign configuration. Interceptors can be added here to pre-process requests, for example to add an auth header.
public class FeignConfiguration {
@Bean
public CustomInterceptor feignInterceptor() {
return new CustomInterceptor();
}
public class CustomInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate template) {
// template.header(AUTHENTICATION_HEADER_NAME, getToken());
}
}
}
Each FeignClient can point at a different configuration class to decide whether it retries; clients do not affect one another.
@FeignClient(name = "user-client", url = "${feign.user-service.url}", configuration = FeignRetryConfig.class)
public interface UserClient {
@PostMapping(value = "/users", consumes = MediaType.APPLICATION_JSON_VALUE)
List<User> getUsers(UserRequest request);
}
@FeignClient(name = "company-client", url = "${feign.company-service.url}", configuration = FeignConfiguration.class)
public interface CompanyClient {
@PostMapping(value = "/companies", consumes = MediaType.APPLICATION_JSON_VALUE)
List<Company> getCompanies(CompanyRequest request);
}
Retries can also be configured through the configuration file:
feign:
  client:
    config:
      user-client:
        retryer: feign.Retryer.Default
        errorDecoder: xxx.config.FeignRetryConfig.CustomErrorDecoder
Source of the properties class, FeignClientProperties.FeignClientConfiguration:
public static class FeignClientConfiguration {
private Logger.Level loggerLevel;
private Integer connectTimeout;
private Integer readTimeout;
private Class<Retryer> retryer;
private Class<ErrorDecoder> errorDecoder;
private List<Class<RequestInterceptor>> requestInterceptors;
private Map<String, Collection<String>> defaultRequestHeaders;
private Map<String, Collection<String>> defaultQueryParameters;
private Boolean decode404;
private Class<Decoder> decoder;
private Class<Encoder> encoder;
private Class<Contract> contract;
private ExceptionPropagationPolicy exceptionPropagationPolicy;
private List<Class<Capability>> capabilities;
private MetricsProperties metrics;
private Boolean followRedirects;
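Using RestTemplate as the HTTP client
RestTemplate has no built-in retry mechanism. There are two ways to add one: pull in the spring-retry module, or use an interceptor.
Using spring-retry
Custom retry exception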
public class RetryableException extends RuntimeException {
public RetryableException(String message) {
super(message);
}
}
Retry settings
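import java.util.Collections;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RetryConfig {
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        // Retry policy: retry only on RetryableException, with at most 3 attempts
        // in total (the first call counts as one attempt)
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3,
                Collections.singletonMap(RetryableException.class, true));
        retryTemplate.setRetryPolicy(retryPolicy);
        // Fixed backoff: wait 2 seconds between attempts
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(2000); // 2 seconds
        retryTemplate.setBackOffPolicy(backOffPolicy);
        return retryTemplate;
    }
}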
Making the request
public String sendRequestWithRetry(String url) {
    return retryTemplate.execute(context -> {
        // Send the request with RestTemplate
        String response = restTemplate.getForObject(url, String.class);
        // Decide from the response content whether a retry is needed
        if (isResponseInvalid(response)) {
            // The response is not what we expect: throw to trigger a retry
            throw new RetryableException("Invalid response, need to retry");
        }
        // The response is fine: return it
        return response;
    }, context -> {
        // Recovery callback, invoked once the retries are exhausted; handle the failure here
        return "Default response after retries exhausted";
    });
}

private boolean isResponseInvalid(String response) {
    // Decide validity according to the actual business logic,
    // for example whether the body is empty or contains a specific marker
    return response == null || response.contains("error");
}
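The attempt counting is easy to get wrong, so here is a plain-Java sketch of the semantics described above: a maximum of 3 attempts means the action runs at most three times, after which the recovery function supplies the result. This is not spring-retry code. SimpleRetry, its nested RetryableException and the execute signature are hypothetical, and unlike a real RetryTemplate this sketch simply lets any other exception propagate.

```java
import java.util.function.Supplier;

public class SimpleRetry {
    /** Hypothetical marker exception that signals "please try again". */
    static class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    /** Runs the action up to maxAttempts times, then falls back to recovery. */
    static <T> T execute(int maxAttempts, long backOffMillis,
                         Supplier<T> action, Supplier<T> recovery) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RetryableException e) {
                if (attempt == maxAttempts) {
                    break; // attempts exhausted: do not sleep again
                }
                try {
                    Thread.sleep(backOffMillis); // fixed backoff between attempts
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying when interrupted
                }
            }
        }
        return recovery.get();
    }

    public static void main(String[] args) {
        String result = SimpleRetry.<String>execute(3, 10,
                () -> { throw new RetryableException("always fails"); },
                () -> "fallback");
        System.out.println(result); // fallback
    }
}
```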
Retrying with an interceptor
Interceptor code
public class RetryInterceptor implements ClientHttpRequestInterceptor {
    private final int maxAttempts;
    private final long retryInterval;

    public RetryInterceptor(int maxAttempts, long retryInterval) {
        this.maxAttempts = maxAttempts;
        this.retryInterval = retryInterval;
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        int attempt = 0;
        IOException lastError = null;
        while (attempt < maxAttempts) {
            try {
                // Try to execute the request
                ClientHttpResponse response = execution.execute(request, body);
                // Check whether the status code or content is acceptable
                if (isResponseValid(response)) {
                    // Valid response: return it directly
                    return response;
                }
                // Release the invalid response before trying again
                response.close();
                System.out.println("Invalid response, retrying...");
            } catch (IOException e) {
                // execution.execute reports I/O failures as IOException;
                // RestClientException is only thrown further up, by RestTemplate itself
                lastError = e;
                System.out.println("Request failed: " + e.getMessage());
            }
            attempt++;
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(retryInterval); // wait for the retry interval
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Thread was interrupted", e);
                }
            }
        }
        throw new RestClientException("Request failed after maximum retry attempts", lastError);
    }

    private boolean isResponseValid(ClientHttpResponse response) throws IOException {
        // Decide validity according to actual needs,
        // for example check for a 2xx status code or inspect the body
        return response.getStatusCode().is2xxSuccessful();
    }
}
RestTemplate setup
@Configuration
public class AppConfig {
    @Bean
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        // Create the interceptor: 3 attempts, 2 seconds apart
        RetryInterceptor retryInterceptor = new RetryInterceptor(3, 2000);
        // Add the interceptor to the RestTemplate
        List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>(restTemplate.getInterceptors());
        interceptors.add(retryInterceptor);
        restTemplate.setInterceptors(interceptors);
        return restTemplate;
    }
}
Pay attention to the order in which the interceptors and the actual request are invoked. InterceptingClientHttpRequest.java:
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
...
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
this.iterator = InterceptingClientHttpRequest.this.interceptors.iterator();
}
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
return nextInterceptor.intercept(request, body, this);
} else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> {
delegate.getHeaders().addAll(key, value);
});
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;
streamingOutputMessage.setBody((outputStream) -> {
StreamUtils.copy(body, outputStream);
});
} else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
...
}
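Retrying through an interceptor and retrying through spring-retry differ slightly in when the retry takes place. With an interceptor, the timing depends on the interceptor order: InterceptingRequestExecution keeps a single iterator, so when the retry interceptor calls execution.execute again, the interceptors that come after it have already been consumed and the request is sent directly, while the interceptors before it only ever see the final outcome.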
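The ordering effect follows from the shared iterator in the excerpt above and can be reproduced in a few lines of plain Java. In this sketch, Execution, Interceptor and Chain are hypothetical stand-ins for ClientHttpRequestExecution, ClientHttpRequestInterceptor and InterceptingRequestExecution, and the "server" is assumed to fail twice before returning 200.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class InterceptorOrderDemo {
    interface Execution { String execute(); }
    interface Interceptor { String intercept(Execution execution); }

    /** Mimics InterceptingRequestExecution: one iterator shared by all execute() calls. */
    static class Chain implements Execution {
        private final Iterator<Interceptor> iterator;
        private final List<String> log;
        private int sends = 0;

        Chain(List<Interceptor> interceptors, List<String> log) {
            this.iterator = interceptors.iterator();
            this.log = log;
        }

        @Override
        public String execute() {
            if (iterator.hasNext()) {
                return iterator.next().intercept(this);
            }
            sends++;
            log.add("send#" + sends);
            return sends < 3 ? "503" : "200"; // assumed server: fails twice, then succeeds
        }
    }

    /** Runs one request with a retry interceptor and an observer in the given order. */
    static List<String> run(boolean retryFirst) {
        List<String> log = new ArrayList<>();
        Interceptor retry = exec -> {
            String status = exec.execute();
            for (int i = 1; i < 3 && !status.equals("200"); i++) {
                status = exec.execute(); // iterator is exhausted: goes straight to send
            }
            return status;
        };
        Interceptor observer = exec -> {
            log.add("before");
            String status = exec.execute();
            log.add("after:" + status);
            return status;
        };
        List<Interceptor> order = retryFirst
                ? Arrays.asList(retry, observer)
                : Arrays.asList(observer, retry);
        new Chain(order, log).execute();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [before, send#1, after:503, send#2, send#3]
        System.out.println(run(false)); // [before, send#1, send#2, send#3, after:200]
    }
}
```

When the retry interceptor comes first, the observer only sees the first attempt; when it comes last, the observer wraps the whole retry sequence and sees the final response.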