随着业务的不断扩展,微服务比例逐渐增大,为了保证各个服务的稳定性以及服务之间的隔离,所以我们需要服务具备熔断、降级等功能,保证整个系统不会被拉垮,所以本节我们分析Hystrix熔断、降级,当然阿里的Sentinel、Spring官方推荐的Resilience4j也都具备这样的功能,但是本节重点分析Hystrix,虽然Hystrix已经停止更新了,但是现阶段许多微服务的熔断还是基于的Hystrix,那么现在我们开始进入正题!
开局一张图,内容全靠懵!话有点多,我们直接上图,有点懵是吧?PS:正所谓,懵逼树上懵逼果,懵逼树下你和我,这就让笔者进行一波懵逼总结!因为Hystrix基于RxJava实现,如果对RxJava的被观察者和观察者不熟悉,那么正在阅读的同学可以先缓一缓,先去阅读下RxJava相关的用法,对RxJava的运作模式清晰之后,再来分析Hystrix就比较容易上手了,从官方Wiki文档上对流程的总结,我们能得知流程图为9部分组成,下文我们就看看每个步骤都做了什么工作!
步骤1、2(构建命令、执行命令)
命令分为HystrixCommand、HystrixObservableCommand两种,通过构建和使用两种命令,既可以使用Hystrix的熔断、降级等功能,两者用法基本一致,唯一有区别的就是HystrixCommand返回单个操作结果(发射一次数据),HystrixObservableCommand返回多个操作结果(发射多次数据),这样的区别其实只是在自定义实现HystrixCommand、HystrixObservableCommand时候才会有区别,对于Hystrix内部其实都是返回的Observable对象来获取结果
- HystrixCommand
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
......
//当调用execute()、queue()方法时候需要实现此方法
protected abstract R run() throws Exception;
//同步执行,返回结果
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
//异步执行,返回Future对象
public Future<R> queue() {......}
}
- HystrixObservableCommand
public abstract class HystrixObservableCommand<R> extends AbstractCommand<R> implements HystrixObservable<R>, HystrixInvokableInfo<R> {
......
//当调用observe()、toObservable()方法时候需要实现此方法
protected abstract Observable<R> construct();
//调用后自动订阅,立即执行命令
public Observable<R> observe() {
ReplaySubject<R> subject = ReplaySubject.create();
final Subscription sourceSubscription = toObservable().subscribe(subject);
return subject.doOnUnsubscribe(new Action0() {
@Override
public void call() {
sourceSubscription.unsubscribe();
}
});
}
//调用后不会自动订阅,当调用方手动订阅之后,才执行命令
public Observable<R> toObservable() {......}
}
步骤3(缓存)
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
public Observable<R> toObservable() {
......
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
//首先从缓存中获取是否存在相同命令的结果
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
//创建Observable
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
//放入缓存
if (requestCacheEnabled && cacheKey != null) {
// 从缓存中包装Observable
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// 如果其他线程已经存入这个缓存命令,那么直接执行
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// 返回刚刚我们创建的ObservableCommand
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
//命令执行结束后的清理动作
.doOnTerminate(terminateCommandCleanup)
//取消订阅后的清理动作
.doOnUnsubscribe(unsubscribeCommandCleanup)
//命令执行完成后的Hook动作
.doOnCompleted(fireOnCompletedHook);
}
}
}
}
从代码片段中我们能大致得知如果开启了缓存,那么优先从缓存中查找,是否存在相同的命令,如果存在相同的命令,那么直接从这个缓存命令返回结果,否则每次都创建新的命令并执行,返回最终的结果
步骤4(断路器/熔断)
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
......
public Observable<R> toObservable() {
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//如果没有订阅,则中断当前事件流,never()不会执行订阅者的onCompleted
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
//Hystrix断路器、隔离相关逻辑
return applyHystrixSemantics(_cmd);
}
};
}
......
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
//扩展接口,提供给使用者扩展使用
executionHook.onStart(_cmd);
//断路器是否允许请求
if (circuitBreaker.allowRequest()) {
//获取执行的信号量
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
//信号量释放标志
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
//释放信号量的逻辑
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
//标志异常的逻辑
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
//尝试获取信号量
if (executionSemaphore.tryAcquire()) {
try {
//跟踪线程执行时间
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
//执行命令
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
//获取信号量失败的逻辑
return handleSemaphoreRejectionViaFallback();
}
} else {
//直接执行断路逻辑
return handleShortCircuitViaFallback();
}
}
}
通过注释信息,我们能得知,执行命令时,首先判断断路器是否允许请求,然后获取执行命令的信号量,获取不到就返回失败,以及在执行完命令之后信号量释放操作
步骤5(信号量、线程池隔离策略)
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
......
//异常情况的逻辑
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {......}
Observable<R> execution;
//是否启用超时
//executeCommandWithSpecifiedIsolation根据隔离策略处理逻辑
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
//线程池隔离策略
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
......
} else {
//信号量隔离策略
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
//存储正在运行的命令
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
//扩展接口
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
//执行用户任务
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
}
});
}
}
}
信号量与线程池默认值都是10,当10个线程或者是信号量都被占用,那么将会拒绝执行命令
步骤6(执行用户任务)
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
......
protected abstract Observable<R> getExecutionObservable();
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
userObservable = getExecutionObservable();
} catch (Throwable ex) {
userObservable = Observable.error(ex);
}
return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
}
对于HystrixCommand命令模式的需要实现run()方法,对于HystrixObservableCommand命令模式的需要实现construct()方法,调用到用户真正的任务
步骤7(断路器健康状况)
通过上报统计指标信息来判断当前断路器的健康状况,命令执行的情况(成功、失败、超时、异常等)都会上报给HystrixCommandMetrics这个类,最终通过HystrixMetricsPoller定期处理这些上报的数据(定时任务时间间隔2秒),在判断这些汇总数据和设定的条件来判断是否需要开启断路器/熔断
步骤8(命令执行失败)
通过流程图得知,开启断路器/熔断、线程池或者信号量拒绝、执行失败、执行超时等都会执行用户指定的Fallback逻辑,HystrixCommand类型的命令执行getFallback(),HystrixObservableCommand类型的命令执行resumeWithFallback()
步骤9(返回命令执行结果)
结果最终以Observable对象返回,用户可以通过Observable对应的不同策略进行获取,execute()、queue()、observe()、toObservable()等
根据流程图我们大致可以看出Hystrix具体做了哪些工作,对于每个步骤具体对应的逻辑相对比较复杂,我们放到后续分析逐步揭晓!