Spring Cloud Hystrix 分析(一)之流程图

随着业务的不断扩展,微服务比例逐渐增大,为了保证各个服务的稳定性以及服务之间的隔离,所以我们需要服务具备熔断、降级等功能,保证整个系统不会被拉垮,所以本节我们分析Hystrix熔断、降级,当然阿里的Sentinel、Spring官方推荐的Resilience4j也都具备这样的功能,但是本节重点分析Hystrix,虽然Hystrix已经停止更新了,但是现阶段许多微服务的熔断还是基于的Hystrix,那么现在我们开始进入正题!


Hystrix官方流程图

开局一张图,内容全靠懵!话有点多,我们直接上图,有点懵是吧?PS:正所谓,懵逼树上懵逼果,懵逼树下你和我,这就让笔者进行一波懵逼总结!因为Hystrix基于RxJava实现,如果对RxJava的被观察者和观察者不熟悉,那么正在阅读的同学可以先缓一缓,先去阅读下RxJava相关的用法,对RxJava的运作模式清晰之后,再来分析Hystrix就比较容易上手了,从官方Wiki文档上对流程的总结,我们能得知流程图为9部分组成,下文我们就看看每个步骤都做了什么工作!


步骤1、2(构建命令、执行命令)

命令分为HystrixCommand、HystrixObservableCommand两种,通过构建和使用两种命令,既可以使用Hystrix的熔断、降级等功能,两者用法基本一致,唯一有区别的就是HystrixCommand返回单个操作结果(发射一次数据),HystrixObservableCommand返回多个操作结果(发射多次数据),这样的区别其实只是在自定义实现HystrixCommand、HystrixObservableCommand时候才会有区别,对于Hystrix内部其实都是返回的Observable对象来获取结果

  • HystrixCommand
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
    ......
    //当调用execute()、queue()方法时候需要实现此方法
    protected abstract R run() throws Exception;
    //同步执行,返回结果
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
    //异步执行,返回Future对象
    public Future<R> queue() {......}
}
  • HystrixObservableCommand
public abstract class HystrixObservableCommand<R> extends AbstractCommand<R> implements HystrixObservable<R>, HystrixInvokableInfo<R> {
    ......
    //当调用observe()、toObservable()方法时候需要实现此方法
    protected abstract Observable<R> construct();
    //调用后自动订阅,立即执行命令
    public Observable<R> observe() {
        ReplaySubject<R> subject = ReplaySubject.create();
        final Subscription sourceSubscription = toObservable().subscribe(subject);
        return subject.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                sourceSubscription.unsubscribe();
            }
        });
    }
    //调用后不会自动订阅,当调用方手动订阅之后,才执行命令
    public Observable<R> toObservable() {......}
}

步骤3(缓存)

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    public Observable<R> toObservable() {
        ......
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                //首先从缓存中获取是否存在相同命令的结果
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
                //创建Observable
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                //放入缓存
                if (requestCacheEnabled && cacheKey != null) {
                    // 从缓存中包装Observable
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // 如果其他线程已经存入这个缓存命令,那么直接执行
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // 返回刚刚我们创建的ObservableCommand
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        //命令执行结束后的清理动作
                        .doOnTerminate(terminateCommandCleanup) 
                        //取消订阅后的清理动作
                        .doOnUnsubscribe(unsubscribeCommandCleanup) 
                        //命令执行完成后的Hook动作
                        .doOnCompleted(fireOnCompletedHook);
            }
        }
    }
}

从代码片段中我们能大致得知如果开启了缓存,那么优先从缓存中查找,是否存在相同的命令,如果存在相同的命令,那么直接从这个缓存命令返回结果,否则每次都创建新的命令并执行,返回最终的结果


步骤4(断路器/熔断)

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    ......
    public Observable<R> toObservable() {
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                //如果没有订阅,则中断当前事件流,never()不会执行订阅者的onCompleted
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                //Hystrix断路器、隔离相关逻辑
                return applyHystrixSemantics(_cmd);
            }
        };
    }
    ......
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        //扩展接口,提供给使用者扩展使用
        executionHook.onStart(_cmd);

        //断路器是否允许请求
        if (circuitBreaker.allowRequest()) {
            //获取执行的信号量
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            //信号量释放标志
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            //释放信号量的逻辑
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
            //标志异常的逻辑
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            //尝试获取信号量
            if (executionSemaphore.tryAcquire()) {
                try {
                    //跟踪线程执行时间
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    //执行命令
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                //获取信号量失败的逻辑
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            //直接执行断路逻辑
            return handleShortCircuitViaFallback();
        }
    }
}

通过注释信息,我们能得知,执行命令时,首先判断断路器是否允许请求,然后获取执行命令的信号量,获取不到就返回失败,以及在执行完命令之后信号量释放操作


步骤5(信号量、线程池隔离策略)

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        ......
        //异常情况的逻辑
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {......}
        Observable<R> execution;
        //是否启用超时
        //executeCommandWithSpecifiedIsolation根据隔离策略处理逻辑
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        //线程池隔离策略
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
          ......
        } else {
            //信号量隔离策略
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }
                    
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                    //存储正在运行的命令
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    try {
                        //扩展接口
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        //执行用户任务
                        return getUserExecutionObservable(_cmd);  
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                }
            });
        }
    }
}

信号量与线程池默认值都是10,当10个线程或者是信号量都被占用,那么将会拒绝执行命令


步骤6(执行用户任务)

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    ......
    protected abstract Observable<R> getExecutionObservable();
    private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        Observable<R> userObservable;

        try {
            userObservable = getExecutionObservable();
        } catch (Throwable ex) {
            userObservable = Observable.error(ex);
        }

        return userObservable
                .lift(new ExecutionHookApplication(_cmd))
                .lift(new DeprecatedOnRunHookApplication(_cmd));
    }
}

对于HystrixCommand命令模式的需要实现run()方法,对于HystrixObservableCommand命令模式的需要实现construct()方法,调用到用户真正的任务


步骤7(断路器健康状况)

通过上报统计指标信息来判断当前断路器的健康状况,命令执行的情况(成功、失败、超时、异常等)都会上报给HystrixCommandMetrics这个类,最终通过HystrixMetricsPoller定期处理这些上报的数据(定时任务时间间隔2秒),在判断这些汇总数据和设定的条件来判断是否需要开启断路器/熔断


步骤8(命令执行失败)

通过流程图得知,开启断路器/熔断、线程池或者信号量拒绝、执行失败、执行超时等都会执行用户指定的Fallback逻辑,HystrixCommand类型的命令执行getFallback(),HystrixObservableCommand类型的命令执行resumeWithFallback()


步骤9(返回命令执行结果)

结果最终以Observable对象返回,用户可以通过Observable对应的不同策略进行获取,execute()、queue()、observe()、toObservable()等

根据流程图我们大致可以看出Hystrix具体做了哪些工作,对于每个步骤具体对应的逻辑相对比较复杂,我们放到后续分析逐步揭晓!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容