spring下使用Feign和RestTemplate的请求重试机制异同分析

依赖版本

<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-spring-jaeger-cloud-starter</artifactId>
    <version>3.3.1</version>
</dependency>

<!-- feign -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
    <version>3.0.6</version>
</dependency>
<dependency>
    <groupId>io.github.openfeign.opentracing</groupId>
    <artifactId>feign-opentracing</artifactId>
    <version>0.4.0</version>
</dependency>

使用Feign做请求客户端

Feign重试机制核心逻辑,源码:SynchronousMethodHandler.java

  public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Options options = findOptions(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        return executeAndDecode(template, options);
      } catch (RetryableException e) {
        try {
          retryer.continueOrPropagate(e);
        } catch (RetryableException th) {
          Throwable cause = th.getCause();
          if (propagationPolicy == UNWRAP && cause != null) {
            throw cause;
          } else {
            throw th;
          }
        }
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }

调用请求捕获到RetryableException时,会调用retryer的continueOrPropagate判断是否需要重试,该方法抛出异常则不重试,否则重试

Retryer自带两种实现,其中NEVER_RETRY的continueOrPropagate直接抛出异常,表示从不重试。源码:Retryer.java

public interface Retryer extends Cloneable {

  /**
   * if retry is permitted, return (possibly after sleeping). Otherwise propagate the exception.
   */
  void continueOrPropagate(RetryableException e);

  Retryer clone();

  class Default implements Retryer {...}

  Retryer NEVER_RETRY = new Retryer() {

    @Override
    public void continueOrPropagate(RetryableException e) {
      throw e;
    }

    @Override
    public Retryer clone() {
      return this;
    }
  };

Feign默认是不重试的,FeignClientsConfiguration.java源码:

    @Bean
    @ConditionalOnMissingBean
    public Retryer feignRetryer() {
        return Retryer.NEVER_RETRY;
    }

而Retryer.Default则是重试一定次数后再抛出异常,不再重试。源码:

public void continueOrPropagate(RetryableException e) {
      if (attempt++ >= maxAttempts) {
        throw e;
      }

      long interval;
      if (e.retryAfter() != null) {
        interval = e.retryAfter().getTime() - currentTimeMillis();
        if (interval > maxPeriod) {
          interval = maxPeriod;
        }
        if (interval < 0) {
          return;
        }
      } else {
        interval = nextMaxInterval();
      }
      try {
        Thread.sleep(interval);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
        throw e;
      }
      sleptForMillis += interval;
    }

    // 重试时等待时间逐渐变长,nextInterval *= 1.5
    long nextMaxInterval() {
      long interval = (long) (period * Math.pow(1.5, attempt - 1));
      return interval > maxPeriod ? maxPeriod : interval;
    }

那么何时才会抛出RetryableException呢,我们来看executeAndDecode方法源码:

Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
      logger.logRequest(metadata.configKey(), logLevel, request);
    }

    Response response;
    long start = System.nanoTime();
    try {
      response = client.execute(request, options);
      // ensure the request is set. TODO: remove in Feign 12
      response = response.toBuilder()
          .request(request)
          .requestTemplate(template)
          .build();
    } catch (IOException e) {
      if (logLevel != Logger.Level.NONE) {
        logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
      }
      throw errorExecuting(request, e);
    }
    long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);


    if (decoder != null)
      return decoder.decode(response, metadata.returnType());

    CompletableFuture<Object> resultFuture = new CompletableFuture<>();
    asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
        metadata.returnType(),
        elapsedTime);

    try {
      if (!resultFuture.isDone())
        throw new IllegalStateException("Response handling not done");

      return resultFuture.join();
    } catch (CompletionException e) {
      Throwable cause = e.getCause();
      if (cause != null)
        throw cause;
      throw e;
    }
  }

注意这里的targetRequest方法,这个方法里调用了feign的拦截器链,这意味着每次重试都会把拦截器链全部再调一遍,如果在某个拦截器中添加了header,重试时添加的header因为同名会把前一次添加的覆盖。源码:

  Request targetRequest(RequestTemplate template) {
    for (RequestInterceptor interceptor : requestInterceptors) {
      interceptor.apply(template);
    }
    return target.apply(template);
  }

请求链路追踪在client.execute(request, options)这里,这里的client是opentracing下的TracingClient.java,同理,每次请求重试都会重新生成span,一个请求重试多次,每次traceid相同,spanid不同。源码:

public Response execute(Request request, Request.Options options) throws IOException {
        Span span = tracer.buildSpan(request.method())
            .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
            .start();

        for (FeignSpanDecorator spanDecorator: spanDecorators) {
            try {
                spanDecorator.onRequest(request, options, span);
            } catch (Exception ex) {
                log.log(Level.SEVERE, "Exception during decorating span", ex);
            }
        }

        request = inject(span.context(), request);

        try (Scope scope = tracer.activateSpan(span)) {
            Response response = delegate.execute(request, options);
            for (FeignSpanDecorator spanDecorator : spanDecorators) {
                try {
                    spanDecorator.onResponse(response, options, span);
                } catch (Exception ex) {
                    log.log(Level.SEVERE, "Exception during decorating span", ex);
                }
            }
            return response;
        } catch (Exception ex) {
            for (FeignSpanDecorator spanDecorator: spanDecorators) {
                try {
                    spanDecorator.onError(ex, request, span);
                } catch (Exception exDecorator) {
                    log.log(Level.SEVERE, "Exception during decorating span", exDecorator);
                }
            }

            throw ex;
        } finally {
            span.finish();
        }
    }

默认decoder == null,因此我们主要看asyncResponseHandler.handleResponse方法,从这个方法我们可以看出,这里主要靠errorDecoder.decode(configKey, response)这个方法将响应解析为Exception(decode404默认为false)。源码:

void handleResponse(CompletableFuture<Object> resultFuture,
                      String configKey,
                      Response response,
                      Type returnType,
                      long elapsedTime) {
    // copied fairly liberally from SynchronousMethodHandler
    boolean shouldClose = true;

    try {
      if (logLevel != Level.NONE) {
        response = logger.logAndRebufferResponse(configKey, logLevel, response,
            elapsedTime);
      }
      if (Response.class == returnType) {
        if (response.body() == null) {
          resultFuture.complete(response);
        } else if (response.body().length() == null
            || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
          shouldClose = false;
          resultFuture.complete(response);
        } else {
          // Ensure the response body is disconnected
          final byte[] bodyData = Util.toByteArray(response.body().asInputStream());
          resultFuture.complete(response.toBuilder().body(bodyData).build());
        }
      } else if (response.status() >= 200 && response.status() < 300) {
        if (isVoidType(returnType)) {
          resultFuture.complete(null);
        } else {
          final Object result = decode(response, returnType);
          shouldClose = closeAfterDecode;
          resultFuture.complete(result);
        }
      } else if (decode404 && response.status() == 404 && !isVoidType(returnType)) {
        final Object result = decode(response, returnType);
        shouldClose = closeAfterDecode;
        resultFuture.complete(result);
      } else {
        resultFuture.completeExceptionally(errorDecoder.decode(configKey, response));
      }
    } catch (final IOException e) {
      if (logLevel != Level.NONE) {
        logger.logIOException(configKey, logLevel, e, elapsedTime);
      }
      resultFuture.completeExceptionally(errorReading(response.request(), response, e));
    } catch (final Exception e) {
      resultFuture.completeExceptionally(e);
    } finally {
      if (shouldClose) {
        ensureClosed(response.body());
      }
    }
  }

ErrorDecoder的默认实现是根据返回的response header中的Retry-After值判断要不要重试及重试间隔。源码:

public interface ErrorDecoder {

  public Exception decode(String methodKey, Response response);

  public static class Default implements ErrorDecoder {

    private final RetryAfterDecoder retryAfterDecoder = new RetryAfterDecoder();

    @Override
    public Exception decode(String methodKey, Response response) {
      FeignException exception = errorStatus(methodKey, response);
      Date retryAfter = retryAfterDecoder.apply(firstOrNull(response.headers(), RETRY_AFTER));
      if (retryAfter != null) {
        return new RetryableException(
            response.status(),
            exception.getMessage(),
            response.request().httpMethod(),
            exception,
            retryAfter,
            response.request());
      }
      return exception;
    }

所以Feign配置重试机制需要做到两点:
1. 创建并注入Retryer开启重试
2. 自定义ErrorDecoder并注入,根据response定义规则何时需要重试(抛出RetryableException即触发重试),下面表示只要status不是2开头就需要重试(方法返回值类型是Response的除外)

public class FeignRetryConfig extends FeignConfiguration {
    @Bean
    public Retryer getRetryer() {
        return new Retryer.Default(100, SECONDS.toMillis(1), 3);
    }

    @Bean
    public ErrorDecoder getErrorDecoder() {
        return new CustomErrorDecoder();
    }

    public static class CustomErrorDecoder implements ErrorDecoder {

        @Override
        public Exception decode(String methodKey, Response response) {
            FeignException exception = errorStatus(methodKey, response);
            return new RetryableException(
                    response.status(),
                    exception.getMessage(),
                    response.request().httpMethod(),
                    exception,
                    null,
                    response.request());
        }
    }
}

feign基础配置,可以添加拦截器对请求做预处理,如添加auth请求头等

public class FeignConfiguration {

    @Bean
    public CustomInterceptor feignInterceptor() {
        return new CustomInterceptor();
    }

    public class CustomInterceptor implements RequestInterceptor {

        @Override
        public void apply(RequestTemplate template) {
            // template.header(AUTHENTICATION_HEADER_NAME, getToken());
        }

    }

}

FeignClient可以通过指定不同的配置类来确定是否需要重试,不同的client互不干扰

@FeignClient(name = "user-client", url = "${feign.user-service.url}", configuration = FeignRetryConfig.class)
public interface UserClient {

    @PostMapping(value = "/users", consumes = MediaType.APPLICATION_JSON_VALUE)
    List<User> getUsers(UserRequest request);
}

@FeignClient(name = "company-client", url = "${feign.company-service.url}", configuration = FeignConfiguration.class)
public interface CompanyClient {

    @PostMapping(value = "/companies", consumes = MediaType.APPLICATION_JSON_VALUE)
    List<Company> getCompanies(CompanyRequest request);
}

也可以通过配置文件的形式配置重试

feign:
  client:
    config:
      user-client:
        retryer: feign.Retryer.Default
        errorDecoder: xxx.config.FeignRetryConfig.CustomErrorDecoder

配置项类源码:FeignClientProperties.FeignClientConfiguration

public static class FeignClientConfiguration {

        private Logger.Level loggerLevel;

        private Integer connectTimeout;

        private Integer readTimeout;

        private Class<Retryer> retryer;

        private Class<ErrorDecoder> errorDecoder;

        private List<Class<RequestInterceptor>> requestInterceptors;

        private Map<String, Collection<String>> defaultRequestHeaders;

        private Map<String, Collection<String>> defaultQueryParameters;

        private Boolean decode404;

        private Class<Decoder> decoder;

        private Class<Encoder> encoder;

        private Class<Contract> contract;

        private ExceptionPropagationPolicy exceptionPropagationPolicy;

        private List<Class<Capability>> capabilities;

        private MetricsProperties metrics;

        private Boolean followRedirects;

使用RestTemplate做请求客户端

RestTemplate并没有自带重试机制,实现重试有两种方法,一是引入spring-retry模块,二是使用拦截器

使用spring-retry

自定义重试异常

public class RetryableException extends RuntimeException {
    public RetryableException(String message) {
        super(message);
    }
}

重试机制设置

@Configuration
public class RetryConfig {

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // 定义重试策略:仅当抛出RetryableException时重试,最大重试次数为3次
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3,
                Collections.singletonMap(RetryableException.class, true));
        retryTemplate.setRetryPolicy(retryPolicy);

        // 设置固定的重试间隔:每次重试间隔2秒
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(2000); // 2秒
        retryTemplate.setBackOffPolicy(backOffPolicy);

        return retryTemplate;
    }
}

请求调用

    public String sendRequestWithRetry(String url) {
        return retryTemplate.execute(context -> {
            // 使用RestTemplate发送请求
            String response = restTemplate.getForObject(url, String.class);

            // 根据响应内容判断是否需要重试
            if (isResponseInvalid(response)) {
                // 如果响应不符合预期,抛出异常以触发重试
                throw new RetryableException("Invalid response, need to retry");
            }

            // 响应正常,返回结果
            return response;
        }, context -> {
            // 重试耗尽后的回调,可以在这里处理失败情况
            return "Default response after retries exhausted";
        });
    }

    private boolean isResponseInvalid(String response) {
        // 根据实际业务逻辑判断响应是否有效
        // 例如,检查响应内容是否为空,状态码是否为特定值等
        return response == null || response.contains("error");
    }

使用拦截器做请求重试

拦截器代码

public class RetryInterceptor implements ClientHttpRequestInterceptor {

    private final int maxAttempts;
    private final long retryInterval;

    public RetryInterceptor(int maxAttempts, long retryInterval) {
        this.maxAttempts = maxAttempts;
        this.retryInterval = retryInterval;
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        int attempt = 0;
        while (attempt < maxAttempts) {
            try {
                // 尝试执行请求
                ClientHttpResponse response = execution.execute(request, body);
                
                // 检查响应状态码或内容是否符合要求
                if (isResponseValid(response)) {
                    // 如果响应有效,直接返回
                    return response;
                } else {
                    // response可能需要手动释放 response.close()
                    System.out.println("Invalid response, retrying...");
                }
            } catch (RestClientException e) {
                System.out.println("Request failed: " + e.getMessage());
            }

            attempt++;
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(retryInterval);  // 等待重试间隔
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Thread was interrupted", e);
                }
            }
        }

        throw new RestClientException("Request failed after maximum retry attempts");
    }

    private boolean isResponseValid(ClientHttpResponse response) throws IOException {
        // 根据实际需求判断响应是否有效
        // 例如,检查响应状态码是否为200或检查响应内容
        return response.getStatusCode().is2xxSuccessful();
    }
}

restTemplate设置

@Configuration
public class AppConfig {

    @Bean
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();

        // 创建拦截器实例
        RetryInterceptor retryInterceptor = new RetryInterceptor(3, 2000);

        // 将拦截器添加到RestTemplate中
        List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>(restTemplate.getInterceptors());
        interceptors.add(retryInterceptor);
        restTemplate.setInterceptors(interceptors);

        return restTemplate;
    }
}

注意拦截器和请求调用的顺序,InterceptingClientHttpRequest.java:

class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
    ...
    private class InterceptingRequestExecution implements ClientHttpRequestExecution {
        private final Iterator<ClientHttpRequestInterceptor> iterator;

        public InterceptingRequestExecution() {
            this.iterator = InterceptingClientHttpRequest.this.interceptors.iterator();
        }

        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            } else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> {
                    delegate.getHeaders().addAll(key, value);
                });
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;
                        streamingOutputMessage.setBody((outputStream) -> {
                            StreamUtils.copy(body, outputStream);
                        });
                    } else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }

                return delegate.execute();
            }
        }
    }
    ...
}

使用拦截器实现重试机制和使用retry做重试,重试时机略有不同,使用拦截器做重试时执行时机取决于拦截器顺序

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,427评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,551评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,747评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,939评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,955评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,737评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,448评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,352评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,834评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,992评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,133评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,815评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,477评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,022评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,147评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,398评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,077评论 2 355

推荐阅读更多精彩内容