Hystrix源码解析

Hystrix简介

  在分布式系统中,难免有对外部接口的依赖,而外部接口有可能出现响应缓慢,大量请求超时,大量访问出现异常等情况。出现上面所说的情况有可能是由很多原因导制的,可能是网络抖动,外部系统有没有测出的bug,系统遭遇黑客攻击等。因为一个接口的异常,有可能导制线程阻塞,影响到其它接口的服务,甚至整个系统的服务给拖跨,对外部系统依赖的模块越多,出现的风险也就会越高,Hystrix正是用于解决这样的问题。Hystrix同样是Netflix公司开源的用于解决分布式问题而开源的框架。源码网址为:https://github.com/Netflix/Hystrix。Hystrix提供了如下几种解决方案应对上面说的问题,分别为:

  • 线程池隔离
  • 信号量隔离
  • 熔断
  • 降级回退

Hystrix 版的 Hello World

  • 在pom.xml文件里引入Hystrix依赖的类
   <dependencies>
       <dependency>
           <groupId>com.netflix.hystrix</groupId>
           <artifactId>hystrix-core</artifactId>
           <version>1.5.13</version>
       </dependency>
   </dependencies>
  • 编写业务Command
package com.ivan.client.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class HelloCommand extends HystrixCommand<String> {

   protected HelloCommand() {
       super(HystrixCommandGroupKey.Factory.asKey("test"));
   }

   @Override
   protected String run() throws Exception {
       //模拟请求外部接口需要的时间长度
       Thread.sleep(500);
       return "sucess";
   }
   
   @Override
   protected String getFallback() {
     //当外部请求超时后,会执行fallback里的业务逻辑
     System.out.println("执行了回退方法");
     return "error";
   }

}
  • 模拟系统调用
package com.ivan.client.hystrix;

public class App {
    public static void main(String[] args) {
        HelloCommand command = new HelloCommand();
        String result = command.execute();
        System.out.println(result);
    }
}

当我们增大 HelloCommand run方法里Thread.sleep()方法的时长时,我们可以看到 command.execute()方法调用返回了error。在实际的使用中,当发现第三方接口调用不通的情况下,会调用fallback方法进行降级处理,比如可以返回一段错误提示。

Hystrix线程池隔离

   在分布式的系统里,系统可能对多个外部系统都有依赖关系,比同订单系统同时对会员系统,库存系统统,优惠券系统都有依赖。假如优惠券系统出现访问异常的时候,会超成线程的堆积,对于系统调用库存系统与会员系统的业务也不可用。而通过线程池能够将不同的业务由不同的线程池处理,从而做到保护其它业务能够正常访问。下面就来看看Hystrix是根据什么来创建线程的。

  • 找到HystrixCommand的父类AbstractCommand, 里面有个构造方法,从构造方法可以看出里这里定义了 threadPool对象。代码如下,关键代码都有做相应的注释
/**
这个方法是AbstractCommand的构造方法,里面用于初使化AbstractCommand,包括circuitBreaker 与线程池对象都在这里进行构造
**/
    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
//commandGroup对象,用于组织一类业务相关的对象
        this.commandGroup = initGroupKey(group);
// commandKey默认是以类为为名称的
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
//这个方法里定义了TheradPool里的关键字,默认以传入的commandGroup 的name做为key的名称
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
//这里就是线程池对象啦。
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
        this.executionHook = initExecutionHook(executionHook);

        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;

        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }

/**
这个方法用于得到HystrixThreadPoolKey 对象, Hystrix内部有大量的Key对象,可以简单理解这些  Key都是相应对象的唯一标识。从代码里可以看出,默认情况下Hystrix采用的是commandGroup 的name做为Thread Pool的key值。
**/
    private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
        if (threadPoolKeyOverride == null) {
            // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
            if (threadPoolKey == null) {
                /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
                return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
            } else {
                return threadPoolKey;
            }
        } else {
            // we have a property defining the thread-pool so use it instead
            return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
        }
    }

/**
在这里将调用具体的构造线程池的方法。
**/
    private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
        if (fromConstructor == null) {
            // get the default implementation of HystrixThreadPool
            return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
        } else {
            return fromConstructor;
        }
    }

从上面的代码分析我们知道线程池的构造最终会落到HystrixThreadPool.Factory这个类上面。这个类内存持有一个ConcurrentHashMap用于缓存线程池对象,当传入的HystrixThreadPoolKey已经构造过了相应的ThreadPool,将会直接从ConcurrentHashMap里返回已经生成的ThreadPool。如果传入的HystrixThreadPoolKey没有相应的ThreadPool,将构造新的ThreadPool并放入到ConcurrentHashMap这个缓存对象上。下面是关键代码:

static class Factory {
  final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
            // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
            String key = threadPoolKey.name();

            // this should find it for all but the first time
//这里从缓存取
            HystrixThreadPool previouslyCached = threadPools.get(key);
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize
//这里需要保证线程安全,加上了相应的锁
            synchronized (HystrixThreadPool.class) {
                if (!threadPools.containsKey(key)) {
//具体的线程池是由HystrixThreadPoolDefault进行构造的
                    threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                }
            }
            return threadPools.get(key);
        }
}

HystrixThreadPoolDefault 内部通过HystrixConcurrencyStrategy这个对象进行线程池的构造,里面根据传入的properties信息来构造线程池对象。 关键代码如下:

static class HystrixThreadPoolDefault implements HystrixThreadPool {
        private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

        private final HystrixThreadPoolProperties properties;
        private final BlockingQueue<Runnable> queue;
        private final ThreadPoolExecutor threadPool;
        private final HystrixThreadPoolMetrics metrics;
        private final int queueSize;

        public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
            this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
            HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            this.queueSize = properties.maxQueueSize().get();

            this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                    concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                    properties);
            this.threadPool = this.metrics.getThreadPool();
            this.queue = this.threadPool.getQueue();

            /* strategy: HystrixMetricsPublisherThreadPool */
            HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
        }
}

HystrixConcurrencyStrategy 类里我们可以看到采用的我们熟悉的ThreadPoolExecutor对象来构造线程池。 里面需要传入核心线程池的大小,最大线程数,队列等关键信息。关键代码如下:

    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }

从上面代码的分析我们可以得出,线程池是以HystrixCommandGroupKey进行划分的,不同的CommandGroup有不同的线程池来处理,而这个CommandGroup在我们的分布式系统中,可以把相关的业务处理放到一个CommandGroup中。

Hystrix熔断

  熔断器,现实生活中有一个很好的类比,就是家庭电路中都会安装一个保险盒,当电流过大的时候保险盒里面的保险丝会自动断掉,来保护家里的各种电器及电路。Hystrix中的熔断器(Circuit Breaker)也是起到这样的作用,Hystrix在运行过程中会向每个CommandKey对应的熔断器报告成功、失败、超时和拒绝的状态,熔断器维护计算统计的数据,根据这些统计的信息来确定熔断器是否打开。如果打开,后续的请求都会被截断(不再执行run方法里的内容了,直接执行fallback方法里的内容)。然后会隔一段时间默认是5s,尝试半开,放入一部分流量请求进来,相当于对依赖服务进行一次健康检查,如果恢复,熔断器关闭,随后完全恢复调用。改造HelloCommand类代码如下:

package com.ivan.client.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;

public class HelloCommand extends HystrixCommand<String> {

    protected HelloCommand() {
        
        
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        //开启熔断模式
                        .withCircuitBreakerEnabled(true)
                        //出现错误的比率超过30%就开启熔断
                        .withCircuitBreakerErrorThresholdPercentage(30)
                        //至少有10个请求才进行errorThresholdPercentage错误百分比计算
                        .withCircuitBreakerRequestVolumeThreshold(10)
                        //半开试探休眠时间,这里设置为3秒
                        .withCircuitBreakerSleepWindowInMilliseconds(3000)
                        )
                );
        
    }

    @Override
    protected String run() throws Exception {
        //模拟外部请求需要的时间长度
        System.out.println("执行了run方法");
        Thread.sleep(2000);
        return "sucess";
    }
    
    @Override
    protected String getFallback() {
      //当外部请求超时后,会执行fallback里的业务逻辑
      System.out.println("执行了回退方法");
      return "error";
    }

}

改造App的代码,通过执行30次请求,可以看出刚开始的时候(前10次请求)会执行run方法里的逻辑,一量熔断器打开后,将不执行run方法里的内容,而是直接执行getFallback方法里的逻辑,直到过了设置的3秒后才又有流量执行run方法的逻辑。改造App后的代码如下:

package com.ivan.client.hystrix;

public class App {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 30; i++) {
            HelloCommand command = new HelloCommand();
            String result = command.execute();
            System.out.println("circuit Breaker is open : " + command.isCircuitBreakerOpen());
            if(command.isCircuitBreakerOpen()){
                Thread.currentThread().sleep(500);
            }
        }
    }
}

Hystrix熔断源码分析

  找到AbstractCommand类的initCircuitBreaker方法,这是熔断器的构造方法入口。首先判断是否打开了熔断器,只有在打开了熔断器后才会通过HystrixCircuitBreaker.Factory工厂新建一个熔断器,源码如下:

    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        if (enabled) {
            if (fromConstructor == null) {
                // get the default implementation of HystrixCircuitBreaker
                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
            } else {
                return fromConstructor;
            }
        } else {
            return new NoOpCircuitBreaker();
        }
    }

HystrixCircuitBreaker.Factory 类里对熔断器根据CommandKey进行了缓存,如果存在直接取缓存里的key,不存在则新建HystrixCircuitBreakerImpl对象用于熔断操作。源代码如下:

    class Factory {
      //circuitBreakersByCommand 是个ConcurrentHashMap, 这里缓存了系统的所有熔断器
        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            // this should find it for all but the first time
  //先从缓存里取
            HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize

            // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
            // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
            // If 2 threads hit here only one will get added and the other will get a non-null response instead.
 //取不到对象才会创建个HystrixCircuitBreakerImpl对象并放入缓存Map中
            HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
            if (cbForCommand == null) {
                // this means the putIfAbsent step just created a new one so let's retrieve and return it
                return circuitBreakersByCommand.get(key.name());
            } else {
                // this means a race occurred and while attempting to 'put' another one got there before
                // and we instead retrieved it and will now return it
                return cbForCommand;
            }
        }

        /**
         * Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey} or null if none exists.
         * 
         * @param key
         *            {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker}
         * @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey}
         */
        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
            return circuitBreakersByCommand.get(key.name());
        }

        /**
         * Clears all circuit breakers. If new requests come in instances will be recreated.
         */
        /* package */static void reset() {
            circuitBreakersByCommand.clear();
        }
    }

  HystrixCircuitBreakerImpl 这个类里定义了一个状态变量,断路由有三种状态 ,分别为关闭,打开,半开状态。重点关注下allowRequest方法,在allowRequest里首先判断forceOpen属性是否打开,如果打开则不允许有请求进入,然后forceClosed属性,如果这个属性为true,刚对所有的求求放行,相当于熔断器不起作用。之后就是状态判断了。isAfterSleepWindow()方法用于放行超过了指定时间后的流量,。具体代码如下,关键部分有相应的注释:

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;
  //三种状态通过枚举来定义
        enum Status {
            CLOSED, OPEN, HALF_OPEN;
        }
//状态变时,默认是关闭的状态
        private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
//最后一次访问的时间,用于试探请求是否恢复
        private final AtomicLong circuitOpened = new AtomicLong(-1);
        private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;

            //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
            Subscription s = subscribeToStream();
            activeSubscription.set(s);
        }

        private Subscription subscribeToStream() {
            /*
             * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
             */
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(new Subscriber<HealthCounts>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onNext(HealthCounts hc) {
                            // check if we are past the statisticalWindowVolumeThreshold
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                // we are not past the minimum volume threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                    //we are not past the minimum error threshold for the stat window,
                                    // so no change to circuit status.
                                    // if it was CLOSED, it stays CLOSED
                                    // if it was half-open, we need to wait for a successful command execution
                                    // if it was open, we need to wait for sleep window to elapse
                                } else {
                                    // our failure rate is too high, we need to set the state to OPEN
                                    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                        circuitOpened.set(System.currentTimeMillis());
                                    }
                                }
                            }
                        }
                    });
        }
//将熔断器置于关闭状态,并重置统计数据
        @Override
        public void markSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
                //This thread wins the race to close the circuit - it resets the stream to start it over from 0
                metrics.resetStream();
                Subscription previousSubscription = activeSubscription.get();
                if (previousSubscription != null) {
                    previousSubscription.unsubscribe();
                }
                Subscription newSubscription = subscribeToStream();
                activeSubscription.set(newSubscription);
                circuitOpened.set(-1L);
            }
        }

//将熔断器置于打开状态
        @Override
        public void markNonSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
                circuitOpened.set(System.currentTimeMillis());
            }
        }
//用于判断熔断器是否打开
        @Override
        public boolean isOpen() {
            if (properties.circuitBreakerForceOpen().get()) {
                return true;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return false;
            }
            return circuitOpened.get() >= 0;
        }
//用于判断是否放行流量
        @Override
        public boolean allowRequest() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
//第一次请求肯定就放行了
            if (circuitOpened.get() == -1) {
                return true;
            } else {
//半开状态将不放行
                if (status.get().equals(Status.HALF_OPEN)) {
                    return false;
                } else {
                    return isAfterSleepWindow();
                }
            }
        }

//根据当前时间与最后一次请求的时候进行比较,当超过了设置的SleepWindowInMilliseconds,将放行请求用于试探服务访问是否OK
        private boolean isAfterSleepWindow() {
            final long circuitOpenTime = circuitOpened.get();
            final long currentTime = System.currentTimeMillis();
            final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
            return currentTime > circuitOpenTime + sleepWindowTime;
        }

//用于试探服务是否OK的方法
        @Override
        public boolean attemptExecution() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                if (isAfterSleepWindow()) {
                    //only the first request after sleep window should execute
                    //if the executing command succeeds, the status will transition to CLOSED
                    //if the executing command fails, the status will transition to OPEN
                    //if the executing command gets unsubscribed, the status will transition to OPEN
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }
    }

Hystrix降级处理

  所谓降级,就是指在在Hystrix执行非核心链路功能失败的情况下,我们如何处理,比如我们返回默认值等。如果我们要回退或者降级处理,代码上需要实现HystrixCommand.getFallback()方法或者是HystrixObservableCommand. resumeWithFallback()。

Hystrix与Spring Cloud整合

在实际项目的开发中,都会用到Fegin,所以这里的集成是在Feign的基础上进行的。

  • 首先还是需要引入包,pom.xml文件如下:
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Edgware.SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-ribbon</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-feign</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-hystrix</artifactId>
        </dependency>
    </dependencies>
  • 启动类需要加上EnableCircuitBreaker注解代码如下:
package com.ivan.client.feign;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.feign.EnableFeignClients;

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@EnableCircuitBreaker
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

  • FeignClient 注解需有fallback属性,属性的值是个class,这个 class在是断路器打开后,会执行的业务逻辑,一般在项目里返回一个默认值。这个类需要实现与FeignClient注释上相同的接口
package com.ivan.client.feign.service;

import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import com.ivan.client.feign.entity.User;
import com.ivan.client.feign.hystrix.fallback.UserServiceFallback;


@FeignClient(value="provider", fallback=UserServiceFallback.class)
public interface UserService {

    @RequestMapping(method = RequestMethod.GET, value = "/user/{id}")
    public User getUser(@PathVariable("id") Integer id);

    @RequestMapping(method = RequestMethod.POST, value = "/user/create", consumes = MediaType.APPLICATION_JSON_VALUE)
    public User create(User user);

}
  • fallback类的代码如下:
package com.ivan.client.feign.hystrix.fallback;

import org.springframework.stereotype.Component;

import com.ivan.client.feign.entity.User;
import com.ivan.client.feign.service.UserService;

@Component
public class UserServiceFallback implements UserService {

    public User getUser(Integer id) {
        System.out.println(Thread.currentThread().getName());
        System.out.println("=====执行到了fallback方法=======");
        User user = new User();
        user.setId(0);
        return user;
    }

    public User create(User user) {
        // TODO Auto-generated method stub
        return null;
    }

}

  • application.properties文件里需要把feign.hystrix.enabled=true 这个属性打开。配置文件如下:
server.port=9000
spring.application.name=consumer-feign-hystrix
eureka.instance.hostname=localhost
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
spring.cloud.circuit.breaker.enabled=true

ribbon.ReadTimeout=5000

feign.hystrix.enabled=true
#command相关
hystrix.command.default.execution.isolation.strategy=THREAD
#设置调用者的超时时间
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=6000
#是否开启超时设置
hystrix.command.default.execution.timeout.enabled=true
#表示是否在执行超时时,中断HystrixCommand.run() 的执行
hystrix.command.default.execution.isolation.thread.interruptOnTimeout=true

#fallback相关
#是否开启fallback功能
hystrix.command.default.fallback.enabled=true

#断路器相关
#是否开启断路器
hystrix.command.default.circuitBreaker.enabled=true
#窗口时间内打开断路器最小的请求量
hystrix.command.default.circuitBreaker.requestVolumeThreshold=5
#断路器跳闸后,在此值的时间的内,hystrix会拒绝新的请求,只有过了这个时间断路器才会打开闸门
hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds=5
#失败百分比的阈值
hystrix.command.default.circuitBreaker.errorThresholdPercentage=20

#线程相关配置
#核心线程数
hystrix.threadpool.default.coreSize=5
#最大线程数
hystrix.threadpool.default.maximumSize=5
#队列的大小
hystrix.threadpool.default.maxQueueSize=1024
#因为maxQueueSize值不能被动态修改,所有通过设置此值可以实现动态修改等待队列长度。即等待的队列的数量大于queueSizeRejectionThreshold时(但是没有达到maxQueueSize值),则开始拒绝后续的请求进入队列
hystrix.threadpool.default.queueSizeRejectionThreshold=128
#设置线程多久没有服务后,需要释放(maximumSize-coreSize )个线程
hystrix.threadpool.default.keepAliveTimeMinutes=60

上面的属性基本上包括了大部分会在项目中使用的属性,有以下几点需要重点关注一下:

  • 上面属性的default可以改成ComandKey,这样就可以对特定的接口进行配置了,Feign中CommandKey的值为:接口名#方法名(参数类型),如上的CommandKey为UserService#getUser(Integer)
  • 在测试hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds 属性的时候,服务端如果在指定的时间返回了结果,但系统还是调用了fallback里的逻辑,需要指定ribbon.ReadTimeout的时间。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容