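1. Service Fault Tolerance: Overview and Background
1.1 Challenges in a Microservice Architecture
In a microservice architecture, services communicate through remote calls. This distributed nature introduces new challenges:
- Cascading failure (the "avalanche" effect): one failing service can trigger a chain reaction that brings down the whole system
- Unreliable networks: latency, timeouts, and dropped connections cannot be avoided
- Resource contention: multiple services compete for limited system resources
- Performance variance: response times differ from service to service
1.2 Why Fault-Tolerance Patterns Matter
Service fault tolerance relies on a set of design patterns that make a system more resilient:
- Circuit breaker: stops repeated calls to a service that is likely to fail
- Rate limiter: caps resource usage so the system is not overloaded
- Retry: re-attempts a failed operation, but only where a retry can plausibly succeed
- Timeout: avoids waiting indefinitely for an unresponsive service
- Bulkhead (isolation): confines a failure to a bounded part of the system
1.3 Introduction to Resilience4j
Resilience4j is a lightweight, easy-to-use fault tolerance library designed for Java 8 and functional programming. Compared with Netflix Hystrix, Resilience4j has the following advantages:
- Very few dependencies and a small footprint (the 1.x core modules depend only on Vavr)
- Better support for functional programming
- Modular design, so you include only the modules you need
- Spring Boot integration through dedicated starters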
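2. Resilience4j Core Modules in Detail
2.1 Rate Limiting: RateLimiter
How it works
Rate limiting controls how fast a system accepts requests so that it is not overloaded. Common rate limiting algorithms include:
- Token bucket: tokens are added to a bucket at a fixed rate, and a request must consume a token to be processed.
- Leaky bucket: requests are processed at a fixed rate, and the excess is queued or rejected.
- Counter (fixed window): the number of requests is limited within a fixed time window.
Resilience4j's default RateLimiter divides time into cycles of length limitRefreshPeriod and grants limitForPeriod permits per cycle. Configuration example: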
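resilience4j:
  ratelimiter:
    instances:
      apiService:
        limitForPeriod: 100               # permits available in each refresh period
        limitRefreshPeriod: 1s            # length of one refresh period
        timeoutDuration: 0                # how long a caller waits for a permit (0 = reject immediately)
        registerHealthIndicator: true     # register a health indicator for this rate limiter
        allowHealthIndicatorToFail: true  # allow that health indicator to report a non-UP status
        subscribeForEvents: true          # subscribe to rate limiter events
        eventConsumerBufferSize: 50       # size of the event buffer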
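| Property | Default | Description |
|---|---|---|
| timeoutDuration | 5 s | Default time a thread waits for a permit |
| limitRefreshPeriod | 500 ns | Length of one refresh period. After each period the rate limiter resets its permit count to limitForPeriod |
| limitForPeriod | 50 | Number of permits available during one limitRefreshPeriod |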
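To make the interaction between limitForPeriod and limitRefreshPeriod concrete, here is a minimal, self-contained sketch of a period-based limiter. It is not Resilience4j's implementation: the class name PeriodRateLimiterSketch and its tryAcquire method are made up for illustration, the caller passes the current time explicitly, and the limiter never waits (comparable to timeoutDuration = 0).

```java
import java.time.Duration;

/** Illustrative only: permits are reset to limitForPeriod at the start of every refresh period. */
final class PeriodRateLimiterSketch {
    private final int limitForPeriod;
    private final long refreshPeriodNanos;
    private long currentCycle = -1;   // index of the period the permits belong to
    private int remainingPermits;

    PeriodRateLimiterSketch(int limitForPeriod, Duration limitRefreshPeriod) {
        this.limitForPeriod = limitForPeriod;
        this.refreshPeriodNanos = limitRefreshPeriod.toNanos();
    }

    /** Tries to take one permit at the given (non-negative) timestamp; never blocks. */
    synchronized boolean tryAcquire(long nowNanos) {
        long cycle = nowNanos / refreshPeriodNanos;
        if (cycle != currentCycle) {
            // A new period has started: refill the permits.
            currentCycle = cycle;
            remainingPermits = limitForPeriod;
        }
        if (remainingPermits > 0) {
            remainingPermits--;
            return true;
        }
        return false;
    }
}
```

With a limit of 2 per second, the first two calls inside a one-second period succeed, the third is rejected, and the first call of the next period succeeds again.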
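2.2 Retry
Be explicit about which exceptions should be retried, because retrying does not help with every exception. Database validation errors such as a unique-constraint violation, for example, will fail again no matter how often they are retried.
Retry strategy types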
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.RetryConfig;
import java.time.Duration;

// 1. Fixed-interval retry
RetryConfig fixedConfig = RetryConfig.custom()
        .maxAttempts(3)
        .waitDuration(Duration.ofSeconds(1))
        .build();

// 2. Exponential backoff retry
RetryConfig exponentialConfig = RetryConfig.custom()
        .maxAttempts(5)
        .intervalFunction(IntervalFunction.ofExponentialBackoff(
                Duration.ofSeconds(1), 2.0)) // initial interval 1s, multiplier 2
        .build();

// 3. Randomized-interval retry
RetryConfig randomConfig = RetryConfig.custom()
        .maxAttempts(4)
        .intervalFunction(IntervalFunction.ofRandomized(
                Duration.ofSeconds(1), 0.5)) // base interval 1s, randomization factor 0.5
        .build();
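The waits produced by an exponential backoff such as strategy 2 can be worked out by hand. The sketch below assumes the usual formula, initial interval times multiplier to the power of (attempt - 1), plus an optional upper bound; BackoffSketch and exponentialWait are hypothetical names and not part of the Resilience4j API.

```java
import java.time.Duration;

final class BackoffSketch {
    /**
     * Wait before retry number `attempt` (1-based):
     * initial * multiplier^(attempt - 1), capped at `max`.
     */
    static Duration exponentialWait(Duration initial, double multiplier, Duration max, int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        double millis = initial.toMillis() * Math.pow(multiplier, attempt - 1);
        long capped = (long) Math.min(millis, (double) max.toMillis());
        return Duration.ofMillis(capped);
    }
}
```

For an initial interval of 1s and a multiplier of 2, the first three waits are 1s, 2s, and 4s. With a 5s cap the fourth wait is 5s instead of 8s. Because maxAttempts counts the first call, maxAttempts(5) leads to at most four waits.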
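Full configuration example
resilience4j:
  retry:
    instances:
      backendService:
        maxAttempts: 3                      # maximum number of attempts, including the first call
        waitDuration: 500ms                 # base wait between attempts
        enableExponentialBackoff: true      # enable exponential backoff
        exponentialBackoffMultiplier: 2     # backoff multiplier
        exponentialMaxWaitDuration: 5s      # upper bound for the wait
        # Exceptions that trigger a retry
        retryExceptions:
          - org.springframework.web.client.ResourceAccessException
          - java.util.concurrent.TimeoutException
          - org.springframework.dao.TransientDataAccessException
        # Exceptions that are never retried
        ignoreExceptions:
          - org.springframework.web.client.HttpClientErrorException.NotFound
          - javax.validation.ValidationException
        # Retry based on the result. YAML cannot hold an inline lambda; the value must be the
        # fully qualified name of a class that implements java.util.function.Predicate.
        # ServerErrorResultPredicate is a hypothetical class that returns true when the result
        # is null or a ResponseEntity with a 5xx status.
        resultPredicate: com.example.resilience.ServerErrorResultPredicate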
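| Property | Default | Description |
|---|---|---|
| maxAttempts | 3 | Maximum number of attempts (including the first call) |
| waitDuration | 500 ms | Fixed wait between two attempts |
| intervalFunction | numOfAttempts -> waitDuration | Function that modifies the wait interval after a failure. By default the wait is constant. |
| retryOnResultPredicate | result -> false | Predicate that decides whether a result should be retried. It must return true if the result should be retried and false otherwise. |
| retryExceptionPredicate | throwable -> true | Like retryOnResultPredicate, but for exceptions: the predicate must return true if the exception should be retried and false otherwise. |
| retryExceptions | empty | List of exception types that should be retried |
| ignoreExceptions | empty | List of exception types that should not be retried |
| failAfterMaxAttempts | false | Boolean that enables or disables throwing MaxRetriesExceededException when the retry has reached maxAttempts and the result still does not pass retryOnResultPredicate |
| intervalBiFunction | (numOfAttempts, Either<throwable, result>) -> waitDuration | Function that modifies the wait interval after a failure based on the attempt number and the result or exception. Using it together with intervalFunction throws an IllegalStateException. |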
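2.3 Timeout: TimeLimiter
Timeout configuration:
resilience4j:
  timelimiter:
    instances:
      slowService:
        timeoutDuration: 3s         # timeout
        cancelRunningFuture: true   # whether to cancel the running Future on timeout
- The timeout configuration is simple: the main setting is timeoutDuration, the maximum time a call may take (default 1 s).
- cancelRunningFuture controls whether cancel() is called on the running Future once the timeout is reached (default true).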
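The effect of these two settings can be sketched with the JDK alone. The helper below is an illustration under stated assumptions, not the TimeLimiter implementation: TimeLimitSketch and callWithTimeout are hypothetical names, and failures are wrapped in CompletionException so that callers do not have to handle checked exceptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class TimeLimitSketch {
    /** Waits at most timeoutMillis for the future; optionally cancels it when the wait times out. */
    static <T> T callWithTimeout(Supplier<CompletableFuture<T>> futureSupplier,
                                 long timeoutMillis,
                                 boolean cancelRunningFuture) {
        CompletableFuture<T> future = futureSupplier.get();
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            if (cancelRunningFuture) {
                // Marks the future as cancelled; CompletableFuture does not interrupt the worker thread.
                future.cancel(true);
            }
            throw new CompletionException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        } catch (ExecutionException e) {
            throw new CompletionException(e.getCause());
        }
    }
}
```

Note that cancelling a CompletableFuture only completes it exceptionally. The underlying work keeps running unless it checks for cancellation itself, so a timeout bounds the caller's waiting time rather than the resources used downstream.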
2.4 Circuit Breaker: CircuitBreaker

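How it works
- A circuit breaker has three normal states: CLOSED, OPEN, and HALF_OPEN. Note that OPEN means calls are not allowed through and fail fast. There are also special states such as DISABLED and FORCED_OPEN.
- The CircuitBreaker uses a sliding window to store and aggregate call outcomes. You can choose between a count-based and a time-based sliding window. The count-based window aggregates the outcomes of the last N calls; the time-based window aggregates the outcomes of the calls made in the last N seconds.
- The circuit breaker protects the system with a state machine:
// Circuit breaker state transitions
CLOSED → OPEN (the failure rate or slow-call rate reaches its threshold)
OPEN → HALF_OPEN (after waitDurationInOpenState has elapsed)
HALF_OPEN → CLOSED (the failure rate of the permitted trial calls is below the threshold)
HALF_OPEN → OPEN (the failure rate of the permitted trial calls reaches the threshold)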
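The transitions above can be expressed as a small state machine. The following is a deliberately simplified, count-based sketch and not the Resilience4j implementation: CircuitBreakerSketch and its methods are hypothetical, the minimum number of calls is assumed to equal the window size, slow calls are ignored, and time is passed in explicitly.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int windowSize;               // count-based sliding window (CLOSED state)
    private final double failureRateThreshold;  // in percent
    private final long waitInOpenMillis;
    private final int permittedInHalfOpen;

    private State state = State.CLOSED;
    private final Deque<Boolean> window = new ArrayDeque<>(); // true = failed call
    private long openedAtMillis;
    private int halfOpenPermitsIssued;

    CircuitBreakerSketch(int windowSize, double failureRateThreshold,
                         long waitInOpenMillis, int permittedInHalfOpen) {
        this.windowSize = windowSize;
        this.failureRateThreshold = failureRateThreshold;
        this.waitInOpenMillis = waitInOpenMillis;
        this.permittedInHalfOpen = permittedInHalfOpen;
    }

    synchronized State state() { return state; }

    /** Returns true if a call may proceed at the given time. */
    synchronized boolean tryAcquirePermission(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAtMillis >= waitInOpenMillis) {
            transitionTo(State.HALF_OPEN, nowMillis);   // OPEN -> HALF_OPEN after the wait
        }
        switch (state) {
            case CLOSED:
                return true;
            case HALF_OPEN:
                if (halfOpenPermitsIssued < permittedInHalfOpen) {
                    halfOpenPermitsIssued++;
                    return true;
                }
                return false;
            default:
                return false;                           // OPEN: fail fast
        }
    }

    /** Records the outcome of a permitted call and applies the transitions. */
    synchronized void record(boolean failure, long nowMillis) {
        if (state == State.OPEN) {
            return;
        }
        int capacity = (state == State.HALF_OPEN) ? permittedInHalfOpen : windowSize;
        window.addLast(failure);
        if (window.size() > capacity) {
            window.removeFirst();
        }
        if (window.size() < capacity) {
            return;                                     // not enough outcomes yet
        }
        long failures = window.stream().filter(f -> f).count();
        double failureRate = 100.0 * failures / window.size();
        if (failureRate >= failureRateThreshold) {
            transitionTo(State.OPEN, nowMillis);        // CLOSED/HALF_OPEN -> OPEN
        } else if (state == State.HALF_OPEN) {
            transitionTo(State.CLOSED, nowMillis);      // HALF_OPEN -> CLOSED
        }
    }

    private void transitionTo(State next, long nowMillis) {
        state = next;
        window.clear();
        halfOpenPermitsIssued = 0;
        if (next == State.OPEN) {
            openedAtMillis = nowMillis;
        }
    }
}
```

The real library adds slow-call tracking, a separate minimumNumberOfCalls setting, time-based windows, and lock-free bookkeeping. The sketch only shows how the four transitions relate to the recorded outcomes.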
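| Property | Default | Description |
|---|---|---|
| slidingWindowSize | 100 | Size of the sliding window that records call outcomes while the circuit breaker is closed (calls are allowed) |
| failureRateThreshold | 50 (%) | When the failure rate is equal to or greater than this threshold, the circuit breaker opens and starts short-circuiting calls |
| slowCallDurationThreshold | 60000 ms | Duration above which a call is considered slow |
| slowCallRateThreshold | 100 (%) | When the percentage of slow calls is equal to or greater than this value, the circuit breaker opens |
| permittedNumberOfCallsInHalfOpenState | 10 | Number of calls permitted while the circuit breaker is half-open |
| maxWaitDurationInHalfOpenState | 0 | Maximum time the circuit breaker may stay half-open before it switches to open. A value of 0 means it waits in the half-open state indefinitely until all permitted calls have completed. |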
2.5 Bulkhead
Resilience4j provides two bulkhead implementations:
- SemaphoreBulkhead: implemented with a Semaphore
- FixedThreadPoolBulkhead: implemented with a bounded queue and a fixed thread pool
resilience4j.bulkhead:
  instances:
    backendA:
      maxConcurrentCalls: 10
    backendB:
      maxWaitDuration: 10ms    # maximum wait time
      maxConcurrentCalls: 20   # maximum number of concurrent calls
resilience4j.thread-pool-bulkhead:
  instances:
    backendC:
      maxThreadPoolSize: 1
      coreThreadPoolSize: 1
      queueCapacity: 1
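2.5.1 SemaphoreBulkhead
| Property | Default | Description |
|---|---|---|
| maxConcurrentCalls | 25 | Maximum number of calls allowed to execute concurrently |
| maxWaitDuration | 0 | Maximum time a thread is blocked when it tries to enter a saturated bulkhead |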
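How maxConcurrentCalls and maxWaitDuration work together can be sketched with a plain java.util.concurrent.Semaphore. This is an illustration only: SemaphoreBulkheadSketch is a hypothetical class, and it signals a full bulkhead with IllegalStateException where the library throws BulkheadFullException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class SemaphoreBulkheadSketch {
    private final Semaphore permits;
    private final long maxWaitMillis;

    SemaphoreBulkheadSketch(int maxConcurrentCalls, long maxWaitMillis) {
        this.permits = new Semaphore(maxConcurrentCalls, true);
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Runs the call if a permit becomes available within maxWaitMillis; otherwise rejects it. */
    <T> T execute(Supplier<T> call) {
        boolean acquired;
        try {
            acquired = permits.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the bulkhead", e);
        }
        if (!acquired) {
            throw new IllegalStateException("bulkhead is full");
        }
        try {
            return call.get();
        } finally {
            permits.release();   // always give the permit back
        }
    }

    int availableConcurrentCalls() {
        return permits.availablePermits();
    }
}
```

With maxWaitDuration set to 0 a saturated bulkhead rejects immediately, which keeps caller threads from piling up behind a slow dependency.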
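2.5.2 FixedThreadPoolBulkhead
| Property | Default | Description |
|---|---|---|
| maxThreadPoolSize | Runtime.getRuntime().availableProcessors() | Maximum number of threads in the pool |
| coreThreadPoolSize | Runtime.getRuntime().availableProcessors() - 1 | Number of core threads in the pool |
| queueCapacity | 100 | Capacity of the thread pool queue |
| keepAliveDuration | 20 ms | When the pool has more threads than the core size, the maximum time an idle excess thread waits for new work before terminating |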
3. Spring Boot Integration in Practice
3.1 Project Setup and Dependencies
Maven dependencies
<dependencies>
    <!-- Spring Boot web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Resilience4j Spring Boot 2 starter (brings in the core modules) -->
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-spring-boot2</artifactId>
        <version>1.7.0</version>
    </dependency>
    <!-- AOP support, required for the Resilience4j annotations -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <!-- Actuator, required for the health, metrics and circuitbreakers endpoints -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- Metrics -->
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-micrometer</artifactId>
        <version>1.7.0</version>
    </dependency>
    <!-- Reactive programming support -->
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-reactor</artifactId>
        <version>1.7.0</version>
    </dependency>
</dependencies>
Application configuration
spring:
  application:
    name: resilience4j-demo
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,circuitbreakers
  endpoint:
    health:
      show-details: always
    metrics:
      enabled: true
logging:
  level:
    io.github.resilience4j: DEBUG
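3.2 Basic Usage Patterns
Annotation-based usage
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.github.resilience4j.retry.annotation.Retry;
import io.github.resilience4j.timelimiter.annotation.TimeLimiter;

@Service
@Slf4j
public class UserService {
    private final RestTemplate restTemplate;
    private final ObjectMapper objectMapper;

    public UserService(RestTemplate restTemplate, ObjectMapper objectMapper) {
        this.restTemplate = restTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Combined fault-tolerance protection for the user lookup.
     * With the default aspect order, Retry is outermost and Bulkhead innermost:
     * Retry ( CircuitBreaker ( RateLimiter ( TimeLimiter ( Bulkhead ( method ) ) ) ) ).
     * A fallback on an inner annotation hides the failure from the outer ones, so the
     * general fallback sits on @Retry only and every inner fallback accepts one specific exception type.
     */
    @Retry(name = "userService", fallbackMethod = "getUserFallback")
    @CircuitBreaker(name = "userService")
    @RateLimiter(name = "userService", fallbackMethod = "getUserRateLimitFallback")
    @TimeLimiter(name = "userService", fallbackMethod = "getUserTimeoutFallback")
    @Bulkhead(name = "userService", fallbackMethod = "getUserBulkheadFallback")
    public CompletableFuture<UserDTO> getUserById(Long userId) {
        log.info("Looking up user: {}", userId);
        return CompletableFuture.supplyAsync(() -> {
            String url = String.format("http://user-service/users/%d", userId);
            ResponseEntity<String> response = restTemplate.getForEntity(url, String.class);
            if (!response.getStatusCode().is2xxSuccessful()) {
                throw new UserServiceException("Unexpected user service response: " + response.getStatusCode());
            }
            try {
                return objectMapper.readValue(response.getBody(), UserDTO.class);
            } catch (JsonProcessingException e) {
                // readValue throws a checked exception, which a Supplier lambda cannot propagate
                throw new UserServiceException("Cannot parse user service response: " + e.getMessage());
            }
        });
    }

    // General fallback: used once retries are exhausted or the circuit breaker rejects the call
    public CompletableFuture<UserDTO> getUserFallback(Long userId, Exception e) {
        log.warn("User service call failed, returning fallback data, user ID: {}", userId, e);
        return CompletableFuture.completedFuture(
                UserDTO.builder()
                        .id(userId)
                        .name("Fallback User")
                        .email("fallback@example.com")
                        .build()
        );
    }

    // Rate limiter fallback
    public CompletableFuture<UserDTO> getUserRateLimitFallback(Long userId, RequestNotPermitted e) {
        log.warn("User service rate limit hit, user ID: {}", userId);
        return CompletableFuture.completedFuture(
                UserDTO.builder()
                        .id(userId)
                        .name("Rate-Limited User")
                        .email("rate-limited@example.com")
                        .build()
        );
    }

    // Timeout fallback
    public CompletableFuture<UserDTO> getUserTimeoutFallback(Long userId, TimeoutException e) {
        log.warn("User service call timed out, user ID: {}", userId);
        return CompletableFuture.completedFuture(
                UserDTO.builder()
                        .id(userId)
                        .name("Timeout User")
                        .email("timeout@example.com")
                        .build()
        );
    }

    // Bulkhead fallback
    public CompletableFuture<UserDTO> getUserBulkheadFallback(Long userId, BulkheadFullException e) {
        log.warn("User service bulkhead is full, user ID: {}", userId);
        return CompletableFuture.completedFuture(
                UserDTO.builder()
                        .id(userId)
                        .name("Isolated User")
                        .email("bulkhead@example.com")
                        .build()
        );
    }
}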
Programmatic usage
import io.github.resilience4j.decorators.Decorators;

@Service
@Slf4j
public class OrderService {
    private final CircuitBreaker circuitBreaker;
    private final Retry retry;
    private final RateLimiter rateLimiter;
    private final Bulkhead bulkhead;
    private final TimeLimiter timeLimiter;
    private final OrderRepository orderRepository;
    private final PaymentService paymentService;
    // Shared scheduler for asynchronous retries and timeouts (creating one per call would leak threads)
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public OrderService(CircuitBreakerRegistry circuitBreakerRegistry,
                        RetryRegistry retryRegistry,
                        RateLimiterRegistry rateLimiterRegistry,
                        BulkheadRegistry bulkheadRegistry,
                        TimeLimiterRegistry timeLimiterRegistry,
                        OrderRepository orderRepository,
                        PaymentService paymentService) {
        this.circuitBreaker = circuitBreakerRegistry.circuitBreaker("orderService");
        this.retry = retryRegistry.retry("orderService");
        this.rateLimiter = rateLimiterRegistry.rateLimiter("orderService");
        this.bulkhead = bulkheadRegistry.bulkhead("orderService");
        this.timeLimiter = timeLimiterRegistry.timeLimiter("orderService");
        this.orderRepository = orderRepository;
        this.paymentService = paymentService;
    }

    /**
     * Create an order with programmatic fault-tolerance protection.
     */
    public CompletableFuture<Order> createOrder(CreateOrderRequest request) {
        // The protected operation is asynchronous, so it is decorated as a CompletionStage
        Supplier<CompletionStage<Order>> orderSupplier = () ->
                CompletableFuture.supplyAsync(() -> createOrderInternal(request));
        // Each with...() call wraps the previous ones, so the time limiter is outermost here.
        // Retrying the whole operation is only safe if payment processing is idempotent.
        CompletionStage<Order> decorated = Decorators.ofCompletionStage(orderSupplier)
                .withCircuitBreaker(circuitBreaker)
                .withRetry(retry, scheduler)
                .withRateLimiter(rateLimiter)
                .withBulkhead(bulkhead)
                .withTimeLimiter(timeLimiter, scheduler)
                .get();
        // Failures surface asynchronously, so they are handled on the future, not with try/catch
        return decorated.toCompletableFuture().exceptionally(e -> {
            log.error("Failed to create order", e);
            return createFallbackOrder(request);
        });
    }

    private Order createOrderInternal(CreateOrderRequest request) {
        log.info("Creating order: {}", request);
        // Validate inventory
        validateInventory(request.getItems());
        // Process payment
        PaymentResult paymentResult = paymentService.processPayment(request.getPaymentInfo());
        if (!paymentResult.isSuccess()) {
            throw new PaymentException("Payment processing failed: " + paymentResult.getMessage());
        }
        // Create the order
        Order order = Order.builder()
                .userId(request.getUserId())
                .items(request.getItems())
                .totalAmount(calculateTotalAmount(request.getItems()))
                .paymentId(paymentResult.getPaymentId())
                .status(OrderStatus.CREATED)
                .createdAt(LocalDateTime.now())
                .build();
        return orderRepository.save(order);
    }

    private Order createFallbackOrder(CreateOrderRequest request) {
        return Order.builder()
                .userId(request.getUserId())
                .items(request.getItems())
                .totalAmount(calculateTotalAmount(request.getItems()))
                .status(OrderStatus.FAILED)
                .createdAt(LocalDateTime.now())
                .fallback(true)
                .build();
    }

    // Other helper methods...
    private void validateInventory(List<OrderItem> items) {
        // Inventory validation logic
    }

    private BigDecimal calculateTotalAmount(List<OrderItem> items) {
        return items.stream()
                .map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
                .reduce(BigDecimal.ZERO, BigDecimal::add);
    }
}
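The order of the with...() calls in a Decorators chain matters, because each call wraps the supplier built so far. The following self-contained sketch shows that nesting with plain java.util.function.Supplier wrappers. DecorationOrderSketch, traced, and retrying are hypothetical helpers for illustration and are not Resilience4j classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class DecorationOrderSketch {
    /** Wraps a supplier so that every invocation is recorded under the given label. */
    static <T> Supplier<T> traced(String label, List<String> trace, Supplier<T> inner) {
        return () -> {
            trace.add(label);
            return inner.get();
        };
    }

    /** Wraps a supplier with a naive retry that re-invokes `inner` up to maxAttempts times. */
    static <T> Supplier<T> retrying(int maxAttempts, Supplier<T> inner) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return () -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return inner.get();
                } catch (RuntimeException e) {
                    last = e;   // remember the failure and try again
                }
            }
            throw last;
        };
    }
}
```

If the traced wrapper stands in for a circuit breaker, then retrying(3, traced("circuitBreaker", trace, call)) passes every attempt through the inner wrapper, so each failed attempt would count against the breaker. Swapping the two would let the breaker see only the final outcome. The same reasoning applies when choosing the order of withCircuitBreaker and withRetry.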
4. Advanced Features and Customization
4.1 Custom Fault-Tolerance Strategies
Custom circuit breaker configuration
import io.github.resilience4j.common.circuitbreaker.configuration.CircuitBreakerConfigCustomizer;
import io.github.resilience4j.common.ratelimiter.configuration.RateLimiterConfigCustomizer;
import io.github.resilience4j.common.retry.configuration.RetryConfigCustomizer;

@Configuration
@Slf4j
public class CustomResilienceConfig {
    /**
     * Custom circuit breaker configuration
     */
    @Bean
    public CircuitBreakerConfigCustomizer customCircuitBreakerConfig() {
        return CircuitBreakerConfigCustomizer
                .of("customService", builder -> builder
                        .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED)
                        .slidingWindowSize(60)          // 60-second window
                        .minimumNumberOfCalls(5)        // at least 5 calls before the rate is evaluated
                        .failureRateThreshold(30.0f)    // 30% failure rate
                        .waitDurationInOpenState(Duration.ofSeconds(30))
                        .permittedNumberOfCallsInHalfOpenState(3)
                        .recordException(throwable -> {
                            // Custom rule for which exceptions count as failures
                            log.debug("Evaluating exception for circuit breaker: {}", throwable.getMessage());
                            return throwable instanceof RuntimeException &&
                                    !(throwable instanceof IllegalArgumentException);
                        })
                        .ignoreExceptions(IllegalArgumentException.class)
                );
    }

    /**
     * Custom retry configuration with exponential backoff and random jitter
     */
    @Bean
    public RetryConfigCustomizer customRetryConfig() {
        return RetryConfigCustomizer
                .of("customService", builder -> builder
                        .maxAttempts(5)
                        .intervalFunction(IntervalFunction.ofExponentialRandomBackoff(
                                Duration.ofSeconds(1),  // initial interval
                                2.0,                    // multiplier
                                0.5                     // randomization factor
                        ))
                        .retryOnException(throwable -> {
                            // Custom retry condition
                            return throwable instanceof TransientDataAccessException ||
                                    throwable instanceof ResourceAccessException;
                        })
                        // Throw MaxRetriesExceededException if the result is still rejected after the last attempt
                        .failAfterMaxAttempts(true)
                );
    }

    /**
     * Custom rate limiter configuration
     */
    @Bean
    public RateLimiterConfigCustomizer customRateLimiterConfig() {
        return RateLimiterConfigCustomizer
                .of("customService", builder -> builder
                        .limitForPeriod(50)
                        .limitRefreshPeriod(Duration.ofSeconds(10))
                        .timeoutDuration(Duration.ofSeconds(5))
                );
    }
}
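Custom event listener
@Component
@Slf4j
public class ResilienceEventListener {
    private final MeterRegistry meterRegistry;

    // Resilience4j events are not Spring application events, so @EventListener never receives them.
    // Consumers are registered on each instance's event publisher instead, both for existing
    // instances and for instances that are added to a registry later.
    public ResilienceEventListener(MeterRegistry meterRegistry,
                                   CircuitBreakerRegistry circuitBreakerRegistry,
                                   RetryRegistry retryRegistry,
                                   RateLimiterRegistry rateLimiterRegistry,
                                   BulkheadRegistry bulkheadRegistry) {
        this.meterRegistry = meterRegistry;
        circuitBreakerRegistry.getAllCircuitBreakers().forEach(this::register);
        circuitBreakerRegistry.getEventPublisher().onEntryAdded(e -> register(e.getAddedEntry()));
        retryRegistry.getAllRetries().forEach(this::register);
        retryRegistry.getEventPublisher().onEntryAdded(e -> register(e.getAddedEntry()));
        rateLimiterRegistry.getAllRateLimiters().forEach(this::register);
        rateLimiterRegistry.getEventPublisher().onEntryAdded(e -> register(e.getAddedEntry()));
        bulkheadRegistry.getAllBulkheads().forEach(this::register);
        bulkheadRegistry.getEventPublisher().onEntryAdded(e -> register(e.getAddedEntry()));
    }

    /**
     * Circuit breaker events
     */
    private void register(CircuitBreaker circuitBreaker) {
        circuitBreaker.getEventPublisher()
                .onStateTransition(this::onCircuitBreakerEvent)
                .onCallNotPermitted(this::onCircuitBreakerCallNotPermitted);
    }

    private void onCircuitBreakerEvent(CircuitBreakerOnStateTransitionEvent event) {
        String circuitBreakerName = event.getCircuitBreakerName();
        CircuitBreaker.StateTransition stateTransition = event.getStateTransition();
        log.info("Circuit breaker state change: {} -> {}", circuitBreakerName, stateTransition);
        // Record a metric
        Counter.builder("circuitbreaker.state.transitions")
                .tag("name", circuitBreakerName)
                .tag("from", stateTransition.getFromState().name())
                .tag("to", stateTransition.getToState().name())
                .register(meterRegistry)
                .increment();
    }

    private void onCircuitBreakerCallNotPermitted(CircuitBreakerOnCallNotPermittedEvent event) {
        log.warn("Call blocked by circuit breaker: {}", event.getCircuitBreakerName());
        Counter.builder("circuitbreaker.calls.not_permitted")
                .tag("name", event.getCircuitBreakerName())
                .register(meterRegistry)
                .increment();
    }

    /**
     * Retry events
     */
    private void register(Retry retry) {
        retry.getEventPublisher()
                .onRetry(event -> log.info("Retry event - name: {}, attempt: {}, exception: {}",
                        event.getName(),
                        event.getNumberOfRetryAttempts(),
                        String.valueOf(event.getLastThrowable())))
                .onSuccess(event -> log.info("Retry succeeded - name: {}, attempts: {}",
                        event.getName(),
                        event.getNumberOfRetryAttempts()));
    }

    /**
     * Rate limiter events
     */
    private void register(RateLimiter rateLimiter) {
        rateLimiter.getEventPublisher()
                .onFailure(event -> log.warn("Rate limit hit - name: {}, requested permits: {}",
                        event.getRateLimiterName(),
                        event.getNumberOfPermits()));
    }

    /**
     * Bulkhead events
     */
    private void register(Bulkhead bulkhead) {
        bulkhead.getEventPublisher().onCallRejected(event -> {
            log.warn("Call rejected by bulkhead - name: {}", event.getBulkheadName());
            Counter.builder("bulkhead.calls.rejected")
                    .tag("name", event.getBulkheadName())
                    .register(meterRegistry)
                    .increment();
        });
    }
}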
4.2 Reactive Programming Support
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
import io.github.resilience4j.reactor.retry.RetryOperator;

@Service
@Slf4j
public class ReactiveUserService {
    private final WebClient webClient;
    private final CircuitBreaker circuitBreaker;
    private final Retry retry;
    private final TimeLimiter timeLimiter;

    public ReactiveUserService(WebClient.Builder webClientBuilder,
                               CircuitBreakerRegistry circuitBreakerRegistry,
                               RetryRegistry retryRegistry,
                               TimeLimiterRegistry timeLimiterRegistry) {
        this.webClient = webClientBuilder.baseUrl("http://user-service").build();
        this.circuitBreaker = circuitBreakerRegistry.circuitBreaker("reactiveUserService");
        this.retry = retryRegistry.retry("reactiveUserService");
        this.timeLimiter = timeLimiterRegistry.timeLimiter("reactiveUserService");
    }

    /**
     * Reactive user lookup with fault-tolerance protection
     */
    public Mono<UserDTO> findUserReactive(Long userId) {
        Mono<UserDTO> userMono = webClient.get()
                .uri("/users/{id}", userId)
                .retrieve()
                .bodyToMono(UserDTO.class)
                .timeout(Duration.ofSeconds(3)) // timeout for a single attempt
                // doOnNext instead of doOnSuccess: doOnSuccess is also invoked with null for an empty Mono
                .doOnNext(user -> log.info("Fetched user: {}", user.getId()))
                .doOnError(error -> log.error("Failed to fetch user: {}", error.getMessage()));
        // Apply fault-tolerance protection: the circuit breaker is innermost, the retry wraps it
        return Mono.defer(() -> userMono)
                .transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                .transformDeferred(RetryOperator.of(retry))
                .timeout(Duration.ofSeconds(5)) // overall timeout across all attempts
                .onErrorResume(throwable -> {
                    log.warn("User lookup degraded, user ID: {}", userId, throwable);
                    return Mono.just(createFallbackUser(userId));
                });
    }

    /**
     * Batch user lookup with bounded concurrency
     */
    public Flux<UserDTO> findUsersReactive(List<Long> userIds) {
        return Flux.fromIterable(userIds)
                .flatMap(this::findUserReactive, 5) // at most 5 lookups in flight
                .onErrorContinue((throwable, userId) -> {
                    log.error("Error while processing user ID {}: {}", userId, throwable.getMessage());
                });
    }

    /**
     * Parallel user lookup with resource limits
     */
    public Flux<UserDTO> findUsersParallel(List<Long> userIds) {
        return Flux.fromIterable(userIds)
                .parallel(3) // parallelism
                .runOn(Schedulers.parallel())
                .flatMap(this::findUserReactive)
                .sequential()
                .timeout(Duration.ofSeconds(10))
                .onErrorResume(throwable -> {
                    log.error("Parallel lookup failed", throwable);
                    return Flux.empty();
                });
    }

    private UserDTO createFallbackUser(Long userId) {
        return UserDTO.builder()
                .id(userId)
                .name("Fallback User")
                .email("fallback@example.com")
                .fallback(true)
                .build();
    }
}
5. Monitoring and Operations
5.1 Spring Boot Actuator Integration
# The prometheus endpoint also needs the micrometer-registry-prometheus dependency on the classpath
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,circuitbreakers,prometheus
  endpoint:
    health:
      show-details: always
      show-components: always
    metrics:
      enabled: true
    prometheus:
      enabled: true
  metrics:
    export:
      prometheus:
        enabled: true
    tags:
      application: ${spring.application.name}
      environment: dev
resilience4j:
  circuitbreaker:
    metrics:
      enabled: true
  retry:
    metrics:
      enabled: true
  ratelimiter:
    metrics:
      enabled: true
  bulkhead:
    metrics:
      enabled: true
  timelimiter:
    metrics:
      enabled: true
5.2 Health Check Configuration
@Component
@Slf4j
public class ResilienceHealthIndicator implements HealthIndicator {
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final RetryRegistry retryRegistry;
    private final BulkheadRegistry bulkheadRegistry;

    public ResilienceHealthIndicator(CircuitBreakerRegistry circuitBreakerRegistry,
                                     RetryRegistry retryRegistry,
                                     BulkheadRegistry bulkheadRegistry) {
        this.circuitBreakerRegistry = circuitBreakerRegistry;
        this.retryRegistry = retryRegistry;
        this.bulkheadRegistry = bulkheadRegistry;
    }

    @Override
    public Health health() {
        Map<String, Object> details = new HashMap<>();
        // Circuit breaker status
        details.put("circuitBreakers", getCircuitBreakerStatus());
        // Retry status
        details.put("retries", getRetryStatus());
        // Bulkhead status
        details.put("bulkheads", getBulkheadStatus());
        // Overall status
        boolean isHealthy = isOverallHealthy(details);
        return isHealthy ?
                Health.up().withDetails(details).build() :
                Health.down().withDetails(details).build();
    }

    private Map<String, Object> getCircuitBreakerStatus() {
        Map<String, Object> status = new HashMap<>();
        // getAllCircuitBreakers() returns a collection of instances, not a name-to-instance map
        circuitBreakerRegistry.getAllCircuitBreakers().forEach(circuitBreaker -> {
            CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
            Map<String, Object> cbStatus = new HashMap<>();
            cbStatus.put("state", circuitBreaker.getState().name());
            cbStatus.put("failureRate", metrics.getFailureRate());
            cbStatus.put("bufferedCalls", metrics.getNumberOfBufferedCalls());
            cbStatus.put("failedCalls", metrics.getNumberOfFailedCalls());
            cbStatus.put("notPermittedCalls", metrics.getNumberOfNotPermittedCalls());
            status.put(circuitBreaker.getName(), cbStatus);
        });
        return status;
    }

    private Map<String, Object> getRetryStatus() {
        Map<String, Object> status = new HashMap<>();
        retryRegistry.getAllRetries().forEach(retry -> {
            Retry.Metrics metrics = retry.getMetrics();
            Map<String, Object> retryStatus = new HashMap<>();
            retryStatus.put("successfulCallsWithoutRetry", metrics.getNumberOfSuccessfulCallsWithoutRetryAttempt());
            retryStatus.put("successfulCallsWithRetry", metrics.getNumberOfSuccessfulCallsWithRetryAttempt());
            retryStatus.put("failedCallsWithoutRetry", metrics.getNumberOfFailedCallsWithoutRetryAttempt());
            retryStatus.put("failedCallsWithRetry", metrics.getNumberOfFailedCallsWithRetryAttempt());
            status.put(retry.getName(), retryStatus);
        });
        return status;
    }

    private Map<String, Object> getBulkheadStatus() {
        Map<String, Object> status = new HashMap<>();
        bulkheadRegistry.getAllBulkheads().forEach(bulkhead -> {
            Bulkhead.Metrics metrics = bulkhead.getMetrics();
            Map<String, Object> bhStatus = new HashMap<>();
            bhStatus.put("availableConcurrentCalls", metrics.getAvailableConcurrentCalls());
            bhStatus.put("maxAllowedConcurrentCalls", metrics.getMaxAllowedConcurrentCalls());
            status.put(bulkhead.getName(), bhStatus);
        });
        return status;
    }

    private boolean isOverallHealthy(Map<String, Object> details) {
        // Report DOWN if any circuit breaker is OPEN
        @SuppressWarnings("unchecked")
        Map<String, Object> circuitBreakers = (Map<String, Object>) details.get("circuitBreakers");
        for (Object cbStatus : circuitBreakers.values()) {
            @SuppressWarnings("unchecked")
            Map<String, Object> status = (Map<String, Object>) cbStatus;
            if ("OPEN".equals(status.get("state"))) {
                return false;
            }
        }
        return true;
    }
}
5.3 Custom Metrics Collection
@Component
@Slf4j
public class ResilienceMetricsCollector {
    private final MeterRegistry meterRegistry;
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final Map<String, Timer> circuitBreakerTimers = new ConcurrentHashMap<>();

    public ResilienceMetricsCollector(MeterRegistry meterRegistry,
                                      CircuitBreakerRegistry circuitBreakerRegistry) {
        this.meterRegistry = meterRegistry;
        this.circuitBreakerRegistry = circuitBreakerRegistry;
        initializeMetrics();
    }

    // Custom meters use an "app.resilience" prefix here; reusing the "resilience4j.circuitbreaker.*"
    // names may clash with the meters that resilience4j-micrometer registers itself.
    private void initializeMetrics() {
        // Only circuit breakers that already exist at startup are covered
        circuitBreakerRegistry.getAllCircuitBreakers().forEach(circuitBreaker -> {
            String name = circuitBreaker.getName();
            // Circuit breaker state gauge.
            // getOrder(): CLOSED=0, OPEN=1, HALF_OPEN=2, DISABLED=3, FORCED_OPEN=4
            // (ordinal() follows the enum declaration order, which is different)
            Gauge.builder("app.resilience.circuitbreaker.state", circuitBreaker,
                            cb -> cb.getState().getOrder())
                    .description("Circuit breaker state")
                    .tag("name", name)
                    .register(meterRegistry);
            // Call duration timer
            Timer timer = Timer.builder("app.resilience.circuitbreaker.calls.duration")
                    .description("Duration of circuit breaker calls")
                    .tag("name", name)
                    .register(meterRegistry);
            circuitBreakerTimers.put(name, timer);
        });
    }

    /**
     * Record the duration of a call protected by a circuit breaker
     */
    public void recordCircuitBreakerCall(String circuitBreakerName, long duration, TimeUnit unit) {
        Timer timer = circuitBreakerTimers.get(circuitBreakerName);
        if (timer != null) {
            timer.record(duration, unit);
        }
    }

    /**
     * Record a fallback call (custom business metric).
     * The same tag key is used for every registration of this counter.
     */
    public void recordFallbackCall(String serviceName) {
        Counter.builder("app.resilience.fallback.calls")
                .description("Number of fallback calls")
                .tag("service", serviceName)
                .register(meterRegistry)
                .increment();
    }
}
6. Case Study: Fault-Tolerant Design for an E-Commerce System
6.1 Complete Service Architecture
import io.github.resilience4j.decorators.Decorators;

@Service
@Slf4j
public class OrderProcessingService {
    private final UserService userService;
    private final ProductService productService;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
    private final NotificationService notificationService;
    private final CircuitBreaker orderCircuitBreaker;
    private final Retry paymentRetry;
    private final Bulkhead notificationBulkhead;
    private final TimeLimiter orderTimeLimiter = TimeLimiter.of(Duration.ofSeconds(30));
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final ResilienceMetricsCollector metricsCollector;

    public OrderProcessingService(UserService userService,
                                  ProductService productService,
                                  InventoryService inventoryService,
                                  PaymentService paymentService,
                                  NotificationService notificationService,
                                  CircuitBreakerRegistry circuitBreakerRegistry,
                                  RetryRegistry retryRegistry,
                                  BulkheadRegistry bulkheadRegistry,
                                  ResilienceMetricsCollector metricsCollector) {
        this.userService = userService;
        this.productService = productService;
        this.inventoryService = inventoryService;
        this.paymentService = paymentService;
        this.notificationService = notificationService;
        this.metricsCollector = metricsCollector;
        this.orderCircuitBreaker = circuitBreakerRegistry.circuitBreaker("orderProcessing");
        this.paymentRetry = retryRegistry.retry("paymentProcessing");
        this.notificationBulkhead = bulkheadRegistry.bulkhead("notification");
    }

    /**
     * Process an order with combined fault-tolerance protection
     */
    @Async
    public CompletableFuture<OrderResult> processOrder(OrderRequest orderRequest) {
        long startTime = System.currentTimeMillis();
        Supplier<CompletionStage<OrderResult>> orderProcessingSupplier =
                () -> processOrderInternal(orderRequest);
        // The circuit breaker, time limit and fallback guard the whole pipeline. The payment retry
        // and the notification bulkhead are applied only to the steps they are named after.
        CompletionStage<OrderResult> decorated = Decorators
                .ofCompletionStage(orderProcessingSupplier)
                .withCircuitBreaker(orderCircuitBreaker)
                .withTimeLimiter(orderTimeLimiter, scheduler)
                .withFallback(Arrays.asList(Exception.class),
                        throwable -> handleOrderFallback(orderRequest, throwable))
                .get();
        return decorated.toCompletableFuture()
                .whenComplete((result, throwable) -> {
                    long duration = System.currentTimeMillis() - startTime;
                    metricsCollector.recordCircuitBreakerCall(
                            "orderProcessing", duration, TimeUnit.MILLISECONDS);
                    if (throwable != null) {
                        log.error("Order processing failed: {}", orderRequest.getOrderId(), throwable);
                    } else {
                        log.info("Order processing finished: {}", orderRequest.getOrderId());
                    }
                });
    }

    private CompletableFuture<OrderResult> processOrderInternal(OrderRequest orderRequest) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("Processing order: {}", orderRequest.getOrderId());
            // 1. Validate the user (getUserById returns a CompletableFuture)
            UserDTO user = userService.getUserById(orderRequest.getUserId()).join();
            if (user == null) {
                throw new UserNotFoundException("User does not exist");
            }
            // 2. Validate the products
            List<ProductDTO> products = productService.getProductsByIds(
                    orderRequest.getProductIds());
            if (products.size() != orderRequest.getProductIds().size()) {
                throw new ProductNotFoundException("Some products do not exist");
            }
            // 3. Check inventory
            inventoryService.checkInventory(orderRequest.getProductIds());
            // 4. Deduct inventory
            inventoryService.deductInventory(orderRequest.getProductIds());
            // 5. Process payment; only this step is retried, so it must be idempotent
            PaymentResult paymentResult;
            try {
                paymentResult = paymentRetry.executeSupplier(
                        () -> paymentService.processPayment(orderRequest.getPaymentInfo()));
            } catch (RuntimeException e) {
                // Payment threw an exception: restore inventory before propagating
                inventoryService.restoreInventory(orderRequest.getProductIds());
                throw e;
            }
            if (!paymentResult.isSuccess()) {
                // Payment was declined: restore inventory
                inventoryService.restoreInventory(orderRequest.getProductIds());
                throw new PaymentException("Payment failed: " + paymentResult.getMessage());
            }
            // 6. Create the order
            Order order = createOrder(orderRequest, user, products, paymentResult);
            // 7. Send notifications (asynchronous, does not block the main flow)
            sendNotificationsAsync(order, user);
            return OrderResult.builder()
                    .orderId(order.getId())
                    .status(OrderStatus.SUCCESS)
                    .paymentId(paymentResult.getPaymentId())
                    .message("Order created successfully")
                    .build();
        });
    }

    // The fallback for a decorated CompletionStage returns the plain value, not a future
    private OrderResult handleOrderFallback(OrderRequest orderRequest, Throwable throwable) {
        log.warn("Order processing degraded, order ID: {}", orderRequest.getOrderId(), throwable);
        metricsCollector.recordFallbackCall("orderProcessing");
        return OrderResult.builder()
                .orderId(orderRequest.getOrderId())
                .status(OrderStatus.FAILED)
                .message("The system is busy, please try again later")
                .fallback(true)
                .build();
    }

    private void sendNotificationsAsync(Order order, UserDTO user) {
        CompletableFuture.runAsync(() -> {
            try {
                // Protect the notification service with a bulkhead
                notificationBulkhead.executeRunnable(() -> {
                    notificationService.sendOrderConfirmation(user.getEmail(), order);
                    notificationService.sendSMS(user.getPhone(),
                            String.format("Your order %s has been created", order.getId()));
                });
            } catch (Exception e) {
                log.warn("Failed to send notifications; the main flow is not affected", e);
            }
        });
    }

    private Order createOrder(OrderRequest orderRequest, UserDTO user,
                              List<ProductDTO> products, PaymentResult paymentResult) {
        // Order creation logic
        return Order.builder()
                .id(orderRequest.getOrderId())
                .userId(user.getId())
                .products(products)
                .totalAmount(calculateTotalAmount(products))
                .paymentId(paymentResult.getPaymentId())
                .status(OrderStatus.CREATED)
                .createdAt(LocalDateTime.now())
                .build();
    }

    private BigDecimal calculateTotalAmount(List<ProductDTO> products) {
        return products.stream()
                .map(ProductDTO::getPrice)
                .reduce(BigDecimal.ZERO, BigDecimal::add);
    }
}
6.2 Global Exception Handling
@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {
    private final ResilienceMetricsCollector metricsCollector;

    public GlobalExceptionHandler(ResilienceMetricsCollector metricsCollector) {
        this.metricsCollector = metricsCollector;
    }

    /**
     * Calls blocked by a circuit breaker
     */
    @ExceptionHandler(CallNotPermittedException.class)
    public ResponseEntity<ErrorResponse> handleCallNotPermitted(CallNotPermittedException e) {
        log.warn("Call blocked by circuit breaker: {}", e.getMessage());
        metricsCollector.recordFallbackCall("circuitBreakerBlocked");
        ErrorResponse errorResponse = ErrorResponse.builder()
                .code("SERVICE_UNAVAILABLE")
                .message("The service is temporarily unavailable, please try again later")
                .timestamp(LocalDateTime.now())
                .build();
        return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(errorResponse);
    }

    /**
     * Requests rejected by a rate limiter
     */
    @ExceptionHandler(RequestNotPermitted.class)
    public ResponseEntity<ErrorResponse> handleRequestNotPermitted(RequestNotPermitted e) {
        log.warn("Request rate limited: {}", e.getMessage());
        ErrorResponse errorResponse = ErrorResponse.builder()
                .code("RATE_LIMIT_EXCEEDED")
                .message("Too many requests, please try again later")
                .timestamp(LocalDateTime.now())
                .build();
        return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(errorResponse);
    }

    /**
     * Calls rejected by a full bulkhead
     */
    @ExceptionHandler(BulkheadFullException.class)
    public ResponseEntity<ErrorResponse> handleBulkheadFull(BulkheadFullException e) {
        log.warn("Bulkhead is full: {}", e.getMessage());
        ErrorResponse errorResponse = ErrorResponse.builder()
                .code("BULKHEAD_FULL")
                .message("The system is busy, please try again later")
                .timestamp(LocalDateTime.now())
                .build();
        return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(errorResponse);
    }

    /**
     * Timeouts
     */
    @ExceptionHandler(TimeoutException.class)
    public ResponseEntity<ErrorResponse> handleTimeout(TimeoutException e) {
        log.warn("Request timed out: {}", e.getMessage());
        ErrorResponse errorResponse = ErrorResponse.builder()
                .code("REQUEST_TIMEOUT")
                .message("The request timed out, please try again later")
                .timestamp(LocalDateTime.now())
                .build();
        // 504 rather than 408: the downstream call was slow, not the client
        return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT).body(errorResponse);
    }

    /**
     * All other exceptions
     */
    @ExceptionHandler(Exception.class)
    public ResponseEntity<ErrorResponse> handleGenericException(Exception e) {
        log.error("System error: {}", e.getMessage(), e);
        ErrorResponse errorResponse = ErrorResponse.builder()
                .code("INTERNAL_ERROR")
                .message("Internal system error")
                .timestamp(LocalDateTime.now())
                .build();
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorResponse);
    }
}
7. Best Practices and Performance Optimization
7.1 Configuration Recommendations
resilience4j:
  circuitbreaker:
    configs:
      default:
        slidingWindowSize: 100
        minimumNumberOfCalls: 10
        failureRateThreshold: 50
        waitDurationInOpenState: 60s
        permittedNumberOfCallsInHalfOpenState: 10
        slowCallDurationThreshold: 2s
        slowCallRateThreshold: 30
      strict:
        failureRateThreshold: 20
        waitDurationInOpenState: 120s
      lenient:
        failureRateThreshold: 80
        waitDurationInOpenState: 10s
  retry:
    configs:
      default:
        maxAttempts: 3
        waitDuration: 500ms
      aggressive:
        maxAttempts: 5
        waitDuration: 200ms
        enableExponentialBackoff: true
        exponentialBackoffMultiplier: 2
  bulkhead:
    configs:
      default:
        maxConcurrentCalls: 25
        maxWaitDuration: 100ms
      conservative:
        maxConcurrentCalls: 10
        maxWaitDuration: 500ms
  ratelimiter:
    configs:
      default:
        limitForPeriod: 50
        limitRefreshPeriod: 1s
        timeoutDuration: 100ms
      strict:
        limitForPeriod: 10
        limitRefreshPeriod: 1s
7.2 Performance Monitoring and Tuning
@Configuration
@EnableScheduling
@EnableConfigurationProperties(ResilienceProperties.class) // application-specific properties class, not shown here
@Slf4j
public class ResilienceOptimizationConfig {
    private final ResilienceProperties properties;
    private final MeterRegistry meterRegistry;
    private final CircuitBreakerRegistry circuitBreakerRegistry;

    // The registry created by the Resilience4j starter is injected here. Declaring your own
    // CircuitBreakerRegistry.ofDefaults() bean would replace it and bypass the YAML settings.
    public ResilienceOptimizationConfig(ResilienceProperties properties,
                                        MeterRegistry meterRegistry,
                                        CircuitBreakerRegistry circuitBreakerRegistry) {
        this.properties = properties;
        this.meterRegistry = meterRegistry;
        this.circuitBreakerRegistry = circuitBreakerRegistry;
        // Register metrics for existing instances
        circuitBreakerRegistry.getAllCircuitBreakers().forEach(this::registerCircuitBreakerMetrics);
        // Register a global event consumer for instances added later
        circuitBreakerRegistry.getEventPublisher().onEntryAdded(entryAddedEvent -> {
            CircuitBreaker addedCircuitBreaker = entryAddedEvent.getAddedEntry();
            log.info("CircuitBreaker {} added", addedCircuitBreaker.getName());
            registerCircuitBreakerMetrics(addedCircuitBreaker);
        });
    }

    private void registerCircuitBreakerMetrics(CircuitBreaker circuitBreaker) {
        // Success rate gauge
        Gauge.builder("resilience4j.circuitbreaker.success.rate", circuitBreaker, cb -> {
                    CircuitBreaker.Metrics metrics = cb.getMetrics();
                    int totalCalls = metrics.getNumberOfSuccessfulCalls() +
                            metrics.getNumberOfFailedCalls();
                    return totalCalls > 0 ?
                            (double) metrics.getNumberOfSuccessfulCalls() / totalCalls * 100 : 100.0;
                })
                .description("Circuit breaker success rate")
                .tag("name", circuitBreaker.getName())
                .register(meterRegistry);
        // Call count gauge (calls currently held in the sliding window)
        Gauge.builder("resilience4j.circuitbreaker.calls.total", circuitBreaker,
                        cb -> cb.getMetrics().getNumberOfBufferedCalls())
                .description("Total circuit breaker calls")
                .tag("name", circuitBreaker.getName())
                .register(meterRegistry);
    }

    /**
     * Periodically remove inactive instances
     */
    @Scheduled(fixedRate = 300000) // every 5 minutes
    public void cleanUpInactiveInstances() {
        Set<String> activeInstances = getActiveCircuitBreakerInstances();
        circuitBreakerRegistry.getAllCircuitBreakers().forEach(circuitBreaker -> {
            String name = circuitBreaker.getName();
            if (!activeInstances.contains(name) &&
                    circuitBreaker.getMetrics().getNumberOfBufferedCalls() == 0) {
                circuitBreakerRegistry.remove(name);
                log.info("Removed inactive CircuitBreaker: {}", name);
            }
        });
    }

    private Set<String> getActiveCircuitBreakerInstances() {
        // Placeholder: return the names of the instances that are currently in use,
        // for example from a configuration center, a database, or another source.
        // With an empty set, every instance without buffered calls is removed.
        return Collections.emptySet();
    }
}
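8. Summary
This article walked through the use of Resilience4j in Spring Boot, from basic concepts to advanced features and from simple usage to a more complex practical scenario. It covered:
- Core fault-tolerance patterns: circuit breaker, retry, rate limiting, bulkhead isolation, timeout control
- Integration styles: annotation-based, programmatic, and reactive
- Advanced features: custom configuration, event listeners, metrics collection
- Monitoring and operations: health checks, metrics exposure, performance monitoring
- Best practices: configuration tuning, exception handling, performance tuning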