ribbon 负载均衡-服务发现-07

我们都知道ribbon 是做负载均衡的一个组件,那么它是如何拿到 注册中心的服务列表的呢?本次我们就来解决这个问题。首先我们来看一个简单的案例,使用ribbon 客户端来实现一下负载均衡。

    @Autowired// ribbon 客户端
    private LoadBalancerClient loadBalancerClient;
    @GetMapping("/hello2")
    public String hello2() {
        RestTemplate restTemplate = new RestTemplate();
        // 获取服务实例
        ServiceInstance serviceInstance = loadBalancerClient.choose("server-provider");
        String url = String.format("http://%s:%s",serviceInstance.getHost(), serviceInstance.getPort() + "/hello");
        String template = restTemplate.getForObject(url, String.class);
        return template;
    }

ServiceInstance serviceInstance = loadBalancerClient.choose("server-provider"); 这句话就是拿到 经过负载均衡算法后 得到的一个 服务实例。进去看一下这个方法。

 *
 * @author Ryan Baxter
 */
public interface ServiceInstanceChooser {

    /**
     * Chooses a ServiceInstance from the LoadBalancer for the specified service.
     * @param serviceId The service ID to look up the LoadBalancer.
     * @return A ServiceInstance that matches the serviceId.
     */
    ServiceInstance choose(String serviceId);

}
  • 去到 RibbonLoadBalancerClient
....
    @Override
    public ServiceInstance choose(String serviceId) {
    //去到下面这个 方法 
           return choose(serviceId, null);
    }
        /**
     * New: Select a server using a 'key'.
     * @param serviceId of the service to choose an instance for
     * @param hint to specify the service instance
     * @return the selected {@link ServiceInstance}
     */
    public ServiceInstance choose(String serviceId, Object hint) {
        Server server = getServer(getLoadBalancer(serviceId), hint);
        if (server == null) {
            return null;
        }
        return new RibbonServer(serviceId, server, isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));
    }

Server server = getServer(getLoadBalancer(serviceId), hint);

  • getLoadBalancer(serviceId): 看下这个方法。获取一个 LoadBalancer
public class RibbonLoadBalancerClient implements LoadBalancerClient {
    private SpringClientFactory clientFactory;

    public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }
    protected ILoadBalancer getLoadBalancer(String serviceId) {
        return this.clientFactory.getLoadBalancer(serviceId);
    }
}
  • SpringClientFactory
    /**
     * Get the load balancer associated with the name.
     * @param name name to search by
     * @return {@link ILoadBalancer} instance
     * @throws RuntimeException if any error occurs
     */
    public ILoadBalancer getLoadBalancer(String name) {
        return getInstance(name, ILoadBalancer.class);
    }
    @Override
    public <C> C getInstance(String name, Class<C> type) {
        C instance = super.getInstance(name, type);
        if (instance != null) {
            return instance;
        }
        IClientConfig config = getInstance(name, IClientConfig.class);
        return instantiateWithConfig(getContext(name), type, config);
    }
  • C instance = super.getInstance(name, type);

C instance = super.getInstance(name, type); 这句往上走,就是从spring 容器中 获取一个 instance -> ILoadBalancer 。这里就不往下追了。我们去看一下 ILoadBalancer 这个接口。

  • ILoadBalancer
public interface ILoadBalancer {

    /**
     * Initial list of servers.
     * This API also serves to add additional ones at a later time
     * The same logical server (host:port) could essentially be added multiple times
     * (helpful in cases where you want to give more "weightage" perhaps ..)
     * 
     * @param newServers new servers to add
     */
    public void addServers(List<Server> newServers);
    
    /**
     * Choose a server from load balancer.
     * 
     * @param key An object that the load balancer may use to determine which server to return. null if 
     *         the load balancer does not use this parameter.
     * @return server chosen
     */
    public Server chooseServer(Object key);
    
    /**
     * To be called by the clients of the load balancer to notify that a Server is down
     * else, the LB will think its still Alive until the next Ping cycle - potentially
     * (assuming that the LB Impl does a ping)
     * 
     * @param server Server to mark as down
     */
    public void markServerDown(Server server);
    
    /**
     * @deprecated 2016-01-20 This method is deprecated in favor of the
     * cleaner {@link #getReachableServers} (equivalent to availableOnly=true)
     * and {@link #getAllServers} API (equivalent to availableOnly=false).
     *
     * Get the current list of servers.
     *
     * @param availableOnly if true, only live and available servers should be returned
     */
    @Deprecated
    public List<Server> getServerList(boolean availableOnly);

    /**
     * @return Only the servers that are up and reachable.
     */
    public List<Server> getReachableServers();

    /**
     * @return All known servers, both reachable and unreachable.
     */
    public List<Server> getAllServers();
}

这个接口有一个实现类 :DynamicServerListLoadBalancer。
C instance = super.getInstance(name, type); 我们debug的话 会发现 这里的 instance -> DynamicServerListLoadBalancer。也就说这里返回的是 DynamicServerListLoadBalancer 对象。

  • DynamicServerListLoadBalancer
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);

    boolean isSecure = false;
    boolean useTunnel = false;

    // to keep track of modification of server lists
    protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);

    volatile ServerList<T> serverListImpl;

    volatile ServerListFilter<T> filter;

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers(); //更新服务列表。
        }
    };
 @Deprecated
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, 
            ServerList<T> serverList, ServerListFilter<T> filter) {
        this(
                clientConfig,
                rule,
                ping,
                serverList,
                filter,
                new PollingServerListUpdater()
        );
    }

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

    public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
        initWithNiwsConfig(clientConfig);
    }
  • restOfInit(clientConfig); // initWithNiwsConfig(clientConfig);

initWithNiwsConfig(clientConfig) ; 这个方法里面会 调用 restOfInit(clientConfig);

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature(); // 开启后台线程 更新服务列表。

        updateListOfServers(); // 更新服务列表。
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }
  • enableAndInitLearnNewServersFeature();
    /**
     * Feature that lets us add new instances (from AMIs) to the list of
     * existing servers that the LB will use Call this method if you want this
     * feature enabled
     */
    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction); //开始执行后台更新。
    }

现在我们来看一下 serverListUpdater.start(updateAction); 前面有一段 下面的代码:

 protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers(); // 
        }
    };
// serverListUpdater.start(updateAction);  // 这句话就是 回调 的 这里的 doUpdate();
  • serverListUpdater.start(updateAction); -> PollingServerListUpdater 走的这里的start 方法。
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate(); // 回调。
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable, //传入线程,定时轮询,
                    initialDelayMs, //延迟启动
                    refreshIntervalMs,// 服务列表刷新 时间间隔
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

到这里我们就明白了 ,ribbon 也是 定时轮询服务列表,使用后台线程 完成的。
接下来我们 就来分析 一下 具体 获取服务列表的方法 updateListOfServers();

  • updateListOfServers();
   @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
// 获取服务列表
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
       //保存到本地缓存
        updateAllServerList(servers);
    }

这里呢,我们知道 ribbon 获取了 服务列表后 保存在了本地缓存里面。 updateAllServerList(servers);

  • servers = serverListImpl.getUpdatedListOfServers();

  • serverListImpl -> DiscoveryEnabledNIWSServerList

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }
  • obtainServersViaDiscovery();
 private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }
       // 到这里 就明白了 ribbon 是通过 EurekaClient 来完成的服务列表的获取,也就说使用的还是 eureka 自己的服务发现。
        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
// 获取服务实例
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
// 返回了一个服务列表。
        return serverList;
    }

好了,到这里 我们就分析出 ribbon 是 如何 去 注册中心获取 服务列表的了。 ribbon 会在 后台 开一个定时任务,使用 eureka client 自己的服务发现 去获取 服务列表,然后 保存到本地缓存,当客户段在 消费服务的时候 经过负载均衡算法 选取其中一个服务实例,并通过 restTemplate 发送http 请求 ,请求到 服务提供者。完成带有负载均衡的 远程通信。

这里我们会发现一个问题,也是在 使用spring cloud 开发中经常 碰到的一个现象,就是 当服务在做集群的时候 ,新增 或者 挂掉一个服务时候,不能及时反映到系统里面。到这里我们就知道 原因了,1. eureka client 端的 服务发现 是通过 后台定时任务去更新的,保存到本地缓存的 ,2. ribbon 也是通过 后台定时任务 借助 于 eureka client 端的 服务发现 去查询服务列表,然后保存到本地的。这里就产生了 延迟的问题。当然 eureka 本身是 居于 AP 的,所以如果要使用 eureka 做为注册中的话,一定要 明确 系统是否对 时效 性有要求,如果有的话 就不适合,可以 考虑 zk(CP) 、nacos 等。
服务列表我们拿到了,接下来会 分析 ribbon 的负载均衡策略。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352