RoundRobinLoadBalance是dubbo里面提供的按照权重进行轮询的负载均衡算法,整个算法设计的非常巧妙,如下
1 初始化本地权重表,根据情况动态调整
2 每次动态的更新本地权重表,更新算法为当前invoker的权重+本地权重表的old值
3 选取本地权重最大的invoker,并将其本地权重表的权重 减去 本轮所有invoker的权重和,并返回当前的invoker
重复3
tip 本地权重表中如果一分钟内某个权重信息没有更新(即对应的invoker没有被调用,那么回收,重新的初始化)
,举个例子,引用于
https://blog.csdn.net/lizz861109/article/details/109517749(侵权请联系我删除)
图如下
image.png
源码分析如下
public class RoundRobinLoadBalance extends AbstractLoadBalance {
//负载均衡器的名称
public static final String NAME = "roundrobin";
//更新时间(就是某个invoker一直没有被调用,将其删除,并重新的初始化)
private static int RECYCLE_PERIOD = 60000;
//内置的权重轮询器,主要对当前invoker的默认权重weight(一般不会变,如果是启动
//十分钟内会变化)和当前权重current(如果被选中,会减少total的权重,保证下次被 //选中的概率减小)
protected static class WeightedRoundRobin {
//配置权重
private int weight;
//当前权重
private AtomicLong current = new AtomicLong(0);
//最后更新时间
private long lastUpdate;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
//每次轮询的时候,都会将当前权重 + 默认权重再次设置为当前权重
public long increaseCurrent() {
return current.addAndGet(weight);
}
//如果被选中,则当前权重减去 total
public void sel(int total) {
current.addAndGet(-1 * total);
}
public long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
//缓存
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
//cas锁
private AtomicBoolean updateLock = new AtomicBoolean();
/**
* get invoker addr list cached for specified invocation
* <p>
* <b>for unit test only</b>
*
* @param invokers
* @param invocation
* @return
*/
protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map != null) {
return map.keySet();
}
return null;
}
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//获取key service + method
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
//如果为空,那么初始化一个
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
//获取当前invoker的权重
int weight = getWeight(invoker, invocation);
if (weight < 0) {
weight = 0;
}
//如果当前invoker对应的weightedRoundRobin为空,那么new一个,顺便使用
//当前invoker的信息来初始化这个weightedRoundRobin
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
weightedRoundRobin = map.get(identifyString);
}
//如果默认的weight与实际的不一致,那么更新(有可能在warmup阶段)
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
//将invoker的当前权重 + 默认权重
long cur = weightedRoundRobin.increaseCurrent();
//更新时间
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
//如果invokers的个数与缓存的个数对不上,进行更新
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
newMap.putAll(map);
Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
while (it.hasNext()) {
//如果某个invoker一分钟都没有更新了(也就是根本没有请求过来)
//将其回收,下次请求进来会重建
Entry<String, WeightedRoundRobin> item = it.next();
if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
it.remove();
}
}
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
//经过上面的操作之后,当前权重最大的invoker被选出来了
if (selectedInvoker != null) {
//更新当前权重(也就是减去totalWeight )
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
}
大家结合图和代码就能很好的理解这个加权轮询算法了