源码分析---和dubbo相比 迪士尼源码搭建下载 SOFARPC是如何实现负载均衡的?

迪士尼源码搭建下载【hubawl.com】


官方目前建议使用的负载均衡包括以下几种:

random(随机算法)

localPref(本地优先算法)

roundRobin(轮询算法)

consistentHash(一致性hash算法)

所以我们接下来分析以下以上四种负载均衡的源码是怎样的。

随机算法

我们先看一下SOFARPC的源码实现:

@OverridepublicProviderInfodoSelect(SofaRequest invocation, List<ProviderInfo> providerInfos){    ProviderInfo providerInfo =null;intsize = providerInfos.size();// 总个数inttotalWeight =0;// 总权重booleanisWeightSame =true;// 权重是否都一样for(inti =0; i < size; i++) {intweight = getWeight(providerInfos.get(i));        totalWeight += weight;// 累计总权重if(isWeightSame && i >0&& weight != getWeight(providerInfos.get(i -1))) {            isWeightSame =false;// 计算所有权重是否一样}    }if(totalWeight >0&& !isWeightSame) {// 如果权重不相同且权重大于0则按总权重数随机intoffset = random.nextInt(totalWeight);// 并确定随机值落在哪个片断上for(inti =0; i < size; i++) {            offset -= getWeight(providerInfos.get(i));if(offset <0) {                providerInfo = providerInfos.get(i);break;            }        }    }else{// 如果权重相同或权重为0则均等随机providerInfo = providerInfos.get(random.nextInt(size));    }returnproviderInfo;}

上面主要做了几件事:

获取所有的provider

遍历provier,如果当前的provider的权重和上一个provider的权重不一样,那么就做个标记

如果权重不相同那么就随机取一个0到总权重之间的值,遍历provider去减随机数,如果减到小于0,那么就返回那个provider

如果没有权重相同,那么用随机函数取一个provider

我们再来看看dubbo是怎么实现的:

@OverrideprotectedInvokerdoSelect(List<Invoker<T>> invokers, URL url, Invocation invocation){intlength = invokers.size();// Number of invokersbooleansameWeight =true;// Every invoker has the same weight?intfirstWeight = getWeight(invokers.get(0), invocation);inttotalWeight = firstWeight;// The sum of weightsfor(inti =1; i < length; i++) {intweight = getWeight(invokers.get(i), invocation);        totalWeight += weight;// Sumif(sameWeight && weight != firstWeight) {            sameWeight =false;        }    }if(totalWeight >0&& !sameWeight) {// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.intoffset = ThreadLocalRandom.current().nextInt(totalWeight);// Return a invoker based on the random value.for(inti =0; i < length; i++) {            offset -= getWeight(invokers.get(i), invocation);if(offset <0) {returninvokers.get(i);            }        }    }// If all invokers have the same weight value or totalWeight=0, return evenly.returninvokers.get(ThreadLocalRandom.current().nextInt(length));}

获取invoker的数量

获取第一个invoker的权重,并复制给firstWeight

循环invoker集合,把它们的权重全部相加,并复制给totalWeight,如果权重不相等,那么sameWeight为false

如果invoker集合的权重并不是全部相等的,那么获取一个随机数在1到totalWeight之间,赋值给offset属性

循环遍历invoker集合,获取权重并与offset相减,当offset减到小于零,那么就返回这个inovker

如果权重相等,那么直接在invoker集合里面取一个随机数返回

从上面我们可以看到,基本上SOFARPC和dubbo的负载均衡实现是一致的。

本地优先算法

在负载均衡时使用保持本机优先。这个相信大家也比较好理解。在所有的可选地址中,找到本机发布的地址,然后进行调用。

@OverridepublicProviderInfodoSelect(SofaRequest invocation, List<ProviderInfo> providerInfos){    String localhost = SystemInfo.getLocalHost();if(StringUtils.isEmpty(localhost)) {returnsuper.doSelect(invocation, providerInfos);    }    List localProviderInfo =newArrayList();for(ProviderInfo providerInfo : providerInfos) {// 解析IP,看是否和本地一致if(localhost.equals(providerInfo.getHost())) {            localProviderInfo.add(providerInfo);        }    }if(CommonUtils.isNotEmpty(localProviderInfo)) {// 命中本机的服务端returnsuper.doSelect(invocation, localProviderInfo);    }else{// 没有命中本机上的服务端returnsuper.doSelect(invocation, providerInfos);    }}

查看本机的host,如果为空,那么直接调用父类随机算法

遍历所有的provider,如果服务提供方的host和服务调用方的host一致,那么保存到集合里

如果存在服务提供方的host和服务调用方的host一致,那么就在这些集合中选取

如果不一致,那么就在所有provider中选取

轮询算法

我们首先来看看SOFARPC的轮训是怎么实现的:

privatefinalConcurrentMap sequences =newConcurrentHashMap();@OverridepublicProviderInfodoSelect(SofaRequest request, List<ProviderInfo> providerInfos){    String key = getServiceKey(request);// 每个方法级自己轮询,互不影响intlength = providerInfos.size();// 总个数PositiveAtomicCounter sequence = sequences.get(key);if(sequence ==null) {        sequences.putIfAbsent(key,newPositiveAtomicCounter());        sequence = sequences.get(key);    }returnproviderInfos.get(sequence.getAndIncrement() % length);}privateStringgetServiceKey(SofaRequest request){    StringBuilder builder =newStringBuilder();    builder.append(request.getTargetAppName()).append("#")        .append(request.getMethodName());returnbuilder.toString();}

从上面的代码我们可以看出,SOFARPC的轮询做的很直接简单。就是new了一个map,然后把每个服务的服务名拼上方法名存到map里面,然后每次value值自增1对provider取模。

我们再看dubbo的实现方式:

protectedInvokerdoSelect(List<Invoker<T>> invokers, URL url, Invocation invocation){    String key = invokers.get(0).getUrl().getServiceKey() +"."+ invocation.getMethodName();    ConcurrentMap map = methodWeightMap.get(key);if(map ==null) {        methodWeightMap.putIfAbsent(key,newConcurrentHashMap());        map = methodWeightMap.get(key);    }inttotalWeight =0;longmaxCurrent = Long.MIN_VALUE;longnow = System.currentTimeMillis();    Invoker selectedInvoker =null;    WeightedRoundRobin selectedWRR =null;for(Invoker invoker : invokers) {        String identifyString = invoker.getUrl().toIdentityString();        WeightedRoundRobin weightedRoundRobin = map.get(identifyString);intweight = getWeight(invoker, invocation);if(weight <0) {            weight =0;        }if(weightedRoundRobin ==null) {            weightedRoundRobin =newWeightedRoundRobin();            weightedRoundRobin.setWeight(weight);            map.putIfAbsent(identifyString, weightedRoundRobin);            weightedRoundRobin = map.get(identifyString);        }if(weight != weightedRoundRobin.getWeight()) {//weight changedweightedRoundRobin.setWeight(weight);        }longcur = weightedRoundRobin.increaseCurrent();        weightedRoundRobin.setLastUpdate(now);if(cur > maxCurrent) {            maxCurrent = cur;            selectedInvoker = invoker;            selectedWRR = weightedRoundRobin;        }        totalWeight += weight;    }if(!updateLock.get() && invokers.size() != map.size()) {if(updateLock.compareAndSet(false,true)) {try{// copy -> modify -> update referenceConcurrentMap newMap =newConcurrentHashMap();                newMap.putAll(map);                Iterator> it = newMap.entrySet().iterator();while(it.hasNext()) {                    Entry item = it.next();if(now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {                        it.remove();                    }                }                methodWeightMap.put(key, newMap);            }finally{                updateLock.set(false);            }        }    }if(selectedInvoker !=null) {        selectedWRR.sel(totalWeight);returnselectedInvoker;    }// should not happen herereturninvokers.get(0);}

dubbo的轮询的实现里面还加入了权重在里面,sofarpc的权重轮询是放到另外一个类当中去做的,因为性能太差了而被弃用了。

我们举个例子来简单看一下dubbo的加权轮询是怎么做的:

假定有3台dubboprovider:10.0.0.1:20884, weight=2

10.0.0.1:20886, weight=3

10.0.0.1:20888, weight=4

totalWeight=9;

那么第一次调用的时候:

10.0.0.1:20884, weight=2    selectedWRR ->current =210.0.0.1:20886, weight=3    selectedWRR ->current =310.0.0.1:20888, weight=4    selectedWRR ->current =4selectedInvoker->10.0.0.1:20888调用 selectedWRR.sel(totalWeight);10.0.0.1:20888, weight=4    selectedWRR ->current = -5返回10.0.0.1:20888这个实例那么第二次调用的时候:10.0.0.1:20884, weight=2    selectedWRR ->current =410.0.0.1:20886, weight=3    selectedWRR ->current =610.0.0.1:20888, weight=4    selectedWRR ->current = -1selectedInvoker->10.0.0.1:20886调用 selectedWRR.sel(totalWeight);10.0.0.1:20886 , weight=4  selectedWRR ->current = -3返回10.0.0.1:20886这个实例那么第三次调用的时候:10.0.0.1:20884, weight=2    selectedWRR ->current =610.0.0.1:20886, weight=3    selectedWRR ->current =010.0.0.1:20888, weight=4    selectedWRR ->current =3selectedInvoker->10.0.0.1:20884调用 selectedWRR.sel(totalWeight);10.0.0.1:20884, weight=2    selectedWRR ->current = -3返回10.0.0.1:20884这个实例

一致性hash算法

在SOFARPC中有两种方式实现一致性hash算法,一种是带权重的一种是不带权重的,我对比了一下,两边的代码基本上是一样的,所以我直接分析带权重的代码就好了。

下面我们来分析一下代码:

privatefinalConcurrentHashMap selectorCache =newConcurrentHashMap();@OverridepublicProviderInfodoSelect(SofaRequest request, List<ProviderInfo> providerInfos){    String interfaceId = request.getInterfaceName();    String method = request.getMethodName();    String key = interfaceId +"#"+ method;// 判断是否同样的服务列表inthashcode = providerInfos.hashCode();    Selector selector = selectorCache.get(key);// 原来没有if(selector ==null||// 或者服务列表已经变化selector.getHashCode() != hashcode) {        selector =newSelector(interfaceId, method, providerInfos, hashcode);        selectorCache.put(key, selector);    }returnselector.select(request);}

上面的doSelect方法就是获取到相同服务的Selector,如果没有就新建一个。Selector是WeightConsistentHashLoadBalancer里面的内部类,我们接下来看看这个内部类的实现。

publicSelector(String interfaceId, String method, List actualNodes,inthashcode){this.interfaceId = interfaceId;this.method = method;this.hashcode = hashcode;// 创建虚拟节点环 (provider创建虚拟节点数 =  真实节点权重 * 32)this.virtualNodes =newTreeMap();// 设置越大越慢,精度越高intnum =32;for(ProviderInfo providerInfo : actualNodes) {for(inti =0; i < num * providerInfo.getWeight() /4; i++) {byte[] digest = HashUtils.messageDigest(providerInfo.getHost() + providerInfo.getPort() + i);for(inth =0; h <4; h++) {longm = HashUtils.hash(digest, h);                virtualNodes.put(m, providerInfo);            }        }    }}

Selector内部类中就是构建了一个TreeMap实例,然后遍历所有的provider,每个provider虚拟的节点数是(真实节点权重 * 32)个。

虚拟好节点后,我们直接调用Selector#select方法在hash环中得到相应的provider。

publicProviderInfoselect(SofaRequest request){    String key = buildKeyOfHash(request.getMethodArgs());byte[] digest = HashUtils.messageDigest(key);returnselectForKey(HashUtils.hash(digest,0));}/** * 获取第一参数作为hash的key * *@paramargs the args *@returnthe string */privateStringbuildKeyOfHash(Object[] args){if(CommonUtils.isEmpty(args)) {returnStringUtils.EMPTY;    }else{returnStringUtils.toString(args[0]);    }}/** * Select for key. * *@paramhash the hash *@returnthe provider */privateProviderInfoselectForKey(longhash){    Map.Entry entry = virtualNodes.ceilingEntry(hash);if(entry ==null) {        entry = virtualNodes.firstEntry();    }returnentry.getValue();}

这上面主要是获取第一参数作为hash的key,然后对它进行hash。所以我感觉这里可能有一个问题就是如果一个某个服务里面很多个参数一样的服务,那么是不是都会打到那同一台机器上呢?

dubbo的实现方式也和SOFARPC类似,这里不再赘述。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容