LoadBalance
从LoadBalance的实现类可知,dubbo默认的负载均衡算法总计有4种:
- 随机算法RandomLoadBalance(默认)
- 轮训算法RoundRobinLoadBalance
- 最小活跃数算法LeastActiveLoadBalance
- 一致性hash算法ConsistentHashLoadBalance
如何选择负载均衡算法
调用链从com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker中的invoke(final Invocation invocation)
开始:
--> AbstractClusterInvoker.invoke(Invocation)
--> AbstractClusterInvoker.doInvoke(invocation, invokers, loadbalance)
--> FailoverClusterInvoker.doInvoke(Invocation, final List<Invoker<T>>, LoadBalance) (说明,dubbo默认是failover集群实现,且默认最多重试2次,即默认最多for循环调用3次)
--> AbstractClusterInvoker.select()
--> AbstractClusterInvoker.doSelect()
--> AbstractLoadBalance.select()
--> RoundRobinLoadBalance.doSelect()
选定Invoker
一般dubbo服务都有多个Provider,即对Consumer来说有多个可以调用的Invoker,
根据下面的源码可知,dubbo只根据第一个Invoker(invokers.get(0)
)确定负载均衡策略,所以选择哪个Invoker取决于List<Invoker<T>> invokers
排列顺序,dubbo对Invoker排序的compare方法定义为:o1.getUrl().toString().compareTo(o2.getUrl().toString()));所以如果有10.0.0.1:20884,10.0.0.1:20886,10.0.0.1:20888三个invoke,那么第一个Invoker肯定是10.0.0.1:20884(这个排序算法后面会详细讲解);
public Result invoke(final Invocation invocation) throws RpcException {
checkWheatherDestoried();
LoadBalance loadbalance;
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
// 排序后的Invoker集合的第一个Invoker(即invokers.get(0))的负载均衡策略就是dubbo选择的策略,默认策略为Constants.DEFAULT_LOADBALANCE,即"random",然后根据扩展机制得到负载均衡算法的实现为com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
// 如果没有任何Invoker,那么直接取默认负载
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
sticky
sticky:粘的,粘性;
作用是在调用Invoker的重试过程中,尽可能调用前面选择的Invoker;sticky配置在consumer端,配置方式如下:
<dubbo:reference id="testService" interface="com.alibaba.dubbo.demo.TestService" retries="2" sticky="true"/>
sticky业务逻辑主要在如下这段代码中:
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
String methodName = invocation == null ? "" : invocation.getMethodName();
// 判断UR上是否配置了sticky属性,sticky默认为false
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;
{
//ignore overloaded method
if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){
stickyInvoker = null;
}
//ignore cucurrent problem
if (sticky && stickyInvoker != null && (selected == null || selected.contains(stickyInvoker))){
if (availablecheck && stickyInvoker.isAvailable()){
return stickyInvoker;
}
}
}
Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
// 如果配置了sticky="true",那么将这次选择到的Invoker缓存下来赋值给stickyInvoker
if (sticky){
stickyInvoker = invoker;
}
return invoker;
}
选择LoadBalance
实现源码AbstractClusterInvoker:
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
// 如果没有任何可调用服务, 那么返回null
if (invokers == null || invokers.size() == 0)
return null;
// 如果只有1个Invoker(provider),那么直接返回,不需要任何负载均衡算法
if (invokers.size() == 1)
return invokers.get(0);
// 如果只有两个invoker,且处于重新选择(重试)过程中(selected != null && selected.size() > 0),则退化成轮循;
if (invokers.size() == 2 && selected != null && selected.size() > 0) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
// invokers数量至少3个的话,调用具体的负载均衡实现(例如轮询,随机等)选择Invoker
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//如果处于重试过程中,且 selected中包含当前选择的invoker(优先判断) 或者 (不可用&&availablecheck=true) 则重试.
if( (selected != null && selected.contains(invoker))
||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
try{
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if(rinvoker != null){
invoker = rinvoker;
}else{
//看下第一次选的位置,如果不是最后,选+1位置.
int index = invokers.indexOf(invoker);
try{
//最后在避免碰撞
invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
}catch (Exception e) {
logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);
}
}
}catch (Throwable t){
logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);
}
}
return invoker;
}
Dubbo的BUG:
这里退化成轮询的实现有问题,对应源码return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
如果retries=4,即最多调用5次,且两个可选invoke分别为:
10.0.0.1:20884,10.0.0.1:20886;
那么5次选择的invoke为:
10.0.0.1:20884
10.0.0.1:20886
10.0.0.1:20886
10.0.0.1:20886
10.0.0.1:20886,
即除了第1次外后面的选择都是选择第二个invoker;
因次需要把selected.get(0)修改为:selected.get(selected.size()-1);
即每次拿前一次选择的invoker与 invokers.get(0)比较,如果相同,则选则另一个invoker;否则就选 invokers.get(0);
2. 负载均衡算法的实现
2.1 随机算法
RandomLoadBalance.doSelect():
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); //Invoke的总个数
int totalWeight = 0; // 所有Invoke的总权重
boolean sameWeight = true; // 各个Invoke权重是否都一样
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // 累加总权重
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false; // 每个invoke与前一次遍历的invoke的权重进行比较,判断所有权重是否一样
}
}
if (totalWeight > 0 && ! sameWeight) {
// 如果所有invoke的权重有不相同且总权重大于0则按总权重数随机
int offset = random.nextInt(totalWeight);
// 确定随机值落在哪个片断上,从而取得对应的invoke,如果weight越大,那么随机值落在其上的概率越大,这个invoke被选中的概率越大
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果权重相同或权重为0则均等随机
return invokers.get(random.nextInt(length));
}
算法说明
假定有3台dubbo provider:
10.0.0.1:20884, weight=2
10.0.0.1:20886, weight=3
10.0.0.1:20888, weight=4
随机算法的实现:
totalWeight=9;
假设offset=1(即random.nextInt(9)=1)
1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2
假设offset=4(即random.nextInt(9)=4)
4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3
假设offset=7(即random.nextInt(9)=7)
7-2=5<0?否,这时候offset=5, 5-3=2<0?否,这时候offset=2, 2-4<0?是,所以选中 10.0.0.1:20888, weight=4
2.2 轮询算法
RoundRobinLoadBalance.doSelect():
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// key表示方法,例如com.alibaba.dubbo.demo.TestService.getRandomNumber, 即负载均衡算法细化到每个方法的调用;
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // provider的总个数
int maxWeight = 0; // 最大权重, 只是一个临时值, 所以设置为0, 当遍历invokers时接口的weight肯定大于0,马上就会替换成一个真实的maxWeight的值;
int minWeight = Integer.MAX_VALUE; // 最小权重,只是一个临时值, 所以设置为Integer类型最大值, 当遍历invokers时接口的weight肯定小于这个数,马上就会替换成一个真实的minWeight的值;
for (int i = 0; i < length; i++) {
// 从Invoker的URL中获取权重时,dubbo会判断是否warnup了,即只有当invoke这个jvm进程的运行时间超过warnup(默认为10分钟)时间,配置的weight才会生效;
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // 重新计算最大权重值
minWeight = Math.min(minWeight, weight); // 重新计算最小权重值
}
if (maxWeight > 0 && minWeight < maxWeight) { // 如果各provider配置的权重不一样
AtomicPositiveInteger weightSequence = weightSequences.get(key);
if (weightSequence == null) {
weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
weightSequence = weightSequences.get(key);
}
int currentWeight = weightSequence.getAndIncrement() % maxWeight;
List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();
for (Invoker<T> invoker : invokers) {
// 筛选权重大于当前权重基数的Invoker,从而达到给更大权重的invoke加权的目的
if (getWeight(invoker, invocation) > currentWeight) {
weightInvokers.add(invoker);
}
}
int weightLength = weightInvokers.size();
if (weightLength == 1) {
return weightInvokers.get(0);
} else if (weightLength > 1) {
invokers = weightInvokers;
length = invokers.size();
}
}
// 每个方法对应一个AtomicPositiveInteger,其序数从0开始,
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
// 取模轮循
return invokers.get(sequence.getAndIncrement() % length);
}
算法说明
假定有3台权重都一样的dubbo provider:
10.0.0.1:20884, weight=100
10.0.0.1:20886, weight=100
10.0.0.1:20888, weight=100
轮询算法的实现:
其调用方法某个方法(key)的sequence从0开始:
sequence=0时,选择invokers.get(0%3)=10.0.0.1:20884
sequence=1时,选择invokers.get(1%3)=10.0.0.1:20886
sequence=2时,选择invokers.get(2%3)=10.0.0.1:20888
sequence=3时,选择invokers.get(3%3)=10.0.0.1:20884
sequence=4时,选择invokers.get(4%3)=10.0.0.1:20886
sequence=5时,选择invokers.get(5%3)=10.0.0.1:20888
如果有3台权重不一样的dubbo provider:
10.0.0.1:20884, weight=50
10.0.0.1:20886, weight=100
10.0.0.1:20888, weight=150
调试过很多次,这种情况下有问题;留一个TODO;
2.3 最小活跃数算法:
LeastActiveRobinLoadBalance.doSelect():
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 总个数
int leastActive = -1; // 最小的活跃数
int leastCount = 0; // 相同最小活跃数的个数
int[] leastIndexes = new int[length]; // 相同最小活跃数的下标
int totalWeight = 0; // 总权重
int firstWeight = 0; // 第一个权重,用于于计算是否相同
boolean sameWeight = true; // 是否所有权重相同
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 活跃数从RpcStatus中取得, dubbo统计活跃数时以方法为维度;
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始
leastActive = active; // 记录最小活跃数
leastCount = 1; // 重新统计相同最小活跃数的个数
leastIndexes[0] = i; // 重新记录最小活跃数下标
totalWeight = weight; // 重新累计总权重
firstWeight = weight; // 记录第一个权重
sameWeight = true; // 还原权重相同标识
} else if (active == leastActive) { // 累计相同最小的活跃数
leastIndexes[leastCount ++] = i; // 累计相同最小活跃数下标
totalWeight += weight; // 累计总权重
// 判断所有权重是否一样
if (sameWeight && i > 0
&& weight != firstWeight) {
sameWeight = false;
}
}
}
if (leastCount == 1) {
// 如果只有一个最小则直接返回
return invokers.get(leastIndexes[0]);
}
if (! sameWeight && totalWeight > 0) {
// 如果权重不相同且权重大于0则按总权重数随机
int offsetWeight = random.nextInt(totalWeight);
// 并确定随机值落在哪个片断上
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
// 如果权重相同或权重为0则均等随机
return invokers.get(leastIndexes[random.nextInt(leastCount)]);
}
算法说明
最小活跃数算法实现:
假定有3台dubbo provider:
10.0.0.1:20884, weight=2,active=2
10.0.0.1:20886, weight=3,active=4
10.0.0.1:20888, weight=4,active=3
active=2最小,且只有一个2,所以选择10.0.0.1:20884
假定有3台dubbo provider:
10.0.0.1:20884, weight=2,active=2
10.0.0.1:20886, weight=3,active=2
10.0.0.1:20888, weight=4,active=3
active=2最小,且有2个,所以从[10.0.0.1:20884,10.0.0.1:20886 ]中选择;
接下来的算法与随机算法类似:
假设offset=1(即random.nextInt(5)=1)
1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2
假设offset=4(即random.nextInt(5)=4)
4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3
2.4 一致性hash算法
实现源码请参考ConsistentHashLoadBalanceRobinLoadBalance.doSelect(),具体步骤如下:
- 定义全局一致性hash选择器的
ConcurrentMap<String, ConsistentHashSelector<?>> selectors
,key为方法名称,例如com.alibaba.dubbo.demo.TestService.getRandomNumber; - 如果一致性hash选择器不存在或者与以前保存的一致性hash选择器不一样(即dubbo服务provider有变化,通过System.identityHashCode(invokers)计算一个identityHashCode值) 则需要重新构造一个一致性hash选择器;
- 构造一个一致性hash选择器ConsistentHashSelector的源码如下,通过参数i和h打散Invoker在TreeMap上的位置,replicaNumber默认值为160,所以最终virtualInvokers这个TreeMap的size为
invokers.size()*replicaNumber
:
for (Invoker<T> invoker : invokers) {
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(invoker.getUrl().toFullString() + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
- 选择Invoker的步骤:
4.1. 根据Invocation中的参数invocation.getArguments()转成key;
4.2 算出这个key的md5值;
4.3 根据md5值的hash值从TreeMap中选择一个Invoker;
NOTE: 选择Invoker的hash值根据Invocation中的参数得到,那么如果调用的方法没有任何参数,例如getRandomNumber(),那么在可用Invoker集合不变的前提下,任何对该方法的调用都一直返回相同的Invoker;另外,通过对一致性hash算法的分析可知,weight权重设置对其不起作用,只对其他三种负载均衡算法其作用;