14.dubbo源码-负载均衡

LoadBalance

从LoadBalance的实现类可知,dubbo默认的负载均衡算法总计有4种:

  • 随机算法RandomLoadBalance(默认)
  • 轮训算法RoundRobinLoadBalance
  • 最小活跃数算法LeastActiveLoadBalance
  • 一致性hash算法ConsistentHashLoadBalance

如何选择负载均衡算法

调用链从com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker中的invoke(final Invocation invocation)开始:
--> AbstractClusterInvoker.invoke(Invocation)
--> AbstractClusterInvoker.doInvoke(invocation, invokers, loadbalance)
--> FailoverClusterInvoker.doInvoke(Invocation, final List<Invoker<T>>, LoadBalance) (说明,dubbo默认是failover集群实现,且默认最多重试2次,即默认最多for循环调用3次)
--> AbstractClusterInvoker.select()
--> AbstractClusterInvoker.doSelect()
--> AbstractLoadBalance.select()
--> RoundRobinLoadBalance.doSelect()

选定Invoker

一般dubbo服务都有多个Provider,即对Consumer来说有多个可以调用的Invoker,
根据下面的源码可知,dubbo只根据第一个Invokerinvokers.get(0))确定负载均衡策略,所以选择哪个Invoker取决于List<Invoker<T>> invokers排列顺序,dubbo对Invoker排序的compare方法定义为:o1.getUrl().toString().compareTo(o2.getUrl().toString()));所以如果有10.0.0.1:20884,10.0.0.1:20886,10.0.0.1:20888三个invoke,那么第一个Invoker肯定是10.0.0.1:20884(这个排序算法后面会详细讲解);

public Result invoke(final Invocation invocation) throws RpcException {
    checkWheatherDestoried();
    LoadBalance loadbalance;  
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        // 排序后的Invoker集合的第一个Invoker(即invokers.get(0))的负载均衡策略就是dubbo选择的策略,默认策略为Constants.DEFAULT_LOADBALANCE,即"random",然后根据扩展机制得到负载均衡算法的实现为com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        // 如果没有任何Invoker,那么直接取默认负载
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

sticky

sticky:粘的,粘性;
作用是在调用Invoker的重试过程中,尽可能调用前面选择的Invoker;sticky配置在consumer端,配置方式如下:

<dubbo:reference id="testService" interface="com.alibaba.dubbo.demo.TestService" retries="2" sticky="true"/>

sticky业务逻辑主要在如下这段代码中:

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.size() == 0)
        return null;
    String methodName = invocation == null ? "" : invocation.getMethodName();
    // 判断UR上是否配置了sticky属性,sticky默认为false
    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;
    {
        //ignore overloaded method
        if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){
            stickyInvoker = null;
        }
        //ignore cucurrent problem
        if (sticky && stickyInvoker != null && (selected == null || selected.contains(stickyInvoker))){
            if (availablecheck && stickyInvoker.isAvailable()){
                return stickyInvoker;
            }
        }
    }
    Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
    // 如果配置了sticky="true",那么将这次选择到的Invoker缓存下来赋值给stickyInvoker
    if (sticky){
        stickyInvoker = invoker;
    }
    return invoker;
}

选择LoadBalance

实现源码AbstractClusterInvoker:

    private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

        // 如果没有任何可调用服务, 那么返回null
        if (invokers == null || invokers.size() == 0)
            return null;
        // 如果只有1个Invoker(provider),那么直接返回,不需要任何负载均衡算法
        if (invokers.size() == 1)
            return invokers.get(0);
        // 如果只有两个invoker,且处于重新选择(重试)过程中(selected != null && selected.size() > 0),则退化成轮循;

        if (invokers.size() == 2 && selected != null && selected.size() > 0) {
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
        }

        // invokers数量至少3个的话,调用具体的负载均衡实现(例如轮询,随机等)选择Invoker
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
        
        //如果处于重试过程中,且 selected中包含当前选择的invoker(优先判断) 或者 (不可用&&availablecheck=true) 则重试.
        if( (selected != null && selected.contains(invoker))
                ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
            try{
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if(rinvoker != null){
                    invoker =  rinvoker;
                }else{
                    //看下第一次选的位置,如果不是最后,选+1位置.
                    int index = invokers.indexOf(invoker);
                    try{
                        //最后在避免碰撞
                        invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
                    }catch (Exception e) {
                        logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);
                    }
                }
            }catch (Throwable t){
                logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);
            }
        }
        return invoker;
    } 

Dubbo的BUG
这里退化成轮询的实现有问题,对应源码return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);如果retries=4,即最多调用5次,且两个可选invoke分别为:
10.0.0.1:20884,10.0.0.1:20886;
那么5次选择的invoke为:
10.0.0.1:20884
10.0.0.1:20886
10.0.0.1:20886
10.0.0.1:20886
10.0.0.1:20886,
即除了第1次外后面的选择都是选择第二个invoker;
因次需要把selected.get(0)修改为:selected.get(selected.size()-1);
即每次拿前一次选择的invoker与 invokers.get(0)比较,如果相同,则选则另一个invoker;否则就选 invokers.get(0);

2. 负载均衡算法的实现

2.1 随机算法

RandomLoadBalance.doSelect():

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size(); //Invoke的总个数
    int totalWeight = 0; // 所有Invoke的总权重
    boolean sameWeight = true; // 各个Invoke权重是否都一样
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        totalWeight += weight; // 累加总权重
        if (sameWeight && i > 0
                && weight != getWeight(invokers.get(i - 1), invocation)) {
            sameWeight = false; // 每个invoke与前一次遍历的invoke的权重进行比较,判断所有权重是否一样
        }
    }
    if (totalWeight > 0 && ! sameWeight) {
        // 如果所有invoke的权重有不相同且总权重大于0则按总权重数随机
        int offset = random.nextInt(totalWeight);
        // 确定随机值落在哪个片断上,从而取得对应的invoke,如果weight越大,那么随机值落在其上的概率越大,这个invoke被选中的概率越大
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // 如果权重相同或权重为0则均等随机
    return invokers.get(random.nextInt(length));
}

算法说明

假定有3台dubbo provider:
10.0.0.1:20884, weight=2
10.0.0.1:20886, weight=3
10.0.0.1:20888, weight=4
随机算法的实现:
totalWeight=9;
假设offset=1(即random.nextInt(9)=1)
1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2
假设offset=4(即random.nextInt(9)=4)
4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3
假设offset=7(即random.nextInt(9)=7)
7-2=5<0?否,这时候offset=5, 5-3=2<0?否,这时候offset=2, 2-4<0?是,所以选中 10.0.0.1:20888, weight=4

2.2 轮询算法

RoundRobinLoadBalance.doSelect():

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // key表示方法,例如com.alibaba.dubbo.demo.TestService.getRandomNumber, 即负载均衡算法细化到每个方法的调用;
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    int length = invokers.size(); // provider的总个数
    int maxWeight = 0; // 最大权重, 只是一个临时值, 所以设置为0, 当遍历invokers时接口的weight肯定大于0,马上就会替换成一个真实的maxWeight的值;
    int minWeight = Integer.MAX_VALUE; // 最小权重,只是一个临时值, 所以设置为Integer类型最大值, 当遍历invokers时接口的weight肯定小于这个数,马上就会替换成一个真实的minWeight的值;

    for (int i = 0; i < length; i++) {
        // 从Invoker的URL中获取权重时,dubbo会判断是否warnup了,即只有当invoke这个jvm进程的运行时间超过warnup(默认为10分钟)时间,配置的weight才会生效;
        int weight = getWeight(invokers.get(i), invocation);
        maxWeight = Math.max(maxWeight, weight); // 重新计算最大权重值
        minWeight = Math.min(minWeight, weight); // 重新计算最小权重值
    }
    if (maxWeight > 0 && minWeight < maxWeight) { // 如果各provider配置的权重不一样
        AtomicPositiveInteger weightSequence = weightSequences.get(key);
        if (weightSequence == null) {
            weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
            weightSequence = weightSequences.get(key);
        }
        int currentWeight = weightSequence.getAndIncrement() % maxWeight;
        List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();
        for (Invoker<T> invoker : invokers) { 
            // 筛选权重大于当前权重基数的Invoker,从而达到给更大权重的invoke加权的目的
            if (getWeight(invoker, invocation) > currentWeight) {
                weightInvokers.add(invoker);
            }
        }
        int weightLength = weightInvokers.size();
        if (weightLength == 1) {
            return weightInvokers.get(0);
        } else if (weightLength > 1) {
            invokers = weightInvokers;
            length = invokers.size();
        }
    }

    // 每个方法对应一个AtomicPositiveInteger,其序数从0开始,
    AtomicPositiveInteger sequence = sequences.get(key);
    if (sequence == null) {
        sequences.putIfAbsent(key, new AtomicPositiveInteger());
        sequence = sequences.get(key);
    }
    // 取模轮循
    return invokers.get(sequence.getAndIncrement() % length);
}

算法说明

假定有3台权重都一样的dubbo provider:
10.0.0.1:20884, weight=100
10.0.0.1:20886, weight=100
10.0.0.1:20888, weight=100
轮询算法的实现:
其调用方法某个方法(key)的sequence从0开始:
sequence=0时,选择invokers.get(0%3)=10.0.0.1:20884
sequence=1时,选择invokers.get(1%3)=10.0.0.1:20886
sequence=2时,选择invokers.get(2%3)=10.0.0.1:20888
sequence=3时,选择invokers.get(3%3)=10.0.0.1:20884
sequence=4时,选择invokers.get(4%3)=10.0.0.1:20886
sequence=5时,选择invokers.get(5%3)=10.0.0.1:20888

如果有3台权重不一样的dubbo provider:
10.0.0.1:20884, weight=50
10.0.0.1:20886, weight=100
10.0.0.1:20888, weight=150
调试过很多次,这种情况下有问题;留一个TODO;

2.3 最小活跃数算法:

LeastActiveRobinLoadBalance.doSelect():

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size(); // 总个数
    int leastActive = -1; // 最小的活跃数
    int leastCount = 0; // 相同最小活跃数的个数
    int[] leastIndexes = new int[length]; // 相同最小活跃数的下标
    int totalWeight = 0; // 总权重
    int firstWeight = 0; // 第一个权重,用于于计算是否相同
    boolean sameWeight = true; // 是否所有权重相同
    for (int i = 0; i < length; i++) {
        Invoker<T> invoker = invokers.get(i);
        // 活跃数从RpcStatus中取得, dubbo统计活跃数时以方法为维度;
        int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();         
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
        if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始
            leastActive = active; // 记录最小活跃数
            leastCount = 1; // 重新统计相同最小活跃数的个数
            leastIndexes[0] = i; // 重新记录最小活跃数下标
            totalWeight = weight; // 重新累计总权重
            firstWeight = weight; // 记录第一个权重
            sameWeight = true; // 还原权重相同标识
        } else if (active == leastActive) { // 累计相同最小的活跃数
            leastIndexes[leastCount ++] = i; // 累计相同最小活跃数下标
            totalWeight += weight; // 累计总权重
            // 判断所有权重是否一样
            if (sameWeight && i > 0 
                    && weight != firstWeight) {
                sameWeight = false;
            }
        }
    }
    if (leastCount == 1) {
        // 如果只有一个最小则直接返回
        return invokers.get(leastIndexes[0]);
    }
    if (! sameWeight && totalWeight > 0) {
        // 如果权重不相同且权重大于0则按总权重数随机
        int offsetWeight = random.nextInt(totalWeight);
        // 并确定随机值落在哪个片断上
        for (int i = 0; i < leastCount; i++) {
            int leastIndex = leastIndexes[i];
            offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
            if (offsetWeight <= 0)
                return invokers.get(leastIndex);
        }
    }
    // 如果权重相同或权重为0则均等随机
    return invokers.get(leastIndexes[random.nextInt(leastCount)]);
}

算法说明

最小活跃数算法实现:
假定有3台dubbo provider:
10.0.0.1:20884, weight=2,active=2
10.0.0.1:20886, weight=3,active=4
10.0.0.1:20888, weight=4,active=3
active=2最小,且只有一个2,所以选择10.0.0.1:20884

假定有3台dubbo provider:
10.0.0.1:20884, weight=2,active=2
10.0.0.1:20886, weight=3,active=2
10.0.0.1:20888, weight=4,active=3
active=2最小,且有2个,所以从[10.0.0.1:20884,10.0.0.1:20886 ]中选择;
接下来的算法与随机算法类似:
假设offset=1(即random.nextInt(5)=1)
1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2
假设offset=4(即random.nextInt(5)=4)
4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3

2.4 一致性hash算法

实现源码请参考ConsistentHashLoadBalanceRobinLoadBalance.doSelect(),具体步骤如下:

  1. 定义全局一致性hash选择器的ConcurrentMap<String, ConsistentHashSelector<?>> selectors,key为方法名称,例如com.alibaba.dubbo.demo.TestService.getRandomNumber;
  2. 如果一致性hash选择器不存在或者与以前保存的一致性hash选择器不一样(即dubbo服务provider有变化,通过System.identityHashCode(invokers)计算一个identityHashCode值) 则需要重新构造一个一致性hash选择器;
  3. 构造一个一致性hash选择器ConsistentHashSelector的源码如下,通过参数i和h打散Invoker在TreeMap上的位置,replicaNumber默认值为160,所以最终virtualInvokers这个TreeMap的size为invokers.size()*replicaNumber
for (Invoker<T> invoker : invokers) {
    for (int i = 0; i < replicaNumber / 4; i++) {
        byte[] digest = md5(invoker.getUrl().toFullString() + i);
        for (int h = 0; h < 4; h++) {
            long m = hash(digest, h);
            virtualInvokers.put(m, invoker);
        }
    }
}
  1. 选择Invoker的步骤:

4.1. 根据Invocation中的参数invocation.getArguments()转成key;
4.2 算出这个key的md5值;
4.3 根据md5值的hash值从TreeMap中选择一个Invoker;

NOTE: 选择Invoker的hash值根据Invocation中的参数得到,那么如果调用的方法没有任何参数,例如getRandomNumber(),那么在可用Invoker集合不变的前提下,任何对该方法的调用都一直返回相同的Invoker;另外,通过对一致性hash算法的分析可知,weight权重设置对其不起作用,只对其他三种负载均衡算法其作用;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容