Spring Cloud源码分析——Ribbon客户端负载均衡

IZONE小樱花(●’◡’●)ノ

年前聊了Eureka和Zookeeper的区别,然后微服务架构系列就鸽了三个多月,一直沉迷逛B站,无法自拔。最近公司复工,工作状态慢慢恢复(又是元气满满地划水)。本文从以下3个方面进行分析(参考了翟永超[程序猿DD])的《Spring Cloud微服务实战》

  1. LoadBalancerInterceptor拦截器对RestTemplate的请求拦截;
  2. RibbonLoadBalancerClient实际接口实现;
  3. 负载均衡策略

1、LoadBalancerInterceptor源码

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
            LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null,
                "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName,
                this.requestFactory.createRequest(request, body, execution));
    }

}

可以看出,该拦截器注入了LoadBalancerClient实例,当一个被@LoadBalanced修饰的RestTemplate对象发起Http请求,会被LoadBalancerInterceptor中的intercept函数拦截。该函数会通过getHost()获取Http请求的服务名,恰巧我们使用的RestTemplate对象采用服务名作为Host,接着loadBalancer查找到对应服务名的服务,调用execute函数对该服务发起请求。

2、RibbonLoadBalancerClient实现

/**
     * New: Execute a request by selecting server using a 'key'. The hint will have to be
     * the last parameter to not mess with the `execute(serviceId, ServiceInstance,
     * request)` method. This somewhat breaks the fluent coding style when using a lambda
     * to define the LoadBalancerRequest.
     * @param <T> returned request execution result type
     * @param serviceId id of the service to execute the request to
     * @param request to be executed
     * @param hint used to choose appropriate {@link Server} instance
     * @return request execution result
     * @throws IOException executing the request may result in an {@link IOException}
     */
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
            throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }

经过LoadBalancerInterceptor拦截器后,调用LoadBalancerClient的execute函数去发起对应服务的请求。(LoadBalancerClient只是个抽象的负载均衡接口,RibbonLoadBalancerClient则是该接口的具体实现)
execute函数的作用,如官方所说:通过‘key’找到对应的服务并执行请求。
从源码中可以看出,execute函数具体实现首先是定义一个传入serviceId的loadBalancer对象,再getServer获取对应的具体服务,最后通过ribbonServer整合一系列服务信息发起请求。
其中getServer()是关键操作,来看看对应的源码:

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer == null) {
            return null;
        }
        // Use 'default' on a null hint, or just pass it on?
        return loadBalancer.chooseServer(hint != null ? hint : "default");
    }

显然,需要再深入看下loadBalancer 。

public interface ILoadBalancer {

    /**
     * Initial list of servers.
     * This API also serves to add additional ones at a later time
     * The same logical server (host:port) could essentially be added multiple times
     * (helpful in cases where you want to give more "weightage" perhaps ..)
     * 
     * @param newServers new servers to add
     */
    public void addServers(List<Server> newServers);
    
    /**
     * Choose a server from load balancer.
     * 
     * @param key An object that the load balancer may use to determine which server to return. null if 
     *         the load balancer does not use this parameter.
     * @return server chosen
     */
    public Server chooseServer(Object key);
    
    /**
     * To be called by the clients of the load balancer to notify that a Server is down
     * else, the LB will think its still Alive until the next Ping cycle - potentially
     * (assuming that the LB Impl does a ping)
     * 
     * @param server Server to mark as down
     */
    public void markServerDown(Server server);
    
    /**
     * @deprecated 2016-01-20 This method is deprecated in favor of the
     * cleaner {@link #getReachableServers} (equivalent to availableOnly=true)
     * and {@link #getAllServers} API (equivalent to availableOnly=false).
     *
     * Get the current list of servers.
     *
     * @param availableOnly if true, only live and available servers should be returned
     */
    @Deprecated
    public List<Server> getServerList(boolean availableOnly);

    /**
     * @return Only the servers that are up and reachable.
     */
    public List<Server> getReachableServers();

    /**
     * @return All known servers, both reachable and unreachable.
     */
    public List<Server> getAllServers();
}

ILoadBalancer定义了客户端负载均衡器的一系列抽象操作接口,从官方说明看出:

  • addServers:向负载均衡器的实例列表中添加新的服务实例
  • chooseServer:通过某种策略,挑选出一个具体的服务实例
  • markServerDown:通知并标识负载均衡器中某个具体服务实例已停止服务,不然的话,负载均衡器在下一次获取具体服务实例的时候,还会以为该服务正常
  • getReachableServers:获取可正常使用的服务实例列表
  • getAllServers:获取所有服务实例列表,包括正常和停止的

来看看具体实现BaseLoadBalancer,

public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
            IPing ping, IPingStrategy pingStrategy) {
    
        logger.debug("LoadBalancer [{}]:  initialized", name);
        
        this.name = name;
        this.ping = ping;
        this.pingStrategy = pingStrategy;
        setRule(rule);
        setupPingTask();
        lbStats = stats;
        init();
    }

默认构造函数ping设为null,rule策略默认设为轮询(RoundRobin)。该构造函数除了基本的赋值之外,主要是setRule(设置负载均衡策略)和setupPingTask(启动ping心跳任务)。

void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }

setupPingTask逻辑主要是定义ShutdownEnabledTimer实例来执行一个10秒间隔的schedule。timer定时器还定义了个PingTask任务

class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger(pingStrategy).runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", name, e);
            }
        }
    }

官方注释中,TimerTask会在自定义的时间间隔内检查服务实例列表中每个服务实例的运行状态。
再看看PingTask 任务里runPinger方法的关键逻辑:

                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);

                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                            name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                upLock.unlock();

                notifyServerStatusChangeListener(changedServers);

从源码可以看出,PingTask运行runPinger方法,根据pingerStrategy.pingServers(ping, allServers)来获取服务的可用性,然后对比前后服务的状态,如果状态一致,则不去EurekaClient(一般用Eureka作为注册中心,可换成其他注册中心)获取注册列表;否则,则调用notifyServerStatusChangeListener通知EurekaClient更新或重新拉取。

简单总结下完整的过程:
RibbonLoadBalancerClient(负载均衡客户端)初始化(调用execute),通过ILoadBalance从Eureka注册中心获取服务注册列表,同时以10s为间隔往EurekaClient发送ping,来保证服务的可用性,如果服务前后发生改变,则ILoadBalance重新从Eureka注册中心获取。RibbonLoadBalancerClient拿到服务注册列表之后,再根据IRule具体的策略,去获取对应的服务实例。

3、负载均衡策略

前面讲到RibbonLoadBalancerClient获取具体服务实例的过程,这里就需要了解下负载均衡策略。众所周知,使用负载均衡的好处主要有:当一台或多台机器宕机之后,剩余的机器可以保证服务正常运行;分担机器运行的压力,防止某一高峰机器CPU负载过高。
常见的策略有:随机(Random)、轮询(RoundRobin)、一致性哈希(ConsistentHash)、哈希(Hash)、加权(Weighted)

  • 轮询(RoundRobin)
public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }
private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

轮询算法其实就一句(current + 1) % modulo,每次都取下一台服务器。

  • 随机(Random)
    choose方法其实都差不多,主要看下算法
protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

ThreadLocalRandom获取随机数即可

  • 一致性哈希(ConsistentHash)、哈希(Hash)
    这两个是很常见的算法,本文就不讨论了
  • 加权(Weighted)、BestAvailableRule、WeightedResponseTimeRule、ZoneAvoidanceRule


    负载均衡策略方法

    这个研究起来就又要长篇大论了,下次再写篇来介绍吧(下次一定)

Ribbon的源码分析大概就这样,后面可能会不定期更新,有兴趣的朋友可以继续深入了解下,有啥问题也可以在评论中一起讨论下。
最后有件很重要的事,那就是麻烦点赞关注赞赏,谢谢(๑•̀ㅂ•́)و✧

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容