今天来看dubbo负载均衡策略中的最后一种,leastActive。leastActive在权重基础上,新增活跃度(active)维度限制,步骤如下:
1、初始化。初始化最低活跃值(-1)、最低活跃节点个数(可能有多个,默认0)、最低活跃度节点下标数组、
权重数组、总权重、首个最低活跃度节点权重。
2、遍历。遍历invoker,寻找所有活跃度最低的invoker;获取active值并计算权重;记录最低活跃节点、节点个数、totalweight等
3、若leastCount= 1,直接返回第一个;否则采用类似random的方式,基于totalweight生成随机值,然后基于随机值,选一个invoker
4、以上条件均不满足,则随机选取(leastIndexes中对应的下标)一个返回。
整个过程看起来是比较简单的,下面来看代码实现。
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//初始化最低活跃值、最低活跃节点数、最低活跃节点下标数组、权重数组、总权重,首个最低活跃度节点权重
int length = invokers.size();
int leastActive = -1;
int leastCount = 0;
int[] leastIndexes = new int[length];
int[] weights = new int[length];
int totalWeight = 0;
int firstWeight = 0;
boolean sameWeight = true;
// 遍历,过滤所有活跃度最低的invoker
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
//这里获取invoker的active值
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
if (leastActive == -1 || active < leastActive) {
leastActive = active;
leastCount = 1;
leastIndexes[0] = i;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
} else if (active == leastActive) {
leastIndexes[leastCount++] = i;
totalWeight += afterWarmup;
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 结果集只有一个,直接返回
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
//结果集不止一个,且所有节点权重不完全相同,totalweight>0
if (!sameWeight && totalWeight > 0) {
// 这里采用类似random负载均衡的方式,随机选取一个返回
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
//否则,随机选一个直接返回
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
到这里,leastActive负载均衡策略已经介绍完毕。其中一个细节需要单独拎出来看,注意下面这行代码
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
active值是按照URL、methodName从RpcStatus中获取的,那么,active的值是怎么计算出来的呢?先来看下取值逻辑:
public static RpcStatus getStatus(URL url, String methodName) {
String uri = url.toIdentityString();
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
if (map == null) {
METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
map = METHOD_STATISTICS.get(uri);
}
RpcStatus status = map.get(methodName);
if (status == null) {
map.putIfAbsent(methodName, new RpcStatus());
status = map.get(methodName);
}
return status;
}
逻辑比较简单,直接根据URL的identityString从Map缓存里拿,那么值是什么时候,怎么放进去的呢?关注下面的方法:
public static boolean beginCount(URL url, String methodName, int max) {
max = (max <= 0) ? Integer.MAX_VALUE : max;
RpcStatus appStatus = getStatus(url);
RpcStatus methodStatus = getStatus(url, methodName);
if (methodStatus.active.incrementAndGet() > max) {
methodStatus.active.decrementAndGet();
return false;
} else {
appStatus.active.incrementAndGet();
return true;
}
}
可以看到,这里是active值统计的统一入口;这是一个公用方法,同时支持method纬度和app维度的active值统计;当active值超过max上限时,会触发decrement。找到active入口之后,再来看看,什么时候会对active值进行统计,来看下面的代码:
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
//开始统计
if (!RpcStatus.beginCount(url, methodName, max)) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
//加锁,防止其他调用者修改count
synchronized (count) {
while (!RpcStatus.beginCount(url, methodName, max)) {
try {
count.wait(remain);
} catch (InterruptedException e) {
// ignore
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
+ ". max concurrent invoke limit: " + max);
}
}
}
}
boolean isSuccess = true;
long begin = System.currentTimeMillis();
try {
return invoker.invoke(invocation);
} catch (RuntimeException t) {
isSuccess = false;
throw t;
} finally {
//结束统计
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
if (max > 0) {
synchronized (count) {
count.notifyAll();
}
}
}
}
}
每次consumer进行RPC调用时,ActiveLimitFilter都会统计被调用应用、方法的active,是leastActive负载均衡中active值的源头。了解这里的逻辑之后,我们就能比较清晰理解leastActive负载均衡策略的核心原理。
注:参考 dubbo源码版本 2.7.1,欢迎指正。