不怕难之Spring Cloud系列之Hystrix

一、简介

1. 引言

什么是熔断器?

为什么要有熔断器?

熔断器有哪些考虑指标?

熔断器有哪些适用的设计模式?

熔断器有哪些通用的使用场景?

如果让你来设计,如何设计熔断器的架构?

2. 介绍

circuit-breaker: circuit表示电路,译为熔断器。

回想起小时候,家里保险丝突然被烧断,需 手工更换一根新的保险丝;后来,保险丝被取代,电流过大时会跳闸,闸拉上去后立马恢复供电;等到上大学时,只要打开功率高一点的电吹风,砰的一声就断电,但过10分钟就自动来电。在电流过大时,通过熔断机制以保护电路和家电。

Hystrix 属于上面的第三种,一种自动恢复的智能熔断器,区别在于它保护的是系统,且判断 "电流过大" 的方式是:不断收集请求指标信息(sucess、failure、timeout、rejection),当达到设定熔断条件时(默认是请求失败率达到50%)进行熔断。

3. 能做什么

(1). 在通过第三方客户端访问(通常是通过网络)依赖服务出现高延迟或者失败时,为系统提供保护和控制

(2). 在分布式系统中防止级联失败

(3). 快速失败(Fail fast)同时能快速恢复

(4). 提供失败回退(Fallback)和优雅的服务降级机制

(5). 提供近实时的监控、报警和运维控制手段

4. 功能点

隔离(线程池隔离和信号量隔离):限制调用分布式服务的资源使用,某一个调用的服务出现问题不会影响其他服务调用。

优雅的降级机制:超时降级、资源不足时(线程或信号量)降级,降级后可以配合降级接口返回托底数据。

融断:当失败率达到阀值自动触发降级(如因网络故障/超时造成的失败率高),熔断器触发的快速失败会进行快速恢复。

缓存:提供了请求缓存、请求合并实现。支持实时监控、报警、控制(修改配置)


二、原理

在统计中,会使用一定数量的样本,并将样本进行分组,最后进行统计分析。

Hystrix 有点类似,例如:以秒为单位来统计请求的处理情况(成功请求数量、失败请求数、超时请求数、被拒绝的请求数),然后每次取最近10秒的数据来进行计算,如果失败率超过50%,就进行熔断,不再处理任何请求

三、整体流程图


根据执行流程,将实现分为如下几块:

(1)主流程HystrixCommand 包装成 Observable 执行:AbstractCommand

(2)异步执行:HystrixContextScheduler

(3)超时中断:HystrixObservableTimeoutOperator

(4)断路器:HystrixCircuitBreaker

(5)滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream()

(6)事件处理流:HystrixEventStream

(7)getFallback() 执行

四、运行流程

五、整体架构

熔断是参考电路而产生的一种保护性机制,即系统中如果存在某个服务失败率过高时,将开启熔断器,对于后续的调用,不在继续请求服务,而是进行Fallback操作。

熔断所依靠的数据即是Metrics中的HealthCount所统计的错误率。


六、Hystrix实现原理-命令模式

将所有请求外部系统(或者叫依赖服务)的逻辑封装到 HystrixCommand或者HystrixObservableCommand(依赖RxJava)对象中

Run()方法为要实现的业务逻辑,这些逻辑将会在独立的线程中被执行当请求依赖服务时出现拒绝服务、超时或者短路(多个依赖服务顺序请求,前面的依赖服务请求失败,则后面的请求不会发出)时,执行该依赖服务的失败回退逻辑(Fallback)


七、Hystrix实现原理-舱壁模式

货船为了进行防止漏水和火灾的扩散,会将货仓分隔为多个,当发生灾害时,将所在货仓进行隔离就可以降低整艘船的风险。


八、 Hystrix实现原理-隔离策略

应用在复杂的分布式结构中,可能会依赖许多其他的服务,并且这些服务都不可避免地有失效的可能。如果一个应用没有与依赖服务的失效隔离开来,那么它将有可能因为依赖服务的失效而失效。

Hystrix将货仓模式运用到了服务调用者上。为每一个依赖服务维护一个线程池(或者信号量),当线程池占满,该依赖服务将会立即拒绝服务而不是排队等待。

每个依赖服务都被隔离开来,Hystrix 会严格控制其对资源的占用,并在任何失效发生时,执行失败回退逻辑。



九、源码分析

1. @EnableHystrix注解

@Target(ElementType.TYPE)

@Retention(RetentionPolicy.RUNTIME)

@Documented

@Inherited

@EnableCircuitBreaker

public @interface EnableHystrix {

}

2. HystrixCircuitBreakerConfiguration源码

@Beanpublic HystrixCommandAspect hystrixCommandAspect() {

    return new HystrixCommandAspect();

}

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")

public void hystrixCommandAnnotationPointcut() {

}

依赖组件:
MetaHolderFactory:生成拦截方法元信息

HystrixCommandFactory:生成 HystrixInvokable

HystrixInvokable

CommandCollapser

GenericObservableCommand

GenericCommand

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")

public void hystrixCollapserAnnotationPointcut() {

}

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")

public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {

    Method method = getMethodFromTarget(joinPoint);

    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);

    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {

        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +

                "annotations at the same time");

    }

    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));

    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);

    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);

    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?

            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;

    try {

        result = CommandExecutor.execute(invokable, executionType, metaHolder);

    } catch (HystrixBadRequestException e) {

        throw e.getCause();

    }

    return result;

}


这个方法中,一开始先获取拦截的Method,然后判断,如果方法上同时加了@HystrixCommand和@HystrixCollapser两个注解的话,就抛异常。

在创建MetaHolder的时候,调用了MetaHolderFactory的create方法,MetaHolderFactory有两个子类,CollapserMetaHolderFactory和CommandMetaHolderFactory,最终执行的是子类的create方法


private static class CommandMetaHolderFactory extends MetaHolderFactory {

    @Override

    public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {

        HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);

        ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());

        MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);

        return builder.defaultCommandKey(method.getName())

                        .hystrixCommand(hystrixCommand)

                        .observableExecutionMode(hystrixCommand.observableExecutionMode())

                        .executionType(executionType)

                        .observable(ExecutionType.OBSERVABLE == executionType)

                        .build();

    }

}



MetaHolder.Builder metaHolderBuilder(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {

    MetaHolder.Builder builder = MetaHolder.builder()

            .args(args).method(method).obj(obj).proxyObj(proxy)

            .defaultGroupKey(obj.getClass().getSimpleName())

            .joinPoint(joinPoint);

    if (isCompileWeaving()) {

        builder.ajcMethod(getAjcMethodFromTarget(joinPoint));

    }

    FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(obj.getClass(), method);

    if (fallbackMethod.isPresent()) {

        fallbackMethod.validateReturnType(method);

        builder

                .fallbackMethod(fallbackMethod.getMethod())

                .fallbackExecutionType(ExecutionType.getExecutionType(fallbackMethod.getMethod().getReturnType()));

    }

    return builder;

}

public HystrixInvokable create(MetaHolder metaHolder) {

    HystrixInvokable executable;

    if (metaHolder.isCollapserAnnotationPresent()) {

        executable = new CommandCollapser(metaHolder);

    } else if (metaHolder.isObservable()) {

        executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));

    } else {

        executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));

    }

    return executable;

}



public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {

    Validate.notNull(invokable);

    Validate.notNull(metaHolder);

    switch (executionType) {

        case SYNCHRONOUS: {

            return castToExecutable(invokable, executionType).execute();

        }

        case ASYNCHRONOUS: {

            HystrixExecutable executable = castToExecutable(invokable, executionType);

            if (metaHolder.hasFallbackMethodCommand()

                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {

                return new FutureDecorator(executable.queue());

            }

            return executable.queue();

        }

        case OBSERVABLE: {

            HystrixObservable observable = castToObservable(invokable);

            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();

        }

        default:

            throw new RuntimeException("unsupported execution type: " + executionType);

    }

}

public R execute() {

    try {

        return queue().get();

    } catch (Exception e) {

        throw decomposeException(e);

    }

}


@Override

protected Object getFallback() {

    if (getFallbackAction() != null) {

        final CommandAction commandAction = getFallbackAction();

        try {

            return process(new Action() {

                @Override

                Object execute() {

                    MetaHolder metaHolder = commandAction.getMetaHolder();

                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());

                    return commandAction.executeWithArgs(commandAction.getMetaHolder().getFallbackExecutionType(), args);

                }

            });

        } catch (Throwable e) {

            LOGGER.error(FallbackErrorMessageBuilder.create()

                    .append(commandAction, e).build());

            throw new FallbackInvocationException(e.getCause());

        }

    } else {

        return super.getFallback();

    }

}


HystrixThreadEventStream

订阅了执行的完成事件后会把执行结果汇总到HystrixThreadEventStream。顾名思义就是一个事件流

-onNext触发的行为,写入HystrixCommandCompletionStream,如果是使用线程池隔离的方式还会写入HystrixThreadPoolCompletionStream。

private static final Action1 writeCommandCompletionsToShardedStreams =new Action1() {

@Override

    public void call(HystrixCommandCompletion commandCompletion) {

HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());

        commandStream.write(commandCompletion);

        if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {

HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());

            threadPoolStream.write(commandCompletion);

        }

}

};

HystrixThreadEventStream(Thread thread) {

this.threadId = thread.getId();

    this.threadName = thread.getName();

    writeOnlyCommandStartSubject = PublishSubject.create();

    writeOnlyCommandCompletionSubject = PublishSubject.create();

    writeOnlyCollapserSubject = PublishSubject.create();

    writeOnlyCommandStartSubject

.onBackpressureBuffer()

.doOnNext(writeCommandStartsToShardedStreams)

.unsafeSubscribe(Subscribers.empty());

    --重点关注writeOnlyCommandCompletionSubject这个对象,触发onNext()方法的时候会调用writeCommandCompletionsToShardedStreams这个方法。

writeOnlyCommandCompletionSubject

.onBackpressureBuffer()

.doOnNext(writeCommandCompletionsToShardedStreams)

.unsafeSubscribe(Subscribers.empty());

    writeOnlyCollapserSubject

.onBackpressureBuffer()

.doOnNext(writeCollapserExecutionsToShardedStreams)

.unsafeSubscribe(Subscribers.empty());

}

--executionResult为执行结果,commandKey为hystrix的命令key,主要是用来区分不同业务进行统计使用的

public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {

HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);

    --发送命令到writeOnlyCommandCompletionSubject这个subject(被观察者)

writeOnlyCommandCompletionSubject.onNext(event);

}


十、实战

1. Hystrix接入

public class HelloCommandextends HystrixCommand {

public HelloCommand() {

super(HystrixCommandGroupKey.Factory.asKey("test"));

    }

@Override

    protected Stringrun()throws Exception {

Thread.sleep(800);

        return "sucess";

    }

@Override

    protected StringgetFallback() {

System.out.println("执行了回退方法");

        return "error";

    }

}

2. RXJava

Observable observable = Observable.create(new ObservableOnSubscribe() {

@Override

        public void subscribe(ObservableEmitter emitter)throws Exception {

// 使用Emitter事件发射器发射事件

            emitter.onNext("这是事件1");

            emitter.onNext("这是事件2");

            // emitter.onError(new Exception("这里事件发生了异常。"));

            emitter.onNext("这是事件3");

            emitter.onNext("这是事件4");

            emitter.onComplete();

        }

});

    // 定义观察者,接收事件

    Observer observer =new Observer() {

@Override

        public void onSubscribe(Disposable d) {

// 订阅成功回调该方法,返回控制对象

// d.dispose();

            Log.i(TAG, "--onSubscribe--");

        }

@Override

        public void onNext(String s) {

// 这里接收被观察者发出的事件

            Log.i(TAG, "--onNext--" + s);

        }

@Override

        public void onError(Throwable e) {

// 错误事件

            Log.i(TAG, "--onError--" + e.getMessage());

        }

@Override

        public void onComplete() {

// 完成事件

            Log.i(TAG, "--onComplete--");

        }

};

    // 观察者订阅被观察者

    observable.subscribe(observer);

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,458评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,030评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,879评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,278评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,296评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,019评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,633评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,541评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,068评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,181评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,318评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,991评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,670评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,183评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,302评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,655评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,327评论 2 358