ribbon源码阅读--IRule源码分析

本人小白,这个文章是本人的阅读笔记,不是权威解读,需要自己甄别对错

大致流程跑完了,现在分析下IRule相关实现和机制

整个流程跑下来,我们知道Ribbon中有三个核心组件,ILoadBalancer,IRule,IPing,其中IRule是起到做负载均衡算法作用的,在前文中我们从AllList中选择一个Server出来,就是使用的IRule中的算法。

首先我们看看IRule组件的初始化

    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }

这段初始化说的意思是如果我们配置了自定义的IRule策略就使用我们定义的,如果没有定义就走默认的,前文我们已经看过了,ZoneAvoidanceRule的策略实现是走父类PredicateBasedRule的轮询策略

那么我们看看常见可配置的负载均衡策略在源码里提供了多少实现:

  • RoundRobinRule
  • AvailabilityFilteringRule
  • BestAvailableRule
  • RandomRule
  • RetryRule
  • WeightedResponseTimeRule
  • ZoneAvoidanceRule

大概就这些吧,其余的不眼熟,也不常用,就不做分析了,老师傅对我建议是,生产环境下,别瞎捣鼓,用默认的就行 -_-!

RoundRobinRule --轮询算法

首先看看这个,为什么先这个呢,后面的许多均衡策略用该算法为基础算法的,先上核心方法看看:

public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            //获取状态可用的服务列表
            List<Server> reachableServers = lb.getReachableServers();
            //获取所有注册服务的列表
            List<Server> allServers = lb.getAllServers();
            //可用服务列表大小
            int upCount = reachableServers.size();
            //注册服务列表大小
            int serverCount = allServers.size();

            //可用服务或者注册表大小为0,返回null
            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
            
            //获取下一个下标,使用的死循环和CAS 就是自旋锁那一套玩意
            int nextServerIndex = incrementAndGetModulo(serverCount);
            
            //获取下标对应的服务
            server = allServers.get(nextServerIndex);

            // 服务为空,歇会,交出计算资源,继续
            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }
            
            //如果服务存活可用
            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

看了一遍,不就是轮询算法么???那么和默认的AbstractServerPredicate中的轮询有毛不同么,我的天,请原谅我的无知,不懂!

先看看什么个逻辑,大致逻辑都加了注释,嗯,看完的感受:毛玩意!!!我先吐槽下,我是真不懂,既然你获取了可用服务列表,为什么还用注册服务列表做轮询???难道是怕自旋锁锁住的时间内,服务节点状态改变了???算了,回头在研究

大概的意思就是轮询,选择一个可用服务节点进行返回,如果没有,就寻找10次,最后还没有,就是返回NULL了,中间如果发现寻找到服务为NULL,将进行线程yield操作,从这里看出来,这个比默认的轮询算法加了一个可用服务过滤。

AvailabilityFilteringRule

直接干核心方法吧

    @Override
    public Server choose(Object key) {
        int count = 0;
        //使用roundRobinRule选择一个服务出来
        Server server = roundRobinRule.choose(key);
        //总共循环10次
        while (count++ <= 10) {
            //判读是否满足条件
            if (predicate.apply(new PredicateKey(server))) {
                return server;
            }
            //选择下一个
            server = roundRobinRule.choose(key);
        }
        //调用父类的轮询策略
        return super.choose(key);
    }

调用轮询算法找到一个可用服务节点,之后判断是否满足一个条件,之后满足就返回,如果没有满足,就下一个。
那这个条件是啥?跟进去看看:

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        //获取状态收集器 自己起的名字
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true;
        }
        //这步是真正用的判断逻辑
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }
    
    private boolean shouldSkipServer(ServerStats stats) {        
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }

就是判断当前的服务不是处于熔断器开启状态,且当前的服务连接数小于设置的并发数量,默认限制并发数是Integer.MAX_VALUE,没有这种机器能支持这个并发!!!这个一般配合hystrix使用比较好吧,接着看choose()方法

总之,大概的流程就是选择一个可用的,且并发数小于阈值的服务,如果10次还没找到,直接使用父类的轮询选择的节点

BestAvailableRule

核心方法:

    @Override
    public Server choose(Object key) {
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        //获取所有服务
        List<Server> serverList = getLoadBalancer().getAllServers();
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        //遍历所有节点
        for (Server server: serverList) {
            //获取服务状态
            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            //判断当前服务是否是熔断状态
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                // 找到最小连接数的服务
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        //调用父类轮询算法
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }

差不多的意思,找到最小连接数且服务不是熔断状态的服务,感觉没啥可说的,但是每次选取都是走一遍遍历,找到最小连接数的Server,和AvailabilityFilteringRule是由很大区别的

RandomRule

随机选择的,懒得看了,一般不用

RetryRule

看名字就知道是重试规则,看看核心方法吧

    public Server choose(ILoadBalancer lb, Object key) {
        //获取当前发起时间
        long requestTime = System.currentTimeMillis();
        //获取中止时间
        long deadline = requestTime + maxRetryMillis;

        Server answer = null;

        //轮询选择一个Server
        answer = subRule.choose(key);
        
        //如果选取的服务无效或者服务不可用  并且当前时间小于中止时间
        if (((answer == null) || (!answer.isAlive()))
                && (System.currentTimeMillis() < deadline)) {
            
            //设置一个中断调度任务 执行时间deadline - System.currentTimeMillis()
            InterruptTask task = new InterruptTask(deadline
                    - System.currentTimeMillis());

            //如果当前线程未被中断
            while (!Thread.interrupted()) {
                //再轮询选择一个Server
                answer = subRule.choose(key);
                // 一样的判断规则
                if (((answer == null) || (!answer.isAlive()))
                        && (System.currentTimeMillis() < deadline)) {
                    /* pause and retry hoping it's transient */
                    // 让出CPU
                    Thread.yield();
                } else {
                    break;
                }
            }
            //中断任务取消
            task.cancel();
        }

        if ((answer == null) || (!answer.isAlive())) {
            return null;
        } else {
            return answer;
        }
    }

大致的流程:

  • 轮询选择一个Server
  • 如果当前服务可用直接返回
  • 如果当前服务不可用,启动一个线程中断任务,并且循环轮询服务节点,知道找到可用或者超过中止时间

大致的代码都加上了注解,好处就是不会无限循环,有重试超时时间,超时指的是找到可用节点的时间超时

WeightedResponseTimeRule

基于权重的选取规则,这个权重和我们在Nginx配置的权重还是有区别的,WeightedResponseTimeRule的权重是基于响应时间,响应时间越长权重越小
该选取规则在对象初始化时候执行了initialize()方法

    void initialize(ILoadBalancer lb) {        
        if (serverWeightTimer != null) {
            serverWeightTimer.cancel();
        }
        //初始了一个定时调度器
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                + name, true);
        //设置执行方法和间隔        
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                serverWeightTaskTimerInterval);
        // do a initial run
        //先走一遍初始计算
        ServerWeight sw = new ServerWeight();
        sw.maintainWeights();
        //线程中止时候取消调度任务
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                logger
                        .info("Stopping NFLoadBalancer-serverWeightTimer-"
                                + name);
                serverWeightTimer.cancel();
            }
        }));
    }

这个初始化方法就是初始化了一个调度任务,30S执行一次,第一次计算了相关值,看看调度任务干了啥

    public void maintainWeights() {
            ILoadBalancer lb = getLoadBalancer();
            if (lb == null) {
                return;
            }
            
            if (!serverWeightAssignmentInProgress.compareAndSet(false,  true))  {
                return; 
            }
            
            try {
                logger.info("Weight adjusting job started");
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {
                    // no statistics, nothing to do
                    return;
                }
                //这一步是计算注册服务的所有响应时间之和
                double totalResponseTime = 0;
                // find maximal 95% response time
                for (Server server : nlb.getAllServers()) {
                    // this will automatically load the stats if not in cache
                    ServerStats ss = stats.getSingleServerStat(server);
                    totalResponseTime += ss.getResponseTimeAvg();
                }
                // weight for each server is (sum of responseTime of all servers - responseTime)
                // so that the longer the response time, the less the weight and the less likely to be chosen
                Double weightSoFar = 0.0;
                
                // create new list and hot swap the reference
                List<Double> finalWeights = new ArrayList<Double>();
                //遍历所有注册服务,计算权重,权重计算规则是所有服务响应时间-当前服务响应时间
                //那么响应时间越短的权重越大
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    double weight = totalResponseTime - ss.getResponseTimeAvg();
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);   
                }
                setWeights(finalWeights);
            } catch (Exception e) {
                logger.error("Error calculating server weights", e);
            } finally {
                serverWeightAssignmentInProgress.set(false);
            }

        }

30S执行一次权重计算,服务权重 = 所有服务响应时间之和 - 当前服务响应时间

看看核心方法吧

    while (server == null) {
            // get hold of the current reference in case it is changed from the other thread
            List<Double> currentWeights = accumulatedWeights;
            //线程中断返回null
            if (Thread.interrupted()) {
                return null;
            }
            //获取所有服务
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();

            if (serverCount == 0) {
                return null;
            }

            int serverIndex = 0;

            //获取最后节点的响应时间
            // last one in the list is the sum of all weights
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); 
            // No server has been hit yet and total weight is not initialized
            // fallback to use round robin
            //如maxTotalWeight小于0.001,或者服务列表和权重计算集合数量不匹配
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                //返回轮询选取的节点
                server =  super.choose(getLoadBalancer(), key);
                if(server == null) {
                    return server;
                }
            } else {
                // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
                //生成 0 到 maxTotalWeight的随机权重的
                double randomWeight = random.nextDouble() * maxTotalWeight;
                // pick the server index based on the randomIndex
                int n = 0;
                //找到第一个比随机权重大的服务下标
                for (Double d : currentWeights) {
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }
                //获取该服务
                server = allList.get(serverIndex);
            }

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Next.
            server = null;
        }

看了一遍,随机性很大,但是权重大选取的概率是比较大,选取的maxTotalWeight是最后一个服务节点的平均响应时间,不是真正的最大响应时间啊,我找了一圈也没发现排序之类的东西,并且如果真排序了,那么和服务列表也对应不上了。

为什么说随机性很大呢,最后一个节点的平均响应时间不知道处在什么什么水平,之后又是取了0-maxTotalWeight的随机数,但是权重越大,d >= randomWeight越容易成立,越容易选取,说明一点不是说你的权重最大,你就一定被选取。

ZoneAvoidanceRule

默认的轮询算法,没啥可说的,也是取模+CAS那一套,代码放出来看看吧

    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo)
                return current;
        }
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容