基于上次介绍的Rxjava 的基础,我们今天再来聊一下 Hystrix,我们先看一下 Hystrix 整体处理流程图:
Hystrix 在设计之初主要为了解决以下问题:分布式系统环境下,各个服务之间相互依赖,当其中有一些服务发生故障时,依赖于这个服务的其他服务可会被这种故障影响,导致整个系统瘫痪,而 Hystrix 就是为了放置这种整体系统瘫痪而设计出来的,可以将爆炸半径缩小到有问题的服务上。我们接着看一下 Hystrix 的几种设计目标:
- 阻止故障发生连锁效应,导致系统雪崩。
- 发现问题快速失败,并能及时恢复。
- 提供优雅的降级策略。
- 能够提供实时的监控。
1.8.7.1 Hystrix 简单示例
OrderServiceProvider
随机超时并产生错误
public class OrderServiceProvider {
Random random = new Random();
public Integer queryByOrderId() throws InterruptedException {
int key = random.nextInt(10);
if (key > 4){
Thread.sleep(5000);
throw new RuntimeException("11");
}
return 1;
}
}
QueryOrderIdCommand
HystrixCommand 的实现类,主要是 初始化 Hystrix 配置信息,还有实现 call 和 getFallBack 方法。
public class QueryOrderIdCommand extends HystrixCommand<Integer> {
private OrderServiceProvider orderServiceProvider;
public QueryOrderIdCommand(OrderServiceProvider orderServiceProvider) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("orderService"))
.andCommandKey(HystrixCommandKey.Factory.asKey("queryByOrderId"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(10)//至少有10个请求,熔断器才进行错误率的计算
.withCircuitBreakerSleepWindowInMilliseconds(5000)//熔断器中断请求5秒后会进入半打开状态,放部分流量过去重试
.withCircuitBreakerErrorThresholdPercentage(50)//错误率达到50开启熔断保护
.withExecutionTimeoutEnabled(true))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties
.Setter().withCoreSize(10)));
this.orderServiceProvider = orderServiceProvider;
}
@Override
protected Integer run() throws InterruptedException {
return orderServiceProvider.queryByOrderId();
}
@Override
protected Integer getFallback() {
return -1;
}
}
测试类
@Test
public void testQueryByOrderIdCommand() {
Integer r = new QueryOrderIdCommand(orderServiceProvider).execute();
System.out.println("result:{}"+ r);
}
我们结合上面的流程图先来介绍一些 Hystrix 的整体处理流程。
- 构造一个
HystrixCommand
或HystrixObservableCommand
对象,使用它对请求进行封装,我们示例中使用到了HystrixCommand
, 至于两者的区别后面将会说到。 - 调用刚才构造的包装类执行,主要有四个方法,下面也会说到。
- 判断是否走缓存,假如缓存开启并存有缓存就直接返回。
- 判断熔断开关是否打开,假如打开则直接跳到第八步,服务降级。
- 假如熔断开关没有打开或者是半开状态(一部分请求)请求进入第五步,判断缓存池或者信号量是否已经满了,满了走到第八步(限流实现)。
- 执行
HystrixObservableCommand.construct()
或HystrixCommand.run()
,如果执行失败或者超时,跳到第8步;否则,跳到第9步; - 熔断开关进行统计,看是否需要触发熔断。
- 服务降级策略,即上面示例代码中重写的 getFallBack 方法
- 请求响应
1.8.7.3 HystrixCommand和HystrixObservableCommand
-
HystrixCommand
用在依赖服务返回单个操作结果的时候。 -
HystrixObservableCommand
用在依赖服务返回多个操作结果的时候。
1.8.7.2 四种执行方法
-
execute()
:使用同步阻塞的方式调用command
中的run
方法。 -
queue()
:以异步非阻塞方式执行run()
,只支持接收一个值对象。调用queue()
就直接返回一个Future
对象。可通过Future.get()
拿到run()
的返回结果,但Future.get()
是阻塞执行的。若执行成功,Future.get()
返回单个返回值。 -
observe()
: 在调用subscribe()
前执行run()/construct()
,支持接收多个值对象,取决于发射源 (调用Obsever.onNext ()
方法)。调用observe()会返回一个hot Observable,也就是说,调用observe()自动触发执行run()/construct()
,无论是否存在订阅者。如果继承的是HystrixCommand,hystrix会从线程池中取一个线程以非阻塞方式执行run();如果继承的是HystrixObservableCommand,将以调用线程阻塞执行construct()。
observe()
使用方法:
- 调用
observe()
会返回一个Observable
对象- 调用这个
Observable
对象的subscribe()
方法完成事件注册,从而获取结果
-
toObservable()
在调用 subscribe() 的时候才调用Obsevalbe 的 run()/construct() 方法,不见兔子不撒鹰。支持接收多个值对象,取决于发射源 (调用Obsever.onNext ()
方法)。调用toObservable()会返回一个cold Observable,也就是说,调用toObservable()不会立即触发执行run()/construct(),必须有订阅者订阅Observable时才会执行。
如果继承的是HystrixCommand,hystrix会从线程池中取一个线程以非阻塞方式执行run(),调用线程不必等待run();如果继承的是HystrixObservableCommand,将以调用线程堵塞执行construct(),调用线程需等待construct()执行完才能继续往下走。
toObservable()
使用方法:
- 调用
observe()
会返回一个Observable
对象- 调用这个
Observable
对象的subscribe()
方法完成事件注册,从而获取结果 。
1.8.7.4 资源隔离
Hystrix 的资源隔离主要分为两种返回式一种是线程池,一种是信号量,我们下面将详细讲解一下。
1.8.7.4.1 线程池
我们先来看一下它实现的核心源码
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
...
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
根据上面代码我们可以知道,每个 commondKey 对应的线程池都是独立的,也就是当某个 commandKey 出现故障时候,即使它使用的线程池被耗尽,也不会对其他 commandKey 产生影响,这就起到了资源隔离的作用。
我们接下来讲讲其优缺点:
- 优点
- 隔离性强,即使其中某个服务错误也不会对其他服务产生影响。
- 当依赖从故障恢复正常时,应用程序会立即恢复正常的性能。
- 可以动态配置线程池信息,也就是当发现某个服务故障时,可以将最少的资源分配给该服务。不会造成资源浪费
- 缺点
- 使用线程池在创建线程消耗较大。
- 使用异步模式,线程上下文切换造成性能损耗。
1.8.7.4.1 信号量
使用线程池时,发送请求的线程和执行依赖服务的线程不是同一个,而使用信号量时,发送请求的线程和执行依赖服务的线程是同一个,都是发起请求的线程。客户端需向依赖服务发起请求时,首先要获取一个信号量才能真正发起调用,由于信号量的数量有限,当并发请求量超过信号量个数时,后续的请求都会直接拒绝,进入fallback流程。
- 优点
- 轻量级,不存在创建线程池和线程上下文切换的损耗
- 缺点
- 信号量不支持异步,也不支持超时,也就是说当所请求的服务不可用时,信号量会控制超过限制的请求立即返回,但是已经持有信号量的线程只能等待服务响应或从超时中返回,即可能出现长时间等待。线程池模式下,当超过指定时间未响应的服务,Hystrix会通过响应中断的方式通知线程立即结束并返回。
1.8.7.5 熔断
Hystrix 的熔断主要流程如下图
- 调用
allowRequest()
判断是否允许将请求提交到线程池,它主要由circuitBreaker.forceOpen
这个参数控制,即是否强制打开开关,我们有时候并不一定是故障,有可能存在需要紧急关停某些服务的时候。 - 调用
isOpen()
判断熔断器开关是否打开,如果打开进入第三步,- 》如果一个周期内总的请求数小于circuitBreaker.requestVolumeThreshold
的值,允许请求放行。 !-》如果一个周期内错误率小于circuitBreaker.errorThresholdPercentage
的值,允许请求放行。否则,打开熔断器开关,进入第三步。 - 调用
allowSingleTest()
判断是否允许单个请求通行,如果熔断器打开,且距离熔断器打开的时间或上一次试探请求放行的时间超过circuitBreaker.sleepWindowInMilliseconds
的值时,熔断器器进入半开状态,允许放行一个试探请求;否则,不允许放行。
1.8.7.6 回退降级
降级有如下策略
- 快速失败,直接抛出异常。
- 无声失败,即返回一个空值
- 静态降级,返回一个预设的静态值。
- cache ,返回缓存的旧的结果。
1.8.7.7 总结
今天学习完 Hystrix 我们对它实现的大体流程已经清楚了,接下来我们看一下 soul 是怎么通过 Hystrix 实现限流的。