最近一个新的项目,选择的熔断降级框架是Resilience4j。以前自己了解的做熔断限流处理的框架有Hystrix和Sentinel,Resilience4j倒是第一次听说,因此特地学习学习。
简介
随着微服务的流行,熔断作为其中一项很重要的技术也广为人知。当微服务的运行质量低于某个临界值时,启动熔断机制,暂停微服务调用一段时间,以保障后端的微服务不会因为持续过负荷而宕机。
Hystrix官方已停止维护,官方推荐使用Resilience4j来替代Hystrix实现服务治理。作为新一代的熔断器,Resilience4j有很多优势,比如依赖少,模块化程度较好等优势。
Resilience4j是一款轻量级,易于使用的容错库,其灵感来自于Netflix Hystrix,但是专为Java 8和函数式编程而设计。轻量级,因为库只使用了Vavr,它没有任何其他外部依赖下。相比之下,Netflix Hystrix对Archaius具有编译依赖性,Archaius具有更多的外部库依赖性。
要使用Resilience4j,不需要引入所有依赖,只需要选择你需要的,Resilience4j提供了以下的核心模块和拓展模块:
- resilience4j-circuitbreaker: Circuit breaking(熔断器)
- resilience4j-ratelimiter: Rate limiting(限流器)
- resilience4j-bulkhead: Bulkheading(隔离器)
- resilience4j-retry: Automatic retrying (sync and async)(重试、同步&异步)
- resilience4j-cache: Result caching(缓存)
- resilience4j-timelimiter: Timeout handling(超时处理)
Circuitbreaker(熔断器)
CircuitBreaker通常具有三种正常状态的有限状态机实现:CLOSED,OPEN和HALF_OPEN以及两个特殊状态DISABLED和FORCED_OPEN。当熔断器关闭时,所有的请求都会通过熔断器。如果失败率超过设定的阈值,熔断器就会从关闭状态转换到打开状态,这时所有的请求都会被拒绝。当经过一段时间后,熔断器会从打开状态转换到半开状态,这时仅有一定数量的请求会被放入,并重新计算失败率,如果失败率超过阈值,则变为打开状态,如果失败率低于阈值,则变为关闭状态。
Resilience4j记录请求状态的数据结构和Hystrix不同,Hystrix是使用滑动窗口来进行存储的,而Resilience4j采用的是Ring Bit Buffer(环形缓冲区)。Ring Bit Buffer在内部使用BitSet这样的数据结构来进行存储,BitSet的结构如下图所示:
每一次请求的成功或失败状态只占用一个bit位,与boolean数组相比更节省内存。BitSet使用long[]数组来存储这些数据,意味着16个值(64bit)的数组可以存储1024个调用状态。
计算失败率需要填满环形缓冲区。例如,如果环形缓冲区的大小为10,则必须至少请求满10次,才会进行故障率的计算,如果仅仅请求了9次,即使9个请求都失败,熔断器也不会打开。但是CLOSE状态下的缓冲区大小设置为10并不意味着只会进入10个 请求,在熔断器打开之前的所有请求都会被放入。
当故障率高于设定的阈值时,熔断器状态会从由CLOSE变为OPEN。这时所有的请求都会抛出CallNotPermittedException异常。当经过一段时间后,熔断器的状态会从OPEN变为HALF_OPEN,HALF_OPEN状态下同样会有一个Ring Bit Buffer,用来计算HALF_OPEN状态下的故障率,如果高于配置的阈值,会转换为OPEN,低于阈值则装换为CLOSE。与CLOSE状态下的缓冲区不同的地方在于,HALF_OPEN状态下的缓冲区大小会限制请求数,只有缓冲区大小的请求数会被放入。
除此以外,熔断器还会有两种特殊状态:DISABLED(始终允许访问)和FORCED_OPEN(始终拒绝访问)。这两个状态不会生成熔断器事件(除状态装换外),并且不会记录事件的成功或者失败。退出这两个状态的唯一方法是触发状态转换或者重置熔断器。
熔断器关于线程安全的保证措施有以下几个部分:
- 熔断器的状态使用AtomicReference保存的
- 更新熔断器状态是通过无状态的函数或者原子操作进行的
- 更新事件的状态用synchronized关键字保护
意味着同一时间只有一个线程能够修改熔断器状态或者记录事件的状态。
可配置参数
配置属性 | 默认值 | 描述 |
---|---|---|
failureRateThreshold | 50 | 以百分比配置失败率阈值。当故障率等于或大于阈值时,CircuitBreaker 转换为打开并开始短路呼叫。 |
slowCallRateThreshold | 100 | 以百分比配置阈值。当呼叫持续时间大于等于或大于阈值时,CircuitBreaker 将呼叫视为慢速呼叫。 当慢速呼叫的百分比等于或大于阈值时,CircuitBreaker 转换为打开并开始短路呼叫。slowCallDurationThreshold
|
slowCallDurationThreshold | 60000(ms) | 配置持续时间阈值,超过该阈值呼叫被视为慢速并提高慢速呼叫率。 |
permittedNumberOfCallsInHalfOpenState | 10 | 配置 CircuitBreaker 半开时允许的调用次数。 |
maxWaitDurationInHalfOpenState | 0(ms) | 配置最大等待持续时间,该持续时间控制断路器在切换到打开之前可以保持在半开状态的最长时间。 值 0 表示断路器将在 HalfOpen 状态下无限等待,直到所有允许的调用都已完成。 |
slidingWindowType | COUNT_BASED | 配置用于在CircuitBreaker关闭时记录调用结果的滑动窗口类型。 滑动窗口可以是基于计数或基于时间的。 |
slidingWindowSize | 100 | 如果滑动窗口是 COUNT_BASED,则记录并汇总最后一次调用。 如果滑动窗口是TIME_BASED,则记录并汇总最后几秒的调用。slidingWindowSize slidingWindowSize 配置用于在 CircuitBreaker 关闭时记录调用结果的滑动窗口的大小。 |
minimumNumberOfCalls | 100 | 配置在 CircuitBreaker 计算错误率或慢速调用率之前所需的最小调用次数(每个滑动窗口周期)。 例如,如果 minimumNumberOfCalls 为 10,则必须记录至少 10 次呼叫,然后才能计算失败率。 如果仅记录了 9 个呼叫,即使所有 9 个呼叫都失败,CircuitBreaker 也不会转换为打开状态。 |
waitDurationInOpenState | 60000 (ms) | 断路器在从打开转换为半打开之前应等待的时间。 |
automaticTransitionFromOpenToHalfOpenEnabled | false | 如果设置为 true,则意味着 CircuitBreaker 将自动从打开状态转换为半打开状态,无需调用即可触发转换。一旦 waitDurationInOpenState 通过,就会创建一个线程来监视 CircuitBreakers 的所有实例以将它们转换为 HALF_OPEN。然而,如果设置为 false,则只有在调用时才会发生到 HALF_OPEN 的转换,即使在 waitDurationInOpenState 被传递之后也是如此。这里的优点是没有线程监视所有 CircuitBreaker 的状态。 |
recordExceptions | empty | 记录为失败并因此增加失败率的异常列表。 任何匹配或从列表之一继承的异常都算作失败,除非通过. 如果您指定异常列表,则所有其他异常都算成功,除非它们被明确忽略。 ignoreExceptions ignoreExceptions
|
ignoreExceptions | empty | 被忽略且既不视为失败也不视为成功的异常列表。 任何匹配或从列表之一继承的异常都不会被视为失败或成功,即使异常是. recordExceptions
|
recordFailurePredicate | throwable -> true 默认情况下,所有异常都记录为失败。 |
一个自定义谓词,用于评估是否应将异常记录为失败。 如果异常应计为失败,则谓词必须返回 true。 如果异常应计为成功,则谓词必须返回 false,除非异常被显式忽略。 ignoreExceptions
|
ignoreExceptionPredicate | throwable -> false 默认情况下不会忽略任何异常。 |
一个自定义谓词,用于评估是否应忽略异常并且既不计为失败也不计为成功。 如果应忽略异常,谓词必须返回 true。 如果异常应计为失败,则谓词必须返回 false。 |
测试demo
1.添加依赖
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>1.2.0</version>
</dependency>
resilience4j-spring-boot集成了circuitbeaker、retry、bulkhead、ratelimiter几个模块,就直接引入resilience4j-spring-boot依赖。
2.断路器配置(application-yml)
# "order"为断路器的名字
#熔断器关闭时的缓冲区大小
resilience4j:
circuitbreaker:
backends:
order:
ringBufferSizeInClosedState: 5 # 熔断器关闭时的缓冲区大小
ringBufferSizeInHalfOpenState: 3 # 熔断器半开时的缓冲区大小
waitDurationInOpenState: 5000 # 熔断器从打开到半开需要的时间
failure-rate-threshold: 60 # 熔断器打开的失败阈值
eventConsumerBufferSize: 10 # 事件缓冲区大小
registerHealthIndicator: true # 健康监测
automaticTransitionFromOpenToHalfOpenEnabled: false # 是否自动从打开到半开,不需要触发
3.使用注解方式实现断路器
package com.lx.cloud.demo.controller;
import com.lx.cloud.demo.entity.User;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Resilience4jController
* @author lx
*/
@RestController
@Slf4j
public class Resilience4jController {
@Autowired
private CircuitBreakerRegistry circuitBreakerRegistry;
@GetMapping("/circuitBreakerAOPTestNoFallbackMethod")
@CircuitBreaker(name = "order")
public User circuitBreakerAOPTestNoFallbackMethod() throws Exception{
throw new Exception("服务异常");
}
@GetMapping("/circuitBreakerAOPTest")
@CircuitBreaker(name = "order", fallbackMethod = "fallBack")
public User circuitBreakerAOPTest() throws Exception{
throw new Exception("服务异常");
}
private User fallBack(CallNotPermittedException e){
log.info("熔断器已经打开,拒绝访问被保护方法~");
io.github.resilience4j.circuitbreaker.CircuitBreaker order = circuitBreakerRegistry.circuitBreaker("order");
io.github.resilience4j.circuitbreaker.CircuitBreaker.Metrics metrics = order.getMetrics();
log.info("方法降级中:" + "state=" + order.getState() + " , metrics[ failureRate=" + metrics.getFailureRate() +
", bufferedCalls=" + metrics.getNumberOfBufferedCalls() +
", failedCalls=" + metrics.getNumberOfFailedCalls() +
", successCalls=" + metrics.getNumberOfSuccessfulCalls() +
", maxBufferCalls=" + metrics.getNumberOfBufferedCalls() +
", notPermittedCalls=" + metrics.getNumberOfNotPermittedCalls() +
" ]"
);
return new User("熔断测试", 0);
}
}
调用没有服务降级处理的方法circuitBreakerAOPTestNoFallbackMethod()时,熔断前服务调用结果:
{
"timestamp": "2022-06-01T02:48:25.998+0000",
"status": 500,
"error": "Internal Server Error",
"message": "CircuitBreaker 'order' is OPEN and does not permit further calls",
"trace": "io.github.resilience4j.circuitbreaker.CallNotPermittedException: CircuitBreaker 'order' is OPEN and does not permit further calls\n\tat ....后续太多,不做记录了可以自行实验",
"path": "/circuitBreakerAOPTestNoFallbackMethod"
}
调用有服务降级处理的方法circuitBreakerAOPTest()时,熔断后服务调用结果:
控制台输出如下:
熔断器已经打开,拒绝访问被保护方法~
方法降级中:state=OPEN , metrics[ failureRate=100.0, bufferedCalls=3, failedCalls=3, successCalls=0, maxBufferCalls=3, notPermittedCalls=1 ]
熔断器已经打开,拒绝访问被保护方法~
方法降级中:state=OPEN , metrics[ failureRate=100.0, bufferedCalls=3, failedCalls=3, successCalls=0, maxBufferCalls=3, notPermittedCalls=2 ]
RateLimiter(限流器)
高频控制是可以限制服务调用频率,Resilience4j的RateLimiter可以对频率进行纳秒级别的控制,在每一个周期刷新可以调用的次数,还可以设定线程等待权限的时间,一般用于服务提供方,保护自己不受到冲击。
可配置参数
配置参数 | 默认值 | 描述 |
---|---|---|
timeoutDuration | 5000[ms] | 线程等待权限的默认等待时间 |
limitRefreshPeriod | 500[ns] | 权限刷新的时间,每个周期结束后,RateLimiter将会把权限计数设置为limitForPeriod的值 |
limiteForPeriod | 50 | 一个限制刷新期间的可用权限数,也就是一个限制周期内可访问次数 |
测试demo
1.首先添加POM依赖。不需要引入新的依赖,已经集成在resilience4j-spring-boot2中了。
2.限流控制器配置(application.yml)
resilience4j:
ratelimiter:
configs:
default:
limitForPeriod: 5 # 一个限制周期内可访问次数
limitRefreshPeriod: 10000 # 限制周期,每个周期之后,速率限制器将重置回limitForPeriod值
timeoutDuration: 5000 # 线程等待允许执行时间
instances:
ratelimiterA: # 限流器的名字
baseConfig: default
limitForPeriod: 3 # 一个限制周期内可访问次数
ratelimiterB: # 限流器的名字
baseConfig: default
limitRefreshPeriod: 10s
3.使用注解的方式限流控制器
package com.lx.cloud.demo.controller;
import com.alibaba.fastjson.JSONObject;
import com.lx.cloud.demo.entity.User;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
* Resilience4jController
* @author lx
*/
@RestController
@Slf4j
public class Resilience4jController {
@Autowired
private RateLimiterRegistry rateLimiterRegistry;
/**
* 限流
* @return
* @throws Exception
*/
@GetMapping("/ratelimiterTestNoFallbackMethod")
@RateLimiter(name = "ratelimiterA")
public User ratelimiterTestNoFallbackMethod() throws Exception{
User user = new User("liuxiao", 27);
log.info(JSONObject.toJSONString(user) + new Date());
return user;
}
/**
* 限流有fallback方法
* @return
* @throws Exception
*/
@GetMapping("/ratelimiterTestFallbackMethod")
@RateLimiter(name = "ratelimiterA",fallbackMethod = "fallBackByRatelimiter")
public User ratelimiterTestFallbackMethod() throws TimeoutException, InterruptedException {
User user = new User("liuxiao", 27);
log.info(JSONObject.toJSONString(user));
return user;
}
private User fallBackByRatelimiter(RequestNotPermitted e){
log.info("限流控制器已经打开,拒绝访问被保护方法~");
io.github.resilience4j.ratelimiter.RateLimiter ratelimiterA = rateLimiterRegistry.rateLimiter("ratelimiterA");
io.github.resilience4j.ratelimiter.RateLimiter.Metrics metrics = ratelimiterA.getMetrics();
log.info("方法限流中:" + "metrics[ availablePermissions=" + metrics.getAvailablePermissions() +
", numberOfWaitingThreads=" + metrics.getNumberOfWaitingThreads() +
" ]"
);
User user = new User("限流控制器降级测试", 0);
log.info(JSONObject.toJSONString(user));
return user;
}
}
使用postman在一个限制周期内连续调用4次,控制台结果如下:
前三次能正常输出,第4次时限流器进行了限流处理,抛出异常。并且由于配置了timeOutDuration为5000ms,因此第4次接口调用时等待了5000ms才返回结果信息。
{
"timestamp": "2022-06-01T03:44:41.563+0000",
"status": 500,
"error": "Internal Server Error",
"message": "RateLimiter 'ratelimiterA' does not permit further calls",
"trace": "io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter 'ratelimiterA' does not permit further calls\n\tat....",
"path": "/ratelimiterTestNoFallbackMethod"
}
调用服务降级的限流方法ratelimiterTestFallbackMethod
使用postman在一个限制周期内连续调用4次,控制台结果如下:
{"age":27,"name":"liuxiao"}
{"age":27,"name":"liuxiao"}
{"age":27,"name":"liuxiao"}
限流控制器已经打开,拒绝访问被保护方法~
方法限流中:metrics[ availablePermissions=0, numberOfWaitingThreads=0 ]
{"age":0,"name":"限流控制器降级测试"}
Bulkhead(并发控制器)
Bulkhead(舱壁)是用来控制并行(parallel)调用的次数。Resilence4j的Bulkhead提供两种实现,一种是基于信号量的(SemaphoreBulkhead),另一种是基于有等待队列的固定大小的线程池的(FixedThreadPoolBulkhead),由于基于信号量的Bulkhead能很好地在多线程和I/O模型下工作,所以选择介绍基于信号量的Bulkhead的使用。
可配置参数
配置参数 | 默认值 | 描述 |
---|---|---|
maxConcurrentCalls | 25 | 可允许的最大并发线程数 |
maxWaitDuration | 0 | 尝试进入饱和舱壁时应阻止线程的最大时间 |
测试demo
1.首先添加POM依赖。不需要引入新的依赖,已经集成在resilience4j-spring-boot2中了。
2.并发控制器配置(application.yml)
resilience4j:
bulkhead:
backends:
bulkheadA:
maxconcurrentcalls: 5 # 可允许的最大并发线程数
maxwaittime: 1 # 尝试进入饱和舱壁时应阻止线程的最大时间
3.使用注解的方式实现并发控制器
package com.lx.cloud.demo.controller;
import com.alibaba.fastjson.JSONObject;
import com.lx.cloud.demo.entity.User;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Resilience4jController
* @author lx
*/
@RestController
@Slf4j
public class Resilience4jController {
@Autowired
private BulkheadRegistry bulkheadRegistry;
/**
* 并发
* @return
* @throws Exception
*/
@GetMapping("/bulkheadTestNoFallbackMethod")
@Bulkhead(name = "bulkheadA")
public User bulkheadTestNoFallbackMethod() throws Exception{
Thread.sleep(500);
User user = new User("liuxiao", 27);
System.out.println(JSONObject.toJSONString(user));
return user;
}
/**
* 并发降级
* @return
* @throws Exception
*/
@GetMapping("/bulkheadTestFallbackMethod")
@Bulkhead(name = "bulkheadA",fallbackMethod = "fallBackByBulkhead")
public User bulkheadTestFallbackMethod() throws Exception{
Thread.sleep(5000);
User user = new User("liuxiao", 27);
System.out.println(JSONObject.toJSONString(user));
return user;
}
//降级处理
private User fallBackByBulkhead(BulkheadFullException e){
System.out.println("并发控制器已经打开,拒绝访问被保护方法~");
io.github.resilience4j.bulkhead.Bulkhead bulkheadA = bulkheadRegistry.bulkhead("bulkheadA");
io.github.resilience4j.bulkhead.Bulkhead.Metrics metrics = bulkheadA.getMetrics();
System.out.println("方法降级中:" + "metrics[ availableConcurrentCalls=" + metrics.getAvailableConcurrentCalls() +
", maxAllowedConcurrentCalls=" + metrics.getMaxAllowedConcurrentCalls() +
" ]"
);
User user = new User("并发控制器降级测试", 0);
System.out.println(JSONObject.toJSONString(user));
return user;
}
}
使用Jmeter进行并发测试,调用bulkheadTestFallbackMethod服务降级方法。同时发送10个请求。控制台输出结果如下:
并发控制器已经打开,拒绝访问被保护方法~
方法降级中:metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=5 ]
{"age":0,"name":"并发控制器降级测试"}
并发控制器已经打开,拒绝访问被保护方法~
方法降级中:metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=5 ]
{"age":0,"name":"并发控制器降级测试"}
并发控制器已经打开,拒绝访问被保护方法~
方法降级中:metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=5 ]
{"age":0,"name":"并发控制器降级测试"}
并发控制器已经打开,拒绝访问被保护方法~
方法降级中:metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=5 ]
{"age":0,"name":"并发控制器降级测试"}
并发控制器已经打开,拒绝访问被保护方法~
方法降级中:metrics[ availableConcurrentCalls=0, maxAllowedConcurrentCalls=5 ]
{"age":0,"name":"并发控制器降级测试"}
{"age":27,"name":"liuxiao"}
{"age":27,"name":"liuxiao"}
{"age":27,"name":"liuxiao"}
{"age":27,"name":"liuxiao"}
{"age":27,"name":"liuxiao"}
参考文档:
官方文档