如何实现一个简单的熔断以及Hystrix原理分析

前言

随着业务的越来越复杂,保证程序的健壮性对程序猿来说也变得更加的重要,毕竟不写Bug的程序猿不是一个好的程序猿。但怎样尽可能的保证咱们的程序能够稳定的运行,以及出错后能够进行相应的补偿,这里就需要咱们使用熔断机制了。

PS:在进入正文之前,不妨思考一下两个问题:
①熔断机制究竟为我们解决了什么问题?
②我们怎样去自己实现一个简单的熔断?


自定义熔断的实现

这里咱们简单的实现了一个超时后进行熔断的例子,这里有用到AspectJ的相关知识,对于熟悉Spring AOP知识的同学应该没什么问题。

主要分为两步:

  1. 使用Future控制是否超时,超时后将任务cancel掉。
  2. 调用咱们自己定义好的fallback方法进行处理。在这里需要注意的是,fallback方法参数应该要与原方法相同,这样咱们才能进行补偿措施。例如:咱们可以在fallback方法借助消息中间件将这些参数进行存储,然后在适当的时候从消息中间件中读取出来进行补偿消费处理。
@RestController
public class HelloController {
    private Random random = new Random();

    @MyHystrixCommand(fallback="errorMethod")
    @RequestMapping("/hello")
    public String hello(@RequestParam("name") String message) throws InterruptedException {
        int time = random.nextInt(200);
        System.out.println("spend time : " + time + "ms");
        Thread.sleep(time);
        System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhh");
        return "hello world:" + message;
    }

    public String errorMethod(String message) {
        return "error message";
    }
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyHystrixCommand {
    int value() default 100;
    String fallback() default "";
}
@Aspect
@Component
public class MyHystrixCommandAspect {

    ExecutorService executor = Executors.newFixedThreadPool(10);

    @Pointcut(value = "@annotation(MyHystrixCommand)")
    public void pointCut() {

    }

    @Around(value = "pointCut()&&@annotation(hystrixCommand)")
    public Object doPointCut(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand) throws Throwable {
        int timeout = hystrixCommand.value();
        Future future = executor.submit(() -> {
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
            }
            return null;
        });
        Object returnValue = null;
        try {
            returnValue = future.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            future.cancel(true);
            if (StringUtils.isBlank(hystrixCommand.fallback())){
                throw new Exception("fallback is null");
            }
            returnValue = invokeFallbackMethod(joinPoint, hystrixCommand.fallback());
        }
        return returnValue;
    }

    private Object invokeFallbackMethod(ProceedingJoinPoint joinPoint, String fallback) throws Exception {
        Method method = findFallbackMethod(joinPoint, fallback);
        if (method == null) {
            throw new Exception("can not find fallback :" + fallback + " method");
        } else {
            method.setAccessible(true);
            try {
                Object invoke = method.invoke(joinPoint.getTarget(), joinPoint.getArgs());
                return invoke;
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw e;
            }
        }
    }


    private Method findFallbackMethod(ProceedingJoinPoint joinPoint, String fallbackMethodName) {
        Signature signature = joinPoint.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        Method method = methodSignature.getMethod();
        Class<?>[] parameterTypes = method.getParameterTypes();
        Method fallbackMethod = null;
        try {
        //这里通过判断必须取和原方法一样参数的fallback方法
            fallbackMethod = joinPoint.getTarget().getClass().getMethod(fallbackMethodName, parameterTypes);
        } catch (NoSuchMethodException e) {
        }
        return fallbackMethod;
    }

}

当然,上述例子只是一个简单的超时后熔断处理的实现方式。咱们在实际应用中,还有可能并发超过指定阈值后咱们也需要进行降级处理,一个最普通的场景:秒杀案例。这些东西在Hystrix中都有相应的处理,它提供了线程池和信号量这两种方式去解决并发的问题。


什么是Hystrix?

咱们看一下官方介绍

In a distributed environment, inevitably some of the many service dependencies will fail. Hystrix is a library that helps you control the interactions between these distributed services by adding latency tolerance and fault tolerance logic. Hystrix does this by isolating points of access between the services, stopping cascading failures across them, and providing fallback options, all of which improve your system’s overall resiliency.

在分布式环境中,调用一些服务不可避免的会出现失败,Hystrix帮助咱们添加了一些容忍策略,并且将服务进行隔离处理,防止一个服务的失败影响到了另一个服务的调用,这些都提高了咱们系统的弹性。


Hystrix的处理流程

这里咱们结合一下Spring Cloud Hystrix进行说明,从HystrixCommandAspect开始分析:

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        ...
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);//第一步
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);//第二步
        ...
        Object result;
        try {
            //第三步
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } 
        ....
        return result;
    }

这个切面主要针对HystrixCommandHystrixCollapser这两个注解,前者用于进行熔断降级处理,后者用来根据配置进行合并请求(类比数据库操作,将多个insert语句合并成一个insert batch语句)。咱们侧重进行HystrixCommand这一块的分析。

第一步:获取元数据(MetaHolder)

这段代码对应上面的MetaHolder metaHolder = metaHolderFactory.create(joinPoint);,里面封装了比如调用方法method,参数args,方法所属对象target,动态代理对象proxy,回调方法fallbackMethod等等一些元数据的封装。这些数据在创建命令对象时会被使用。

第二步:获取调用者(HystrixInvokable)

它持有一个命令对象,并且可以在合适的时候通过这个命令对象完成具体的业务逻辑,针对HystrixCommand上述的命令对象就是GenericObservableCommandGenericCommand的一种,这里命令对象的选择和方法的返回值有关,如果返回值为Observable类型,则创建GenericObservableCommand命令,否则创建GenericCommand命令。

第三步:执行命令(execute)
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        ...
        switch (executionType) {
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            ...
        }
    }

从上面的代码段中,可以很容易的看出共有三种策略,同步、异步、OBSERVABLE,而Observable又分为Cold Observable(observable.toObservable())Hot Observable(observable.observe())。所以说总共有四种执行方式。但是底层都会调用到AbstractCommand.toObservable()方法。

  • execute():同步执行,返回一个单一的对象结果,发生错误时抛出异常。
  • queue():异步执行,返回一个Future对象,包含着执行结束后返回的单一结果。
  • observe():这个方法返回一个Observable对象,它代表操作的多个结果,但是已经被订阅者消费掉了。
  • toObservable():这个方法返回一个Observable对象,它代表操作的多个结果,需要咱们自己手动订阅并消费掉。

在执行逻辑中,大量用到了RxJava,各种回调处理,看的着实头晕,感兴趣的同学可以自行阅读源码,我这里只是介绍一些关键的流程点。

①首先会检查是否命中缓存(toObservable方法中),命中缓存则直接返回:

/* try from cache first */
 if (requestCacheEnabled) {
      HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
       if (fromCache != null) {
           isResponseFromCache = true;
           return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
        }
}

②检查断路器是否打开,如果断路器打开,则通过handleShortCircuitViaFallback直接进行fallback处理:

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        if (circuitBreaker.allowRequest()) {
        }else {
            return handleShortCircuitViaFallback();
        }
        ...
}

③检查是否用了信号量,如果用了,则判断是否被占满,占满后则抛出异常,通过handleSemaphoreRejectionViaFallback直接转到fallback中进行执行,不执行后面的逻辑。如果没用,则会返回一个默认的TryableSemaphoreNoOp.DEFAULT,在进行executionSemaphore.tryAcquire()时始终返回true。

if (executionSemaphore.tryAcquire()) {
  try {
    /* used to track userThreadExecutionTime */
    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
    return executeCommandAndObserve(_cmd)
            .doOnError(markExceptionThrown)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);
    } catch (RuntimeException e) {
        return Observable.error(e);
    }
} else {
    return handleSemaphoreRejectionViaFallback();
}

④执行命令中的逻辑

通过重写AbstractCommand中的getExecutionObservable()方法使得下面两个命令类中的相应逻辑被调用。

  • GenericCommand中的run()方法
  • GenericObservableCommand中的construct()方法

如果run或者construct中设置了超时时间,如果执行时间超过了阈值,则会抛出TimeoutException,或者在执行过程中抛出其他异常,都会进入fallback中进行处理逻辑。

⑤发生异常后执行fallback

   private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, 
         final HystrixEventType eventType,
         final FailureType failureType, 
         final String message,
         final Exception originalException) {
}

最终都会调用到这个方法,咱们看看FailureType具体有哪几种类型。

  • COMMAND_EXCEPTION:执行run方法或者construct方法抛出异常时。
  • TIMEOUT:超时情况下。
  • SHORTCIRCUIT:断路器直接打开时,直接执行handleShortCircuitViaFallback方法。
  • REJECTED_THREAD_EXECUTION:线程池、请求队列被占满的情况下。
  • REJECTED_SEMAPHORE_EXECUTION:信号量占满情况下。
  • BAD_REQUEST_EXCEPTION:
  • REJECTED_SEMAPHORE_FALLBACK:

总结

Hystrix中大量用了RxJava,阅读源码看起来不免会觉得头晕,可以考虑在关键点打几个断点看看,不然各种回调会让你绕圈圈。不过个人觉得RxJava代码看起来还是蛮优美的,只不过有些许不适应而已,后面有时间会研究一下RxJava


END

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351