Hystrix
简要说明
在大中型分布式系统中,通常系统很多依赖。在高并发访问下,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接缓慢,资源繁忙,暂时不可用,服务脱机等。在复杂的分布式架构的应用程序有很多的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险。一般来说,随着服务依赖数量的变多,服务不稳定的概率会成指数性提高。
解决的问题
一个依赖30个SOA服务的系统,每个服务99.99%可用。
99.99%的30次方 ≈ 99.7%,
0.3% 意味着一亿次请求 会有 3,000,00次失败, 换算成时间大约每月有2个小时服务不稳定。 解决这个问题的方案是对依赖进行隔离。Hystrix就是处理依赖隔离的框架,同时也是可以帮我们做依赖服务的治理和监控。
前期准备
rxjava
Hystrix引入rxjava 1
四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
BlockingObservable<T> ,一个阻塞的Observable 继承普通的Observable类,增加了一些可用于阻塞Observable发射的数据的操作符。BlockingObservable已经在Rxjava2中去掉了,在Rxjava2中已经集成到了Observable
线程池、Future
略
流程图
构建一个HystrixCommand 或 HystrixObservableCommand
HystrixCommand
R run()
R getFallback()
HystrixObservableCommand
Observable<R> construct()
resumeWithFallback()
执行Command
K value = command.execute(); -> queue().get()
Future<K> fValue = command.queue(); -> toObservable().toBlocking().toFuture();( Observable -> BlockingObservable -> Future(delegate) -> Future)
Observable<K> ohValue = command.observe(); //hot observable
Observable<K> ocValue = command.toObservable(); //cold observable
public Observable<R> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<R> subject = ReplaySubject.create();
// eagerly kick off subscription
final Subscription sourceSubscription = toObservable().subscribe(subject);
// return the subject that can be subscribed to later while the execution has already started
return subject.doOnUnsubscribe(new Action0() {
@Override
public void call() {
sourceSubscription.unsubscribe();
}
});
}
是否缓存
重写getCacheKey(),用来构造cache key。
HystrixRequestContext.initializeContext()和context.shutdown()构建context。
public class HelloWorld extends HystrixCommand<Boolean> {
@Override
protected String getCacheKey() {
return ?;
}
public static void main(String[] args) throws Exception {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
...
}finally {
context.shutdown();
}
}
}
Circuit是否打开
检测circuit-breaker是否打开,打开则直接进入fallback。否则进入下一步。
HystrixCommandProperties设置circuitBreaker
circuitBreaker.enabled
设置断路器是否生效。
circuitBreaker.requestVolumeThreshold
设置在一个滚动窗口中,打开断路器的最少请求数。
circuitBreaker.sleepWindowInMilliseconds
设置在断路器被打开,拒绝请求到再次尝试请求的时间间隔。
circuitBreaker.errorThresholdPercentage
设置打开断路器并启动回退逻辑的错误比率。(这个参数的效果受到circuitBreaker.requestVolumeThreshold和滚动时间窗口的时间长度影响)
circuitBreaker.forceOpen
强制断路器进入打开状态
circuitBreaker.forceClosed
强制断路器进入关闭状态
线程池、队列、信号是否占满
线程池、队列、信号是否占满的时候,将直接进入fallback
线程池配置
coreSize
心线程池的大小。
maximumSize
设置线程池最大值。
maxQueueSize
设置BlockingQueue最大的队列值。
queueSizeRejectionThreshold
设置队列拒绝的阈值
keepAliveTimeMinutes
设置存活时间,单位分钟。
allowMaximumSizeToDivergeFromCoreSize
该属性允许maximumSize起作用。
metrics.rollingStats.timeInMilliseconds
设置统计的滚动窗口的时间段大小。
metrics.rollingStats.numBuckets
设置滚动的统计窗口被分成的bucket的数目。
run()或者construct()
执行相应的run()或者construct()的方法。
计算Circuit Health
通过计算successes, failures, rejections, and timeouts,确定是否打开circuitBreaker。
getFallback()或者resumeWithFallback()
是否进入fallback
Failure Type | fallback |
---|---|
FAILURE | YES |
TIMEOUT | YES |
SHORT_CIRCUITED | YES |
THREAD_POOL_REJECTED | YES |
SEMAPHORE_REJECTED | YES |
BAD_REQUEST | YES |
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
折叠器
通过继承HystrixCollapser,实现多个请求折叠成单个请求。
HystrixCollapserProperties
名称 | 描述 | 默认值 |
---|---|---|
maxRequestsInBatch | 允许的最大请求数 | Integer.MAX_VALUE |
timerDelayInMilliseconds | 多少毫秒后出发执行 | 10毫秒 |
requestCache.enabled | 是否缓存 | true |