Spring Cloud Hystrix实现了断路器、线程隔离等一系列服务保护功能。它是基于Netflix的开源框架Hystrix实现的,该框架的目标在于通过控制那些访问远程系统、服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力。Hystrix具备服务降级(fallback)、服务熔断、线程和信号隔离、请求缓存、请求合并以及服务监控等强大功能。
What Is Hystrix For?
- 保护和控制对依赖项的网络延时和失败情况
- 防止级联故障
- 快速失败、快速恢复
- fallback和服务降级
- 接近实时级的监控、报警和控制
How Does Hystrix Accomplish Its Goals?
- 使用命令模式将所有对外部服务(或依赖关系)的调用包装在HystrixCommand或HystrixObservableCommand对象中,并将该对象放在单独的线程中执行;
- 支持自定义依赖项的超时时间,一般稍高于99.5%的访问时间;
- 每个依赖都维护着一个线程池(或信号量),线程池被耗尽则拒绝请求(而不是让请求排队)。
- 记录请求成功,失败,超时和线程拒绝。
- 服务错误百分比超过了阈值,熔断器开关自动打开,一段时间内停止对该服务的所有请求。
- 请求失败,被拒绝,超时或熔断时执行降级逻辑。
-
近实时地监控指标和配置的修改。
设计模式
命令模式
观察者模式
舱壁模式
原理分析
工作流程
- 构造一个 HystrixCommand或HystrixObservableCommand对象,用于封装请求,并在构造方法配置请求被执行需要的参数;
- 执行命令,Hystrix提供了4种执行命令的方法,后面详述;
- 判断是否使用缓存响应请求,若启用了缓存,且缓存可用,直接使用缓存响应请求。Hystrix支持请求缓存,但需要用户自定义启动;
- 判断熔断器是否打开,如果打开,跳到第8步;
- 判断线程池/队列/信号量是否已满,已满则跳到第8步;
- 执行HystrixObservableCommand.construct()或HystrixCommand.run(),如果执行失败或者超时,跳到第8步;否则,跳到第9步;
- 统计熔断器监控指标;
- 走Fallback备用逻辑
- 返回请求响应
执行命令的几种方法
Hystrix提供了4种执行命令的方法,execute()和queue() 适用于HystrixCommand对象,而observe()和toObservable()适用于HystrixObservableCommand对象。
- execute()
以同步堵塞方式执行run(),只支持接收一个值对象。hystrix会从线程池中取一个线程来执行run(),并等待返回值。
- queue()
以异步非阻塞方式执行run(),只支持接收一个值对象。调用queue()就直接返回一个Future对象。可通过 Future.get()拿到run()的返回结果,但Future.get()是阻塞执行的。若执行成功,Future.get()返回单个返回值。当执行失败时,如果没有重写fallback,Future.get()抛出异常。
- observe()
事件注册前执行run()/construct(),支持接收多个值对象,取决于发射源。调用observe()会返回一个hot Observable,也就是说,调用observe()自动触发执行run()/construct(),无论是否存在订阅者。
如果继承的是HystrixCommand,hystrix会从线程池中取一个线程以非阻塞方式执行run();如果继承的是HystrixObservableCommand,将以调用线程阻塞执行construct()。
observe()使用方法:
- 调用observe()会返回一个Observable对象
- 调用这个Observable对象的subscribe()方法完成事件注册,从而获取结果
- toObservable()
事件注册后执行run()/construct(),支持接收多个值对象,取决于发射源。调用toObservable()会返回一个cold Observable,也就是说,调用toObservable()不会立即触发执行run()/construct(),必须有订阅者订阅Observable时才会执行。
如果继承的是HystrixCommand,hystrix会从线程池中取一个线程以非阻塞方式执行run(),调用线程不必等待run();如果继承的是HystrixObservableCommand,将以调用线程堵塞执行construct(),调用线程需等待construct()执行完才能继续往下走。
toObservable()使用方法:
- 调用observe()会返回一个Observable对象
- 调用这个Observable对象的subscribe()方法完成事件注册,从而获取结果
需注意的是,HystrixCommand也支持toObservable()和observe(),但是即使将HystrixCommand转换成Observable,它也只能发射一个值对象。只有HystrixObservableCommand才支持发射多个值对象。
断路器原理
依赖隔离
Hystrix为每个以来的服务创建一个独立的线程池,这样就算某个依赖服务出现延迟过高的情况,也只是对该依赖服务的调用有影响,而不会拖慢其他依赖服务。
另外Hystrix中除了可使用线程池之外,还可以使用信号量来控制单个以来服务的并发度,信号量的开销远比线程池的开销小,但是它不能设置超时和实现异步访问。
使用详解
构建一个如下架构图的服务调用关系
分析上述架构图,主要有以下几项工作:
- eureka-server工程: 服务注册中心,端口1111
- hello-service工程: HELLO-SERVICE服务单元,启动两个实例,端口分别为8081和8082
- ribbon-consumer工程: 使用Ribbon实现的服务消费者,端口9000
下面我们开始引入Spring Cloud Hystrix。
- 在ribbeon-consumer工程中引入依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
- 在ribbon-consumer工程主类ConsumerApplication中使用@Enable-CircuitBreaker注释开启断路器功能:
@EnableCircuitBreaker
@EnableDiscoveryClient
@SpringBootApplication
public class ConsumerApplication {
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
public static void main (String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
HystrixCommand支持继承和注释两种写法,本文只介绍注释方法,对继承方法感兴趣的同学可以baidu一些资料。
- 同步执行
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand
public User getUserById(Long id) {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
}
}
- 异步执行
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand
public Future<User> getUserByIdAsync(final String id) {
return new AsyncResult<User>() {
@Override
public User invoke() {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
}
}
}
}
除了传统的同步执行与异步执行之外,我们还可以将HystrixCommand通过Observale来实现响应式执行方式。
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER)
public Observable<String> getUserById(final String id) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
if (!observer.isUnsubscribed()) {
String user = restTemplate.getForObject("http://USER-SERVICE/users/{1}", String.class, id);
observer.onNext(user);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
}
}
下面是一个简单的Observable的使用示例。关于响应式编程,有兴趣的可以研究一下RxJava。
public void useObservable () {
Observable<String> observable = userService.getUserById("1");
observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//
}
});
}
- 定义服务降级
fallback是Hystrix命令执行失败时使用的后备方法,用来实现服务的降级处理逻辑。在HystrixCommand中可以通过重载getFallBack()方法来实现服务降级逻辑,Hystrix会再run()执行过程中出现错误、超时、线程池拒绝、断路器熔断等情况,执行getFallback()方法内的逻辑。
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand(fallbackMethod = "defaultUser")
public String getUserById(Long id) {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", String.class, id);
}
public String defaultUser() {
return "default user";
}
}
如果fallback逻辑还是不稳定,可以添加多级fallback逻辑。
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand(fallbackMethod = "defaultUser")
public String getUserById(Long id) {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", String.class, id);
}
@HystrixCommand(fallbackMethod = "defaultUserSec")
public String defaultUser() {
return "first default user";
}
public String defaultUserSec() {
return "second default user";
}
}
不论Hystrix命令是否实现了服务降级,命令状态和断路器状态都会更新,并且我们可以由此了解到命令执行的失败情况。
- 忽略异常
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand(fallbackMethod = "defaultUser", ignoreExceptions = {HystrixBadRequestException.class})
public String getUserById(Long id) {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", String.class, id);
}
public String defaultUser(Long id, Throwable e) {
return "default user";
}
}
- 命名名称、分组以及线程池划分
@HystrixCommand(commandKey = "getUserById", groupKey = "UserGroup", threadPoolKey = "getUserByIdThread")
public String getUserById(Long id) {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", String.class, id);
}
- 请求缓存
@CacheResult
@HystrixCommand
public String getUserById(@CacheKey("id") Long id) {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", String.class, id);
}
@CacheRemove(commandKey = "getUserById")
@HystrixCommand
public void update(@CacheKey("id") User user) {
return restTemplate.postForObject("http://USER-SERVICE/users/{1}", user, User.class);
}
- 请求合并
HystrixCollapser可以实现对以来服务请求的合并,以减少通信小号和线程数的占用。
@Service
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCollapser(batchMethod = "findAll", collapserProperties = {
@HystrixProperty(name = "timeDelayInMilliseconds", value = "100")
})
public String find(Long id) {
return null;
}
@HystrixCommand
public List<String> findAll(List<Long> ids) {
return restTemplate.getForObject("http://USER-SERVICE/users?ids={1}", List.class,
StringUtils.join(ids, ","));
}
}
属性详解
Hystrix仪表盘
Turbine集群监控
Reference
Netflix/Hystrix Wiki
Netflix/Hystrix Wiki How-To-Use
Hystrix原理与实战