spring cloud ribbon学习三:负载均衡器ILoadBalancer接口及其实现

看负载均衡器这源码,好绕,看的好累。

虽然Spring Cloud中定义了LoadBalancerClient作为负载均衡器的通用接口,并且针对Ribbon实现了RibbonLoadBalancerClient,但是它作为具体实现客户端负载均衡时,是通过Ribbon的com.netflix.loadbalancer.ILoadBalancer接口实现的。

总结一下:
ILoadBalancer接口实现类做了以下的一些事情:
1.维护了存储服务实例Server对象的二个列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的实例清单
2.初始化得到可用的服务列表,启动定时任务去实时的检测服务列表中的服务的可用性,并且间断性的去更新服务列表,结合注册中心。
3.选择可用的服务进行调用(这个一般交给IRule去实现,不同的轮询策略)

三个很重要的概念

  • ServerList接口:定义用于获取服务器列表的方法的接口,主要实现DomainExtractingServerList接口,每隔30s种执行getUpdatedListOfServers方法进行服务列表的更新。
  • ServerListUpdater接口:主要实现类EurekaNotificationServerListUpdater和PollingServerListUpdater(默认使用的是PollingServerListUpdater,结合Eureka注册中心,定时任务的方式进行服务列表的更新)
  • ServerListFilter接口:根据LoadBalancerStats然后根据一些规则去过滤部分服务,比如根据zone(区域感知)去过滤。(主要实现类ZonePreferenceServerListFilter的getFilteredListOfServers会在更新服务列表的时候去执行)。
ILoadBalancer及其实现

com.netflix.loadbalancer.AbstractLoadBalancer

AbstractLoadBalancer contains features required for most loadbalancing
implementations.
An anatomy of a typical LoadBalancer consists of 1. A List of Servers (nodes)
that are potentially bucketed based on a specific criteria. 2. A Class that
defines and implements a LoadBalacing Strategy via IRule 3. A
Class that defines and implements a mechanism to determine the
suitability/availability of the nodes/servers in the List.
AbstractLoadBalancer包含大多数负载均衡实现的特征。
典型的LoadBalancer(负载均衡器)包括
1.一个基于某些特征的服务列表。
2.一个通过IRule定义和实现负载均衡战略的类。
3.一个用来确定列表节点/服务是否可用的类。

public abstract class AbstractLoadBalancer implements ILoadBalancer {
    
public enum ServerGroup{
       
   //所有服务实例     
   ALL,
   //正常服务的实例
   STATUS_UP,
   //停止服务的实例
   STATUS_NOT_UP        
}
        
/**
 * 选择具体的服务实例,key为null,忽略key的条件判断
 */
public Server chooseServer() {
    return chooseServer(null);
}
/**
 * 定义了根据分组类型来获取不同的服务实例的列表。
 */
 public abstract List<Server> getServerList(ServerGroup serverGroup);
    
/**
 * 定义了获取LoadBalancerStats对象的方法,LoadBalancerStats对象被用来存储负载均衡器中
 * 各个服务实例当前的属性和统计信息。这些信息非常有用,我们可以利用这些信息来观察负载均衡
 * 的运行情况,同时这些信息也是用来制定负载均衡策略的重要依据。
 */
public abstract LoadBalancerStats getLoadBalancerStats();
    
}

com.netflix.loadbalancer.BaseLoadBalancer

com.netflix.loadbalancer.BaseLoadBalancer类是Ribbon负载均衡器的基础实现类,在该类中定义了很多关于负载均衡器相关的基础内容。

定义并维护了两种存储服务实例Server对象的列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的实例清单。

@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
         .synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
         .synchronizedList(new ArrayList<Server>());
  • 定义了之前我们提到的用来存储负载均衡实例属性和统计信息的LoadBalancerStates对象。
  • 定义了检查服务实例是否正常服务的IPing对象,在BaseLoadBalancer中默认为null,需要在构造时注入它的具体实现。
protected IPing ping = null;
  • 定义了检查服务实例操作的执行策略对象IPingStrategy,在BaseLoadBalancer中默认使用了该类中定义的静态内部类SerialPingStrategy实现。根据源码,我们可以看到该策略采用线性遍历ping服务实例的方式实现检查。该策略在当IPing对象的实现速度不理想,或者是Server列表过大时,可能会影响到系统性能,这时候需要通过实现IPingStrategy接口并重写pingServer(IPing ping Server[] servers)函数去扩展ping的执行策略。
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
SerialPingStrategy实现
  • 定义了负载均衡的处理原则IRule对象,从BaseLoadBalancerchooseServer(Object key)的实现源码可以知道,负载均衡器实际将服务实例选择的任务委托给IRule实例中的choose函数来实现。
    默认初始化了RoundRobinRule实现,RoundRobinRule实现了最基本且常用的线性负载均衡规则。
protected IRule rule = DEFAULT_RULE;
private final static IRule DEFAULT_RULE = new RoundRobinRule();
BaseLoadBalancer的chooseServer方法
  • 启动ping任务:在BaseLoadBalancer的默认构造函数中,会直接启动一个用于定时检查Server是否健康的任务。该任务默认的执行间隔式10s。
/**
  * Default constructor which sets name as "default", sets null ping, and
  * {@link RoundRobinRule} as the rule.
  * <p>
  * This constructor is mainly used by {@link ClientFactory}. Calling this
  * constructor must be followed by calling {@link #init()} or
  * {@link #initWithNiwsConfig(IClientConfig)} to complete initialization.
  * This constructor is provided for reflection. When constructing
  * programatically, it is recommended to use other constructors.
  */
public BaseLoadBalancer() {
     this.name = DEFAULT_NAME;
     this.ping = null;
     setRule(DEFAULT_RULE);
     setupPingTask();
     lbStats = new LoadBalancerStats(DEFAULT_NAME);
}

定时任务

void setupPingTask() {
   if (canSkipPing()) {
       return;
   }
   if (lbTimer != null) {
      lbTimer.cancel();
   }
   lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
         true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}
  • 实现了ILoadBalancer接口定义的一系列的基本操作:

addServers(List<Server> newServers):向负载均衡器中增加新的服务实例列表,该实现将原本已经维护的所有服务实例清单allServerList和新传入的服务实例清单newServers都加入了newList中,然后通过调用setServersList函数对newList进行处理,在BaseLoadBalancer中实现的时候会使用新的列表覆盖旧的列表。

    /**
     * Add a list of servers to the 'allServer' list; does not verify
     * uniqueness, so you could give a server a greater share by adding it more
     * than once
     */
    @Override
    public void addServers(List<Server> newServers) {
        if (newServers != null && newServers.size() > 0) {
            try {
                ArrayList<Server> newList = new ArrayList<Server>();
                newList.addAll(allServerList);
                newList.addAll(newServers);
                setServersList(newList);
            } catch (Exception e) {
                logger.error("Exception while adding Servers", e);
            }
        }
    }

chooseServer(Object key) :挑选一个具体的服务实例,

  public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Throwable t) {
                return null;
            }
        }
    }

markServerDown(Server server):标记某个服务实例为暂停服务。

public void markServerDown(Server server) {
   if (server == null) {
      return;
   }

   if (!server.isAlive()) {
       return;
   }

   logger.error("LoadBalancer:  markServerDown called on ["
        + server.getId() + "]");
   server.setAlive(false);
   //forceQuickPing();

   notifyServerStatusChangeListener(singleton(server));
}

getReachableServers():获取可用的服务实例列表。由于BaseLoadBalancer中单独维护了一个正常服务的实例清单,所以直接返回即可

@Override
public List<Server> getReachableServers() {
   return Collections.unmodifiableList(upServerList);
}

getAllServers():获取所有的服务实例列表。由于BaseLoadBalancer中单独维护了一个正常服务的实例清单,所以直接返回即可。

@Override
public List<Server> getAllServers() {
   return Collections.unmodifiableList(allServerList);
}

com.netflix.loadbalancer.DynamicServerListLoadBalancer

com.netflix.loadbalancer.DynamicServerListLoadBalancer类继承com.netflix.loadbalancer.BaseLoadBalancer类,它是对基础负载均衡器的扩展。
该负载均衡器中,实现了服务实例清单在运行期的动态更新能力;同时,它还具备了对服务实例清单的过滤功能,也就是说,我们可以通过过滤器来选择性的获取一批服务实例清单。

ServerList
DynamicServerListLoadBalancer的成员定义中,我们马上可以发现新增了一个关于服务列表的操作对象ServerList<T> serverListImpl。从类名DynamicServerListLoadBalancer<T extends Server>发现T泛型是Server子类,即代表了一个具体的服务实例的扩展类,而ServerList接口定义如下:

volatile ServerList<T> serverListImpl;

ServerList接口定义如下

/**
 * Interface that defines the methods sed to obtain the List of Servers
 * @author stonse
 *
 * @param <T>
 */
public interface ServerList<T extends Server> {

    //用于获取初始化的服务实例清单
    public List<T> getInitialListOfServers();
    
    //获取更新的服务实例清单,每隔30s更新一次
    public List<T> getUpdatedListOfServers();   

}

其实现类:

DynamicServerListLoadBalancer中的ServerList默认配置到底使用了哪些具体的实现呢?既然是该负载均衡器中实现服务实例的动态更新,那么势必需要Ribbon访问Eureka来获取服务实例的能力,可以从Ribbon整合Eureka的包下去寻找,
org.springframework.cloud.netflix.ribbon.eureka.EurekaRibbonClientConfiguration中的,

@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
        if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            return this.propertiesFactory.get(ServerList.class, config, serviceId);
        }
        DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                config, eurekaClientProvider);
        DomainExtractingServerList serverList = new DomainExtractingServerList(
                discoveryServerList, config, this.approximateZoneFromHostname);
        return serverList;
}

可以看出创建了DomainExtractingServerList实例,其内部也维护了ServerList list,同时DomainExtractingServerList类中对getInitialListOfServersgetUpdatedListOfServers的具体实现,其实是委托给内部定义的ServerList<DiscoveryEnabledServer> list对象,而该对象是通过创建DiscoveryEnabledNIWSServerList实例传递进去的

org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList的源码:

那么DiscoveryEnabledNIWSServerList是如何实现这两个服务实例获取的呢?从DiscoveryEnabledNIWSServerList其源码的私有方法obtainServersViaDiscovery通过服务发现机制来实现服务实例的获取的,
com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList的obtainServersViaDiscovery方法,

@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
        return obtainServersViaDiscovery();
}

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
}

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
}

EurekaClient eurekaClient = eurekaClientProvider.get();
        //vipAddresses可以理解为逻辑上的服务名,对这些服务名进行遍历,将状态为UP(正常服务)的实例转换成DiscoveryEnabledServer对象
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        //将这些实例组织成列表返回。
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
}

ServerListUpdater
com.netflix.loadbalancer.DynamicServerListLoadBalancer负载均衡器使用不同的策略进行列表更新的策略。

上面分析了如何从Eureka Server中获取服务实例清单,那么它又是如何触发向Eureka Server去获取服务实例清单以及如何在获取到服务实例清单后更新本地实例清单呢?

回到com.netflix.loadbalancer.DynamicServerListLoadBalancer

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            //实现对服务列表的更新
            updateListOfServers();
        }
    };
    
    protected volatile ServerListUpdater serverListUpdater;

updateListOfServers方法

     @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

ServerListUpdater的二种实现:

ServerListUpdater的二种实现

PollingServerListUpdater:动态服务列表更新的默认策略,也就是说,DynamicServerListLoadBalancer负载均衡器中的默认实现就是它,它通过定时任务的方式进行服务列表的更新。

EurekaNotificationServerListUpdater:该更新器也可以服务于DynamicServerListLoadBalancer负载均衡器,但是它的触发机制与PollingServerListUpdater不同,它需要利用Eureka的事件监听来驱动服务列表的更新操作。

查看PollingServerListUpdater的实现,

private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;

    public PollingServerListUpdater() {
        this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
    }

    public PollingServerListUpdater(IClientConfig clientConfig) {
        this(LISTOFSERVERS_CACHE_UPDATE_DELAY, getRefreshIntervalMs(clientConfig));
    }

    public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

initialDelayMsrefreshIntervalMs的默认定义是1000和30*1000,单位是毫秒。就是说,更新服务实例在初始化之后延迟1s后开始执行,并以30s位周期重复执行,还会记录最后更新时间,是否存活等信息。

ServerListFilter

volatile ServerListFilter<T> filter;

ServerListFilter接口的定义:
This interface allows for filtering the configured or dynamically obtained List of candidate servers with desirable characteristics.
该接口允许用配置或动态获取的具有所需特性的候选服务器列表进行过滤。

public interface ServerListFilter<T extends Server> {

    public List<T> getFilteredListOfServers(List<T> servers);

}
ServerListFilter接口的实现

其中,除了org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter的实现是Spring Cloud Ribbon中对Netflix Ribbon的扩展实现外,其他均是Netflix Ribbon中的原生实现类,

  • com.netflix.loadbalancer.AbstractServerListFilter:这是一个抽象的接口,接收一个重要的依据对象LoadBalancerStats
AbstractServerListFilter
  • com.netflix.loadbalancer.ZoneAffinityServerListFilter:该过滤器基于"区域感知(Zone Affinity)"的方式实现服务实例的过滤,也就说,它会根据提供服务的实例所处于的区域(Zone)与消费者自身所处区域(Zone)进行比较,过滤掉那些不是同处一个区域的实例
@Override
    public List<T> getFilteredListOfServers(List<T> servers) {
        if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
            List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            if (shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            } else if (zoneAffinity) {
                overrideCounter.increment();
            }
        }
        return servers;
}
  • com.netflix.niws.loadbalancer.DefaultNIWSServerListFilter
    该过滤器完全继承自ZoneAffinityServerListFilter,是默认的NIWS(Netfilx Internal Web Service)过滤器。

  • com.netflix.loadbalancer.ServerListSubsetFilter
    该过滤器也继承自ZoneAffinityServerListFilter,它非常适用于拥有大规模服务器集群(上百或者更多)的系统。因为它可以产生一个“区域感知”

  • org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter
    spring cloud整合时新增的过滤器,若使用Spring Cloud整合EurekaRibbon时会默认使用该过滤器,它实现了通过配置或者Eureka实例元数据的所属区域(Zone)来过滤同区域的服务实例,它的实现非常简单,首先通过ZoneAffinityServerListFilter的过滤器来获得"区域感知"的服务实例列表,然后遍历这个结果,取出根据消费者配置预设的区域Zone来进行过滤,如果过滤的结果是空就直接返回父类的结果,如果不为空就返回通过消费者的Zone过滤后的结果。

@Override
public List<Server> getFilteredListOfServers(List<Server> servers) {
        List<Server> output = super.getFilteredListOfServers(servers);
        if (this.zone != null && output.size() == servers.size()) {
            List<Server> local = new ArrayList<Server>();
            for (Server server : output) {
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }
            if (!local.isEmpty()) {
                return local;
            }
        }
        return output;
}

com.netflix.loadbalancer.ZoneAwareLoadBalancer

ZoneAwareLoadBalancer负载均衡器是对DynamicServerListLoadBalancer的扩展。在DynamicServerListLoadBalancer中,我们可以看到它并没有重写选择具体服务实例的chooseServer函数,所以它依然会采用在BaseLoadBalancer中实现的算法。使用RoundRobinRule规则,以线性轮询的方式来选择调用的服务实例,该算法实现简单并没有区域(Zone)的概念,所以它会把所有实例视为一个Zone下的节点来看待,这样就会周期性的跨区域(Zone)访问的情况,由于跨区域会产生更高的延迟,这些实例主要以防止区域性故障实现高可用为目的而不能作为常规访问的实例,所以在多区域部署的情况会出现一定的性能问题,而该负载均衡器则可以规避这样的问题。

ZoneAwareLoadBalancer中,并没有重写setServersList方法,说明实现服务实例清单的更新的主要逻辑没有变化。但是重写了setServerListForZones方法,DynamicServerListLoadBalancer中的定义:

DynamicServerListLoadBalancer

setServerListForZones函数的调用位于更新服务实例清单函数setServersList最后,根据区域Zone分组的实例列表,为负载均衡器中的LoadBalancerStats对象创建ZoneStats并放入Map zoneServersMap集合中,每一个区域Zone对应一个ZoneStats,它用于存储每个Zone的一些状态和统计信息。

ZoneAwareLoadBalancer中对setServerListForZones重写如下:

@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);,它存储每个Zone区域对应的负载均衡器。
        if (balancers == null) {
            //创建一个ConcurrentHashMap类型的balancers对象
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
        //具体的负载均衡器在getLoadBalancer函数中完成,同时在创建负载均衡器的时候会创建它的规则
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
              //创建完负载均衡器的时候会马上调用setServersList函数为其设置对应Zone区域的实例清单
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        // check if there is any zone that no longer has a server
        // and set the list to empty so that the zone related metrics does not
        // contain stale data
        //对Zone区域中实例清单的检查,看看是否有Zone区域的实例清单已经没有实例了,是的话就将balancers中对应的Xone区域的实例列表清空,该操作
        //的作用是为了后续选择节点时,防止过多的Zone区域统计信息干扰具体实例的选择算法
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }    

getLoadBalancer方法

    //如果当前实现中没有IRule的实例,就创建一个AvailabilityFilteringRule规则,如果有实现克隆一个
    @VisibleForTesting
    BaseLoadBalancer getLoadBalancer(String zone) {
        zone = zone.toLowerCase();
        BaseLoadBalancer loadBalancer = balancers.get(zone);
        if (loadBalancer == null) {
            // We need to create rule object for load balancer for each zone
            IRule rule = cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
                loadBalancer = prev;
            }
        } 
        return loadBalancer;        
    }

在了解了负载均衡器如何扩展服务实例清单的时候,看其怎样挑选服务实例,来实现对区域的识别的,

@Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        //只有当负载均衡器中维护的实例所属的Zone区域的个数大于1的时候才会执行这里的策略
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            //调用ZoneAvoidanceRule.createSnapshot方法,当前的负载均衡器中所有的Zone区域分布创建快照,保存在Map zoneSnapshot中
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            //调用ZoneAvoidanceRule.getAvailableZones方法,来获取可用Zone区域集合,在该函数中会通过Zone区域快照的统计数据实现可用区的挑选。
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            //当获得的可用Zone区域集合不为空,并且个数小于Zone区域总数,就随机选择一个Zone区域
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                //在确定了某个Zone区域后,则获取了对应Zone区域服务均衡器,并调用zoneLoadBalancer.chooseServer来选择具体的服务实例,而在
                //zoneLoadBalancer.chooseServer中将使用IRule接口的choose函数来选择具体的服务实例,在这里,IRule接口的实现会使用ZoneAvoidanceRule来挑选具体的服务实例。
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Throwable e) {
            logger.error("Unexpected exception when choosing server using zone aware logic", e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            //否则实现父类的策略
            return super.chooseServer(key);
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容