接着上篇的文章来。
负载均衡算法
- 代码出处,使用了一个工具类完成了负载均衡后端节点的选取
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
- 看下这里的源码
public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) {
LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);
return loadBalance.select(upstreamList, ip);
}
- 可以看出这里是通过
SPI
的机制,根据传入算法名称,运行时动态得到具体的负载均衡实现,由扩展加载器加载到的对应的负载均衡实例 - 先看看这里的
SPI
扩展点机制,然后再去看具体的负载均衡算法
SPI扩展点机制
-
org.dromara.soul.spi.ExtensionLoader
关键代码
public static <T> ExtensionLoader<T> getExtensionLoader(final Class<T> clazz) {
if (clazz == null) {
throw new NullPointerException("extension clazz is null");
}
if (!clazz.isInterface()) {
throw new IllegalArgumentException("extension clazz (" + clazz + ") is not interface!");
}
if (!clazz.isAnnotationPresent(SPI.class)) {
throw new IllegalArgumentException("extension clazz (" + clazz + ") without @" + SPI.class + " Annotation");
}
ExtensionLoader<T> extensionLoader = (ExtensionLoader<T>) LOADERS.get(clazz);
if (extensionLoader != null) {
return extensionLoader;
}
LOADERS.putIfAbsent(clazz, new ExtensionLoader<>(clazz));
return (ExtensionLoader<T>) LOADERS.get(clazz);
}
- 使用
SPI
机制是soul
提供出来的扩展点,如果使用用户想要实现一套自己的负载均衡算法,则只要根据soul
的规范,并做相应的配置就可以完成了。
// TODO 写一个简单例子演示
- 我们了解下 扩展点加载器
ExtensionLoader
,该加载器参考了dubbo
中的ExtensionLoader
,为其缩减版;对比之下,主要减少了依赖注入功能。跟jdk
原生的SPI
机制相比,实现了按需加载,因jdk
会一次性将所有的扩展实现一次性加载的。 - 关于
SPI
扩展点机制 -
soul
中SPI
扩展点机制就先介绍到这里,我们看下soul
提供的几种负载均衡算法
负载均衡算法实现
- 从源码得出,负载均衡算法实现默认🈶️3种
random
、roundRobin
,hash
random
-加权随机
- 看代码实际上是一个加权的随机算法
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
// 计算所有后端节点的权重值
int totalWeight = calculateTotalWeight(upstreamList);
// 判断是否所有的节点为相同权重的情况
boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
if (totalWeight > 0 && !sameWeight) {
// 不是,则要加权随机
return random(totalWeight, upstreamList);
}
// If the weights are the same or the weights are 0 then random
// 如果各节点权重都相同,则按照总的节点数N,生成一个1-N之间的随机数X,然后选取出该X节点
return random(upstreamList);
}
- 加权随机的实现稍稍复杂点,看下源码
private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
// If the weights are not the same and the weights are greater than 0, then random by the total number of weights
// 先拿到随机数为区间,为1-总权重值N之间
int offset = RANDOM.nextInt(totalWeight);
// Determine which segment the random value falls on
// 遍历所有的节点,每次循环都用区间=区间-当前权重值,然后再看区间是否小于0;
// 若小于0,则证明刚好位于当前节点的区间,返回当前节点;若没有,则继续循环
for (DivideUpstream divideUpstream : upstreamList) {
offset -= getWeight(divideUpstream);
if (offset < 0) {
return divideUpstream;
}
}
// 最后返回第一个兜底
return upstreamList.get(0);
}
roundRobin
-加权轮询
- 加权轮询的算法会有些难懂,我们先看下关键方法
doSelect
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
//TODO question 如果这批upstream第一个节点经常变更,可能会导致前面节点生成的WeightedRoundRobin数据无法释放
String key = upstreamList.get(0).getUpstreamUrl();
// 取出此批后端服务的权重对象 key -> 后端server,为ip:port value -> 后端server对应的权重对象
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
map = methodWeightMap.get(key);
}
// 总权重值
int totalWeight = 0;
// 当前请求权重的最大值,每次请求都会置为MIN_VALUE
long maxCurrent = Long.MIN_VALUE;
// 当前系统时间
long now = System.currentTimeMillis();
// 选中节点
DivideUpstream selectedInvoker = null;
// 选中节点的权重
WeightedRoundRobin selectedWRR = null;
//TODO question 轮询这里为什么没有break,应该找到选中节点后就跳出循环,不需要继续循环下去
for (DivideUpstream upstream : upstreamList) {
// rkey -> 后端server,ip:port
String rKey = upstream.getUpstreamUrl();
// 后端server对应的权重对象
WeightedRoundRobin weightedRoundRobin = map.get(rKey);
// 当前后端节点的权重值
int weight = getWeight(upstream);
// 如果当前后端server权重对象不存在,则需生成当前后端server的权重对象,并添加到内存
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(rKey, weightedRoundRobin);
}
// 节点权重值发生了变化,则需要重新设置
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
// 权重值发生变化后,将内存中后端server权重更新为最新权重值
// 同时其内部current值也将更新为0,回到初始状态
weightedRoundRobin.setWeight(weight);
}
// 内部序列从0开始增,每次进来都+自己的权重值
long cur = weightedRoundRobin.increaseCurrent();
// 重新设置更新时间
weightedRoundRobin.setLastUpdate(now);
// 如果节点当前轮询权重大于此次调用的最大权重值,则满足项,可以作为选中节点
// 这里选中节点后,并不会停止循环,会遍历所有的节点,将最后一个满足条件的节点作为选中节点
// 这里是为了找到当前权重最大的那个后端server,将其作为选中节点
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = upstream;
selectedWRR = weightedRoundRobin;
}
// 总权重
totalWeight += weight;
}
// 更新标志锁位未更新 后端节点数不等于原有节点数 采用compareAndSet取锁,获取到锁之后才更新
if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
// 新的map
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
// 如果从轮询开始,到结束,中间间隔了1min钟,则需要移除该后端server的权重对象数据
// 这里是为了移除掉那些下线的节点,因为只要有被轮询,其LastUpdate=now;而超过1min钟,都没有轮询到的,则就是已下线的节点
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
// 将新的后端节点的权重数据替换旧有的
methodWeightMap.put(key, newMap);
} finally {
// 最终将锁的标志为设置为false
updateLock.set(false);
}
}
// 选中节点的处理
if (selectedInvoker != null) {
// 选中节点的内部计数的当前权重current要减去总的权重值,降低其当前权重,也就意味着其下次被选中的优先级被降低
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return upstreamList.get(0);
}
- 因不懂其思想,在
debug
源码过程中,将关键变量变化的过程用表格记录了下来,方便跟踪逻辑
- 从上图可以看出:当权重为50:50时,4次调用其最终选取出来的结果为1:1;当权重为10:20时,6次调用其最终选中出来的结果为1:2,前3次调用最终选取出来的结果也为1:2。
-
//TODO
很是神奇,不知晓其中原理 实现逻辑看起来比较复杂,简单说来就是 对于每个 DivideUpstream 都维护一个权重对象,每次遍历所有权重对象获取到权重最大的那个 DivideUpstream,同时减去被选中的 DivideUpstream 的权重(减去总权重,优先级降为最低)
- 参考:【高性能网关soul学习】11. 插件之DevidePlugin
hash
-IP哈希
- 实际上是基于远程客户端
IP
的一个hash
负载均衡算法 -
doSelect
的源码
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
// 并发跳表map key->后端server的hash值 value->后端节点
final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
for (DivideUpstream address : upstreamList) {
// 每个实际后端节点会生成5个虚拟节点,用于更大范围映射,类似于一致性hash
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
// 后端server的hash
long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
treeMap.put(addressHash, address);
}
}
// 远程客户端ip的hash
long hash = hash(String.valueOf(ip));
// 如果 请求地址的Hash值 右边存在节点,返回右边第一个
// treeMap.tailMap(hash) 会返回所有(key >= hash)的映射集合,同时是有序的。
SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
// 如果找到了,则从找到的entry拿出后端节点
if (!lastRing.isEmpty()) {
// 这里也就是取 url hash 值右边的第一个节点
return lastRing.get(lastRing.firstKey());
}
// 没找到,则直接去第一个节点兜底
return treeMap.firstEntry().getValue();
}
- 有点高级,内部使用了
ConcurrentSkipListMap
结构,以及一致性hash
中虚拟节点的技术。 - 关于
ConcurrentSkipListMap
- 关于
一致性hash
算法
总结
-
Divide
插件这块的内容还挺多的,后面还需要将负载均衡这块的逻辑重点看一看,搞清楚roundRobin
与hash
算法的实现,同时也去看看其他框架中这两种算法的实现 - 已分析
Divide
插件中重点块:后端节点的探活机制、负载均衡算法 - 后续还有
http
服务的请求转发和websocket
服务的支持 - 关于负载均衡分析的好文章