【Hystrix技术指南】(3)超时机制的原理和实现

[每日一句]

也许你度过了很糟糕的一天,但这并不代表你会因此度过糟糕的一生。

[背景介绍]

  • 分布式系统的规模和复杂度不断增加,随着而来的是对分布式系统可用性的要求越来越高。在各种高可用设计模式中,【熔断、隔离、降级、限流】是经常被使用的。而相关的技术,Hystrix本身早已算不上什么新技术,但它却是最经典的技术体系!。
  • Hystrix以实现熔断降级的设计,从而提高了系统的可用性。
  • Hystrix是一个在调用端上,实现断路器模式,以及隔舱模式,通过避免级联故障,提高系统容错能力,从而实现高可用设计的一个Java服务组件库。
  • *Hystrix实现了资源隔离机制

前提介绍

Hystrix的超时检测本质上通过启动单独线程去检测的,线程的执行的时间刚好就是任务超时的时间,本质上就是这么个简单的逻辑。

Hystrix超时后会抛出一个 HystrixTimeoutException的异常。

超时检测逻辑

Hystrix的超时包括注册过程和执行过程两个,注册过程如下:

  • 执行lift(new HystrixObservableTimeoutOperator(_cmd))关联超时检测任务
  • 在HystrixObservableTimeoutOperator类中,new TimerListener()负责创建检测任务,HystrixTimer.getInstance().addTimerListener(listener)负责关联定时任务
    • 在HystrixObservableTimeoutOperator类中,addTimerListener通过java的定时任务服务scheduleAtFixedRate在延迟超时时间后执行

Hystrix的超时执行过程如下:

  1. 在超时后执行listener.tick()方法后执行类TimerListener的tick方法
  2. 在TimerListener类的tick方法中执行timeoutRunnable.run()后执行HystrixContextRunnable的run方法
  3. 在HystrixContextRunnable类run方法中执行child.onError(new HystrixTimeoutException())实现超时
  4. executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));
private static class HystrixObservableTimeoutOperator implements Operator {

        final AbstractCommand originalCommand;

        public HystrixObservableTimeoutOperator(final AbstractCommand originalCommand) {
            this.originalCommand = originalCommand;
        }

        public Subscribersuper R> call(final Subscribersuper R> child) {
            final CompositeSubscription s = new CompositeSubscription();

            child.add(s);

            final HystrixRequestContext hystrixRequestContext =
                            HystrixRequestContext.getContextForCurrentThread();
            TimerListener listener = new TimerListener() {

                public void tick() {
                  if(originalCommand.isCommandTimedOut
                        .compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {

                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT,
                            originalCommand.commandKey);

                        s.unsubscribe();
                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(
                                originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });
                        timeoutRunnable.run();
                    }
                }

                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };
            final Reference tl = HystrixTimer.getInstance().addTimerListener(listener);

            originalCommand.timeoutTimer.set(tl);

            Subscriber parent = new Subscriber() {

                public void onCompleted() {
                    if (isNotTimedOut()) {

                        tl.clear();
                        child.onCompleted();
                    }
                }

                public void onError(Throwable e) {
                    if (isNotTimedOut()) {

                        tl.clear();
                        child.onError(e);
                    }
                }

                public void onNext(R v) {
                    if (isNotTimedOut()) {
                        child.onNext(v);
                    }
                }
                private boolean isNotTimedOut() {

                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED,
                                                                             TimedOutStatus.COMPLETED);
                }
            };

            s.add(parent);
            return parent;
        }
    }
    public Reference addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();

        Runnable r = new Runnable() {

            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        ScheduledFuture f = executor.get().getThreadPool().scheduleAtFixedRate(r,
                listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(),
                TimeUnit.MILLISECONDS);
            return new TimerReference(listener, f);
    }

    public class HystrixContextRunnable implements Runnable {
        private final Callable actual;
        private final HystrixRequestContext parentThreadState;
        public HystrixContextRunnable(Runnable actual) {
            this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
        }
        public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable
            actual) {
            this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
        }
        public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy,
                                  final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
            this.actual = concurrencyStrategy.wrapCallable(new Callable() {

            public Void call() throws Exception {
                actual.run();
                return null;
            }

        });
        this.parentThreadState = hystrixRequestContext;
    }

    public void run() {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {

            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);

            try {
                actual.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {

            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }
}
复制代码

<center> <font color=#0000FF>分享资源</font>

[图片上传失败...(image-6a69f3-1691457956513)]
扫码 头像 并关注发送:资源 获取以上资源

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 开篇  Hystrix的超时检测本质上通过启动单独线程去检测的,线程的启动的时间刚好就是任务超时的时间,本质上就是...
    晴天哥_王志阅读 5,995评论 0 1
  • 在复杂的分布式应用中有着许多的依赖,各个依赖都有难免在某个时刻失败,如果应用不隔离各个依赖,降低外部的风险,那容易...
    陈二狗想吃肉阅读 4,075评论 0 3
  • 微服务知识架构:1、服务容错 2、服务监控 3、服务的框架 4、运行时期的支撑 5、服务部署 6、 服务的安全 1...
    liuwj的ing阅读 1,605评论 3 1
  • HystrixCommand在执行的过程中如何探测超时,本篇主要对此进行介绍说明。 1.主入口:executeCo...
    青芒v5阅读 12,082评论 2 6
  • 一、Hystrix解决了什么问题? 在复杂的分布式应用中有着许多的依赖,各个依赖都有难免在某个时刻失败,如果应用不...
    vivo互联网技术阅读 1,813评论 0 0