Ribbon源码笔记

Spring官网的Ribbon文档
GitHub关于Ribbon组件的描述
生产环境的Ribbon一般都会搭配Feign和Eureka来使用,但对于这种使用场景分析需要同时熟悉Ribbon对静态服务如何做负载均衡、Feign如何调用静态服务以及Eureka Client如何缓存Eureka Server的服务。Eureka在前面已经分析过了,本篇记录Ribbon如何对静态服务做负载均衡,待后面分析了Fiegn对静态服务的调用后,便可将Ribbon、Feign和Eureka融合在一起分析做完整地记录。

一 Maven依赖

        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>Greenwich.SR3</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
        </dependency>

二 自动配置类

Ribbon的自动配置类是RibbonAutoConfiguration,例举出两个重要的Bean

  • SpringClientFactory
  • RibbonClientHttpRequestFactory
  • RestTemplateCustomizer

自动配置类中的Bean不多,是因为SpringClientFactory还内置了一个配置类RibbonClientConfiguration,该配置类是以服务提供方为维度生效的,并不是以单例形式托管给当前调用方服务的容器,后面会详细说明,这里列举出其中比较重要的配置Bean

  • IClientConfig
  • IRule
  • IPing
  • ServerList
  • ServerListUpdater
  • ILoadBalancer
  • RibbonLoadBalancerContext
  • RetryHandler
  • RestClient

三 Bean释义

自动配置类RibbonAutoConfiguration中的Bean

  • SpringClientFactory
    托管给容器的一个Bean,内部还声明了另一个重要的配置类。主要用于从服务提供方容器中获取各个组件
    @Bean
    public SpringClientFactory springClientFactory() {
        SpringClientFactory factory = new SpringClientFactory();
        factory.setConfigurations(this.configurations);
        return factory;
    }
  • RibbonClientHttpRequestFactory
    内部类RibbonClientHttpRequestFactoryConfiguration中配置的Bean,一个工厂类用来生产客户端的HttpRequest
        @Bean
        public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
            return new RibbonClientHttpRequestFactory(this.springClientFactory);
        }
  • RestTemplateCustomizer
    自定义的匿名RestTemplateCustomizer,并且将Spring内置的RestTemplate的RequestFactory设置为RibbonClientHttpRequestFactory
        @Bean
        public RestTemplateCustomizer restTemplateCustomizer(
                final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
            return restTemplate -> restTemplate
                    .setRequestFactory(ribbonClientHttpRequestFactory);
        }

        // lambda表达式可能不太好看,转化为传统匿名类的形式方便理解
        @Bean
        public RestTemplateCustomizer restTemplateCustomizer(
                final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
    
            return new RestTemplateCustomizer() {
    
                @Override
                public void customize(RestTemplate restTemplate) {
                    restTemplate.setRequestFactory(ribbonClientHttpRequestFactory);
                }
            };
        }

内置配置类RibbonClientConfiguration的Bean,这些Bean作为默认选项都有用@ConditionalOnMissingBean修饰,支持自定义替换

  • IClientConfig
    当前服务作为调用方的配置信息,loadProperties将客户端的所有配置的以及默认的配置信息都保存在了DefaultClientConfigImpl内的properties属性上
    @Bean
    @ConditionalOnMissingBean
    public IClientConfig ribbonClientConfig() {
        DefaultClientConfigImpl config = new DefaultClientConfigImpl();
        config.loadProperties(this.name);
        config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
        config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
        config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
        return config;
    }
  • IRule
    当前服务调用其他服务时载均衡的规则。比如随机、轮询、权重及最少并发等,这里配置的是根据区域和可用性过滤规则
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }
  • IPing
    类似于心跳,用来判断被服务提供方的存活情况
    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, name)) {
            return this.propertiesFactory.get(IPing.class, config, name);
        }
        return new DummyPing();
    }
  • ServerList
    服务提供方的集合
    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerList<Server> ribbonServerList(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerList.class, name)) {
            return this.propertiesFactory.get(ServerList.class, config, name);
        }
        ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
        serverList.initWithNiwsConfig(config);
        return serverList;
    }
  • ServerListUpdater
    动态更新服务提供方的节点信息
    @Bean
    @ConditionalOnMissingBean
    public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
        return new PollingServerListUpdater(config);
    }
  • ILoadBalancer
    真正的负载均衡器
    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }
  • RibbonLoadBalancerContext
    封装了负载均衡器的上下文
    @Bean
    @ConditionalOnMissingBean
    public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
            IClientConfig config, RetryHandler retryHandler) {
        return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
    }
  • RetryHandler
    重试策略,默认的只有简单的异常处理,真正生产环境使用都需要根据业务场景自定义重试策略
    @Bean
    @ConditionalOnMissingBean
    public RetryHandler retryHandler(IClientConfig config) {
        return new DefaultLoadBalancerRetryHandler(config);
    }
  • RestClient
    RestClient并不是配置类中的Bean,而是由于配置类中导入了另外的配置类RestClientRibbonConfiguration,它才会被托管给容器,默认的实现是RibbonClientConfiguration的内部类OverrideRestClient
    @Bean
    @Lazy
    @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    public RestClient ribbonRestClient(IClientConfig config, ILoadBalancer loadBalancer,
            ServerIntrospector serverIntrospector, RetryHandler retryHandler) {
        RestClient client = new RibbonClientConfiguration.OverrideRestClient(config,
                serverIntrospector);
        client.setLoadBalancer(loadBalancer);
        client.setRetryHandler(retryHandler);
        return client;
    }

Ribbon作为客户端负载均衡,必然要自己缓存服务端的多个节点信息(同一服务的不同节点),定时去探测节点是否存活,在每次发起调用时,根据配置的负载均衡规则,完成对不同节点的调用。在调用异常后,根据异常信息决定是否要重试

四 Ribbon启动

  1. 客户端服务启动后,BeanSpringClientFactory被实例化
  2. 在其构造器中显式的将内置配置类RibbonClientConfiguration设置到父类NamedContextFactory中,保存在属性defaultConfigType上
  3. RibbonAutoConfiguration中的其他Bean也将被创建托管给容器
    @Bean
    public SpringClientFactory springClientFactory() {
        SpringClientFactory factory = new SpringClientFactory();
        factory.setConfigurations(this.configurations);
        return factory;
    }
    // 类SpringClientFactory
    public SpringClientFactory() {
        super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
    }
    // 父类NamedContextFactory
    public NamedContextFactory(Class<?> defaultConfigType, String propertySourceName,
            String propertyName) {
        this.defaultConfigType = defaultConfigType;
        this.propertySourceName = propertySourceName;
        this.propertyName = propertyName;
    }

五 创建服务提供方IOC容器

由于是对静态服务做负载均衡,服务的节点信息直接配置在配置文件中。官网7.6里有说明,这里贴一下格式

stores:
  ribbon:
    listOfServers: example.com,google.com

用配置的静态服务名替换ip端口后(例如:http://example.com/getUser?id=1替换后为http://stores/getUser?id=1),使用RestTemplate对替换后的url多次发起调用,会看到负载均衡有生效(调用方式:restTemplate.getForObject(url, Stores.class)

  1. 使用RestTemplate的getForObject方法发起调用,调用链依次为RestTemplate的 getForObject() > execute() > doExecute()
  2. createRequest创建了客户端请求的Request
  3. execute完成了对配置的静态服务的调用
    @Nullable
    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

        Assert.notNull(url, "URI is required");
        Assert.notNull(method, "HttpMethod is required");
        ClientHttpResponse response = null;
        try {
            ClientHttpRequest request = createRequest(url, method);
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
            response = request.execute();
            handleResponse(url, method, response);
            return (responseExtractor != null ? responseExtractor.extractData(response) : null);
        }
        catch (IOException ex) {
            String resource = url.toString();
            String query = url.getRawQuery();
            resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
            throw new ResourceAccessException("I/O error on " + method.name() +
                    " request for \"" + resource + "\": " + ex.getMessage(), ex);
        }
        finally {
            if (response != null) {
                response.close();
            }
        }
    }

在创建客户端请求时,需要从clientFactory中获取部分配置组件,封装为客户端请求RibbonHttpRequest返回

  1. createRequest是父类HttpAccessor中的方法
  2. getRequestFactory返回的是RibbonClientHttpRequestFactory,所以createRequest来到了RibbonClientHttpRequestFactory中(这是在上面实例化BeanRestTemplateCustomizer时设置过的)
  3. this.clientFactory.getClientConfig(serviceId)中的serviceId是服务提供方的服务名(例如示例中的stores),clientFactory是当前服务启动时托管给容器的BeanSpringClientFactory,正是在这里完成了以服务提供方为维度的子IOC容器的创建
  4. 封装RibbonHttpRequest返回
    // HttpAccessor
    protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
        ClientHttpRequest request = getRequestFactory().createRequest(url, method);
        if (logger.isDebugEnabled()) {
            logger.debug("HTTP " + method.name() + " " + url);
        }
        return request;
    }

    // RibbonClientHttpRequestFactory
    public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod)
            throws IOException {
        String serviceId = originalUri.getHost();
        if (serviceId == null) {
            throw new IOException(
                    "Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
        }
        IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
        RestClient client = this.clientFactory.getClient(serviceId, RestClient.class);
        HttpRequest.Verb verb = HttpRequest.Verb.valueOf(httpMethod.name());

        return new RibbonHttpRequest(originalUri, verb, client, clientConfig);
    }

SpringClientFactory中的getClientConfig方法获取配置时,从SpringClientFactory类调用到父类NamedContextFactory,获取配置实际是从name对应的服务提供方子容器中获取,对于不存在的子容器,会尝试创建并缓存在调用方服务内部。获取配置的调用链依次为: getClientConfig() > getInstance() > 父类getInstance()

  1. name是服务提供方的服务名,type是要获取的实例类型。父类getInstance中获取实例时,会先获取服务提供方的上下文
  2. 当前服务作为调用方,如果没有缓存该服务提供方的上下文,会尝试创建服务提供方的上下文并随之缓存起来
  3. 这些以服务提供方为维度的上下文,也就是子IOC容器,类型均为AnnotationConfigApplicationContext
  4. 接着将上述SpringClientFactory往NamedContextFactory中设置的内置配置类RibbonClientConfiguration 注册到子IOC容器中
  5. 设置当前调用方容器作为父容器后,调用子容器的refresh方法,以解析该配置类,进而配置类中的Bean会托管给对应的子容器
    public <T> T getInstance(String name, Class<T> type) {
        AnnotationConfigApplicationContext context = getContext(name);
        if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
                type).length > 0) {
            return context.getBean(type);
        }
        return null;
    }

    protected AnnotationConfigApplicationContext getContext(String name) {
        if (!this.contexts.containsKey(name)) {
            synchronized (this.contexts) {
                if (!this.contexts.containsKey(name)) {
                    this.contexts.put(name, createContext(name));
                }
            }
        }
        return this.contexts.get(name);
    }

    protected AnnotationConfigApplicationContext createContext(String name) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        if (this.configurations.containsKey(name)) {
            for (Class<?> configuration : this.configurations.get(name)
                    .getConfiguration()) {
                context.register(configuration);
            }
        }
        for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
            if (entry.getKey().startsWith("default.")) {
                for (Class<?> configuration : entry.getValue().getConfiguration()) {
                    context.register(configuration);
                }
            }
        }
        context.register(PropertyPlaceholderAutoConfiguration.class,
                this.defaultConfigType);
        context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
                this.propertySourceName,
                Collections.<String, Object>singletonMap(this.propertyName, name)));
        if (this.parent != null) {
            // Uses Environment from parent as well as beans
            context.setParent(this.parent);
            // jdk11 issue
            // https://github.com/spring-cloud/spring-cloud-netflix/issues/3101
            context.setClassLoader(this.parent.getClassLoader());
        }
        context.setDisplayName(generateDisplayName(name));
        context.refresh();
        return context;
    }

六 Ribbon Ping机制

子容器refresh时,会将内置配置类的Bean全部实例化,其中包括ILoadBalancer,实现类为ZoneAwareLoadBalancer,调用链依次为:ZoneAwareLoadBalancer构造器 > 父类DynamicServerListLoadBalancer构造器 > 父类BaseLoadBalancer构造器 > initWithConfig()

  1. setRule设置了负载均衡规则,如果未配置默认为RoundRobinRule
  2. setPing设置了Ping规则,当前默认配置为DummyPing,意味着跳过开启Ping功能
  3. 如果需要开启Ping功能,BaseLoadBalancer中内置的Timer会开启任务,默认每10S执行一次Ping
  4. 具体的Ping功能在内部类SerialPingStrategy的方法pingServers中执行,但最终还是调用配置的BeanIPing的isAlive方法
  5. 举例看下实现类PingUrl中的isAlive方法,可以看到是用Apache的Http发送了Get请求,判断响应状态码是否为200,以最终确定服务是否存活
    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }
    // BaseLoadBalancer类
    public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
        initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
    }

    void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
        this.config = clientConfig;
        String clientName = clientConfig.getClientName();
        this.name = clientName;
        int pingIntervalTime = Integer.parseInt(""
                + clientConfig.getProperty(
                        CommonClientConfigKey.NFLoadBalancerPingInterval,
                        Integer.parseInt("30")));
        int maxTotalPingTime = Integer.parseInt(""
                + clientConfig.getProperty(
                        CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                        Integer.parseInt("2")));

        setPingInterval(pingIntervalTime);
        setMaxTotalPingTime(maxTotalPingTime);

        // cross associate with each other
        // i.e. Rule,Ping meet your container LB
        // LB, these are your Ping and Rule guys ...
        setRule(rule);
        setPing(ping);

        setLoadBalancerStats(stats);
        rule.setLoadBalancer(this);
        if (ping instanceof AbstractLoadBalancerPing) {
            ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
        }
        logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
        boolean enablePrimeConnections = clientConfig.get(
                CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);

        if (enablePrimeConnections) {
            this.setEnablePrimingConnections(true);
            PrimeConnections primeConnections = new PrimeConnections(
                    this.getName(), clientConfig);
            this.setPrimeConnections(primeConnections);
        }
        init();

    }

    public void setPing(IPing ping) {
        if (ping != null) {
            if (!ping.equals(this.ping)) {
                this.ping = ping;
                setupPingTask(); // since ping data changed
            }
        } else {
            this.ping = null;
            // cancel the timer task
            lbTimer.cancel();
        }
    }

    void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }

    private boolean canSkipPing() {
        if (ping == null
                || ping.getClass().getName().equals(DummyPing.class.getName())) {
            // default ping, no need to set up timer
            return true;
        } else {
            return false;
        }
    }
    // PingUrl类
    public boolean isAlive(Server server) {
            String urlStr   = "";
            if (isSecure){
                urlStr = "https://";
            }else{
                urlStr = "http://";
            }
            urlStr += server.getId();
            urlStr += getPingAppendString();

            boolean isAlive = false;

            HttpClient httpClient = new DefaultHttpClient();
            HttpUriRequest getRequest = new HttpGet(urlStr);
            String content=null;
            try {
                HttpResponse response = httpClient.execute(getRequest);
                content = EntityUtils.toString(response.getEntity());
                isAlive = (response.getStatusLine().getStatusCode() == 200);
                if (getExpectedContent()!=null){
                    LOGGER.debug("content:" + content);
                    if (content == null){
                        isAlive = false;
                    }else{
                        if (content.equals(getExpectedContent())){
                            isAlive = true;
                        }else{
                            isAlive = false;
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                // Release the connection.
                getRequest.abort();
            }

            return isAlive;
    }

七 Ribbon缓存服务

父类BaseLoadBalancer的构造器执行完成后,回到父类DynamicServerListLoadBalancer的构造器中,restOfInit()内完成了对配置的静态服务的缓存以及开启周期性更新服务的任务

  1. restOfInit中enableAndInitLearnNewServersFeature方法,使用配置类中默认的ServerListUpdater执行默认的任务
  2. 默认的Updater是PollingServerListUpdater,默认任务是ServerListUpdater.UpdateAction
  3. UpdateAction的doUpdate方法,最终调用的是DynamicServerListLoadBalancer的方法updateListOfServers
  4. updateListOfServers中用配置的BeanServerList来获取服务提供方,这里是配置类中的静态服务配置类ConfigurationBasedServerList
  5. 静态配置类会从配置文件中解析出服务列表,最后在updateAllServerList尝试更新
  6. 最终的服务是缓存在父类BaseLoadBalancer中,保存在属性allServerList上
    // DynamicServerListLoadBalancer类
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }
    // PollingServerListUpdater类
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }
    // ConfigurationBasedServerList类
    @Override
    public List<Server> getUpdatedListOfServers() {
        String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
        return derive(listOfServers);
    }

    protected List<Server> derive(String value) {
        List<Server> list = Lists.newArrayList();
        if (!Strings.isNullOrEmpty(value)) {
            for (String s: value.split(",")) {
                list.add(new Server(s.trim()));
            }
        }
        return list;
    }

八 Ribbon负载均衡

服务提供方的子容器初始化完成后,一系列Bean也已经完成实例化并托管给容器,RestTemplate拿到需要的request后开始发起对服务方的请求(request为配置的BeanRibbonHttpRequest

  1. RestTemplate执行从父类AbstractClientHttpRequest继承的execute方法调用了自身的executeInternal方法
  2. executeInternal方法中执行BeanRestClient的executeWithLoadBalancer方法
  3. RestClient是配置类RibbonClientConfiguration中的内部类OverrideRestClient
  4. 执行的是父类AbstractLoadBalancerAwareClient的executeWithLoadBalancer方法
  5. 使用buildLoadBalancerCommand构造LoadBalancerCommand后,最终执行的是LoadBalancerCommand的submit方法
    @Override
    protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
        try {
            addHeaders(headers);
            if (outputStream != null) {
                outputStream.close();
                builder.entity(outputStream.toByteArray());
            }
            HttpRequest request = builder.build();
            HttpResponse response = client.executeWithLoadBalancer(request, config);
            return new RibbonHttpResponse(response);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }
    // AbstractLoadBalancerAwareClient类
    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
        
    }

这里使用了观察者模式,直观的阅读代码不是很好理解。总体流程是:先从缓存的服务中,根据负载均衡规则IRule选中了适合的服务提供方实例,Ribbon内置的RestClient使用jersey完成对选中实例的Http调用,对于失败的请求,会根据配置的重试策略以决定是否要重试

    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        context.setServer(server);
                        final ServerStats stats = loadBalancerContext.getServerStats(server);
                        
                        // Called for each attempt and retry
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        context.incAttemptCount();
                                        loadBalancerContext.noteOpenConnection(stats);
                                        
                                        if (listenerInvoker != null) {
                                            try {
                                                listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                            } catch (AbortExecutionException e) {
                                                return Observable.error(e);
                                            }
                                        }
                                        
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                        
                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;
                                            @Override
                                            public void onCompleted() {
                                                recordStats(tracer, stats, entity, null);
                                                // TODO: What to do if onNext or onError are never called?
                                            }

                                            @Override
                                            public void onError(Throwable e) {
                                                recordStats(tracer, stats, null, e);
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }

                                            @Override
                                            public void onNext(T entity) {
                                                this.entity = entity;
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }                            
                                            
                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop();
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });
                        
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
            
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
        
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }
  1. selectServer中选出了合适服务提供方节点
  2. 这里的loadBalancerContext是子容器中配置的RibbonLoadBalancerContext,实际调用的是父类LoadBalancerContext的方法getServerFromLoadBalancer
  3. 在构造RibbonLoadBalancerContext上下文时,已经将ILoadBalancer设置到LoadBalancerContext中。这里可以拿到ILoadBalancer调用其chooseServer方法以选择服务
  4. chooseServer在ILoadBalancer的父类BaseLoadBalancer中,但最终还是调用了IRule的choose方法
  5. 这里以实现类RandomRule为例,可以看到起负载均衡策略是从所有的服务提供方实例中随机选择实例
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
    // LoadBalancerContext类
    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            host = original.getHost();
        }
        if (original != null) {
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }

        // Various Supported Cases
        // The loadbalancer to use and the instances it has is based on how it was registered
        // In each of these cases, the client might come in using Full Url or Partial URL
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                host = svc.getHost();
                if (host == null){
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                return svc;
            } else {
                // No Full URL - and we dont have a LoadBalancer registered to
                // obtain a server
                // if we have a vipAddress that came with the registration, we
                // can use that else we
                // bail out
                if (vipAddresses != null && vipAddresses.contains(",")) {
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            "Method is invoked for client " + clientName + " with partial URI of ("
                            + original
                            + ") with no load balancer configured."
                            + " Also, there are multiple vipAddresses and hence no vip address can be chosen"
                            + " to complete this partial uri");
                } else if (vipAddresses != null) {
                    try {
                        Pair<String,Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
                        host = hostAndPort.first();
                        port = hostAndPort.second();
                    } catch (URISyntaxException e) {
                        throw new ClientException(
                                ClientException.ErrorType.GENERAL,
                                "Method is invoked for client " + clientName + " with partial URI of ("
                                + original
                                + ") with no load balancer configured. "
                                + " Also, the configured/registered vipAddress is unparseable (to determine host and port)");
                    }
                } else {
                    throw new ClientException(
                            ClientException.ErrorType.GENERAL,
                            this.clientName
                            + " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
                            + " Also has no vipAddress registered");
                }
            }
        } else {
            // Full URL Case
            // This could either be a vipAddress or a hostAndPort or a real DNS
            // if vipAddress or hostAndPort, we just have to consult the loadbalancer
            // but if it does not return a server, we should just proceed anyways
            // and assume its a DNS
            // For restClients registered using a vipAddress AND executing a request
            // by passing in the full URL (including host and port), we should only
            // consult lb IFF the URL passed is registered as vipAddress in Discovery
            boolean shouldInterpretAsVip = false;

            if (lb != null) {
                shouldInterpretAsVip = isVipRecognized(original.getAuthority());
            }
            if (shouldInterpretAsVip) {
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc != null){
                    host = svc.getHost();
                    if (host == null){
                        throw new ClientException(ClientException.ErrorType.GENERAL,
                                "Invalid Server for :" + svc);
                    }
                    logger.debug("using LB returned Server: {} for request: {}", svc, original);
                    return svc;
                } else {
                    // just fall back as real DNS
                    logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
                }
            } else {
                // consult LB to obtain vipAddress backed instance given full URL
                //Full URL execute request - where url!=vipAddress
                logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
            }
        }
        // end of creating final URL
        if (host == null){
            throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
        }
        // just verify that at this point we have a full URL

        return new Server(host, port);
    }
    // BaseLoadBalancer类
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
    // RandomRule类
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> upList = lb.getReachableServers();
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();
            if (serverCount == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }

            int index = chooseRandomInt(serverCount);
            server = upList.get(index);

            if (server == null) {
                /*
                 * The only time this should happen is if the server list were
                 * somehow trimmed. This is a transient condition. Retry after
                 * yielding.
                 */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Shouldn't actually happen.. but must be transient or a bug.
            server = null;
            Thread.yield();
        }

        return server;

    }

    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

九 Ribbon请求服务端

AbstractLoadBalancerAwareClient
根据负载均衡策略拿到服务提供方信息后,Ribbon准备开始向服务端发起请求,该请求是由Jersey实现。

  1. 拿到服务端信息后,在AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)中发起对服务端的请求
  2. this是配置的BeanRestClientOverrideRestClient,execute方法继承自父类RestClient
  3. 父类RestClient的execute方法中,使用了Jersey来完成对服务端资源的访问
    @Override
    public HttpResponse execute(HttpRequest task, IClientConfig requestConfig) throws Exception {
        IClientConfig config = (requestConfig == null) ? task.getOverrideConfig() : requestConfig;
        return execute(task.getVerb(), task.getUri(),
                task.getHeaders(), task.getQueryParams(), config, task.getEntity());
    }

    private HttpResponse execute(HttpRequest.Verb verb, URI uri,
            Map<String, Collection<String>> headers, Map<String, Collection<String>> params,
            IClientConfig overriddenClientConfig, Object requestEntity) throws Exception {
        HttpClientResponse thisResponse = null;
        boolean bbFollowRedirects = bFollowRedirects;
        // read overriden props
        if (overriddenClientConfig != null
                // set whether we should auto follow redirects
                && overriddenClientConfig.getProperty(CommonClientConfigKey.FollowRedirects)!=null){
            // use default directive from overall config
            Boolean followRedirects = Boolean.valueOf(""+overriddenClientConfig.getProperty(CommonClientConfigKey.FollowRedirects, bFollowRedirects));
            bbFollowRedirects = followRedirects.booleanValue();
        }
        restClient.setFollowRedirects(bbFollowRedirects);

        if (logger.isDebugEnabled()) {
            logger.debug("RestClient sending new Request(" + verb
                    + ": ) " + uri);
        }


        WebResource xResource = restClient.resource(uri.toString());
        if (params != null) {
            for (Map.Entry<String, Collection<String>> entry: params.entrySet()) {
                String name = entry.getKey();
                for (String value: entry.getValue()) {
                    xResource = xResource.queryParam(name, value);
                }
            }
        }
        ClientResponse jerseyResponse;

        Builder b = xResource.getRequestBuilder();

        if (headers != null) {
            for (Map.Entry<String, Collection<String>> entry: headers.entrySet()) {
                String name = entry.getKey();
                for (String value: entry.getValue()) {
                    b = b.header(name, value);
                }
            }
        }
        Object entity = requestEntity;
        
        switch (verb) {
        case GET:
            jerseyResponse = b.get(ClientResponse.class);
            break;
        case POST:
            jerseyResponse = b.post(ClientResponse.class, entity);
            break;
        case PUT:
            jerseyResponse = b.put(ClientResponse.class, entity);
            break;
        case DELETE:
            jerseyResponse = b.delete(ClientResponse.class);
            break;
        case HEAD:
            jerseyResponse = b.head();
            break;
        case OPTIONS:
            jerseyResponse = b.options(ClientResponse.class);
            break;
        default:
            throw new ClientException(
                    ClientException.ErrorType.GENERAL,
                    "You have to one of the REST verbs such as GET, POST etc.");
        }

        thisResponse = new HttpClientResponse(jerseyResponse, uri, overriddenClientConfig);
        if (thisResponse.getStatus() == 503){
            thisResponse.close();
            throw new ClientException(ClientException.ErrorType.SERVER_THROTTLED);
        }
        return thisResponse;
    }

十 Ribbon重试机制

重试机制包括:在同一实例上重试的次数、不同实例重试的次数、是否重试以及在何种条件下重试。当配置了Ribbon的重试机制后,Ribbon在对实例发起请求失败后,会根据重试策略以决定是在同一实例上重试还是根据负载策略换实例重试,不会超过限制的重试次数。重试策略的执行在上述LoadBalancerCommand的retryPolicy方法内

  1. 在retryPolicy方法内,调用了retryHandler的isRetriableException方法以决定是否要重试
  2. 默认的RequestSpecificRetryHandler中,GET请求都会重试,否则只有在连接超时的情况下才会重试
    private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
        return new Func2<Integer, Throwable, Boolean>() {
            @Override
            public Boolean call(Integer tryCount, Throwable e) {
                if (e instanceof AbortExecutionException) {
                    return false;
                }

                if (tryCount > maxRetrys) {
                    return false;
                }
                
                if (e.getCause() != null && e instanceof RuntimeException) {
                    e = e.getCause();
                }
                
                return retryHandler.isRetriableException(e, same);
            }
        };
    }

    @Override
    public boolean isRetriableException(Throwable e, boolean sameServer) {
        if (okToRetryOnAllErrors) {
            return true;
        } 
        else if (e instanceof ClientException) {
            ClientException ce = (ClientException) e;
            if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
                return !sameServer;
            } else {
                return false;
            }
        } 
        else  {
            return okToRetryOnConnectErrors && isConnectionException(e);
        }
    }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349