dubbo技术内幕六 RoundRobinLoadBalance

RoundRobinLoadBalance是dubbo里面提供的按照权重进行轮询的负载均衡算法,整个算法设计的非常巧妙,如下
1 初始化本地权重表,根据情况动态调整
2 每次动态的更新本地权重表,更新算法为当前invoker的权重+本地权重表的old值
3 选取本地权重最大的invoker,并将其本地权重表的权重 减去 本轮所有invoker的权重和,并返回当前的invoker

重复3
tip 本地权重表中如果一分钟内某个权重信息没有更新(即对应的invoker没有被调用,那么回收,重新的初始化)
,举个例子,引用于
https://blog.csdn.net/lizz861109/article/details/109517749(侵权请联系我删除)
图如下

image.png

源码分析如下

public class RoundRobinLoadBalance extends AbstractLoadBalance {
    //负载均衡器的名称
    public static final String NAME = "roundrobin";
    //更新时间(就是某个invoker一直没有被调用,将其删除,并重新的初始化)
    private static int RECYCLE_PERIOD = 60000;
    //内置的权重轮询器,主要对当前invoker的默认权重weight(一般不会变,如果是启动 
  //十分钟内会变化)和当前权重current(如果被选中,会减少total的权重,保证下次被 //选中的概率减小)
    protected static class WeightedRoundRobin {
        //配置权重
        private int weight;
        //当前权重
        private AtomicLong current = new AtomicLong(0);
        //最后更新时间
        private long lastUpdate;
        public int getWeight() {
            return weight;
        }
        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }
        //每次轮询的时候,都会将当前权重 + 默认权重再次设置为当前权重
        public long increaseCurrent() {
            return current.addAndGet(weight);
        }
       //如果被选中,则当前权重减去 total
        public void sel(int total) {
            current.addAndGet(-1 * total);
        }
        public long getLastUpdate() {
            return lastUpdate;
        }
        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }
    //缓存
    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
     //cas锁
    private AtomicBoolean updateLock = new AtomicBoolean();
    
    /**
     * get invoker addr list cached for specified invocation
     * <p>
     * <b>for unit test only</b>
     * 
     * @param invokers
     * @param invocation
     * @return
     */
    protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map != null) {
            return map.keySet();
        }
        return null;
    }
    
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        //获取key  service + method 
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
         //如果为空,那么初始化一个
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;
        for (Invoker<T> invoker : invokers) {
            String identifyString = invoker.getUrl().toIdentityString();
            WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
           //获取当前invoker的权重
            int weight = getWeight(invoker, invocation);
            if (weight < 0) {
                weight = 0;
            }
             //如果当前invoker对应的weightedRoundRobin为空,那么new一个,顺便使用
           //当前invoker的信息来初始化这个weightedRoundRobin
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(identifyString, weightedRoundRobin);
                weightedRoundRobin = map.get(identifyString);
            }
            //如果默认的weight与实际的不一致,那么更新(有可能在warmup阶段)
            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                weightedRoundRobin.setWeight(weight);
            }
            //将invoker的当前权重 + 默认权重
            long cur = weightedRoundRobin.increaseCurrent();
           //更新时间
            weightedRoundRobin.setLastUpdate(now);
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight += weight;
        }
       //如果invokers的个数与缓存的个数对不上,进行更新
        if (!updateLock.get() && invokers.size() != map.size()) {
            if (updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                    newMap.putAll(map);
                    Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                    while (it.hasNext()) {
                        //如果某个invoker一分钟都没有更新了(也就是根本没有请求过来)
                       //将其回收,下次请求进来会重建
                        Entry<String, WeightedRoundRobin> item = it.next();
                        if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                            it.remove();
                        }
                    }
                    methodWeightMap.put(key, newMap);
                } finally {
                    updateLock.set(false);
                }
            }
        }
        //经过上面的操作之后,当前权重最大的invoker被选出来了
        if (selectedInvoker != null) {
            //更新当前权重(也就是减去totalWeight  )
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return invokers.get(0);
    }

}

大家结合图和代码就能很好的理解这个加权轮询算法了

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容