Divide插件介绍
Divide插件是进行http正向代理的插件,所有http类型的请求,都由该插件进行负载均衡调用。
-
负载均衡:
- 随机(带权重):性能高,但均衡差一些
- 轮询(带权重):性能较随机差一些,但均衡性好
- 一致性Hash:同一个客户端的IP请求,始终会被同一个Server处理
服务探活
Divide插件运行原理
当每次请求通过网关时,DividePlugin会通过request uri获取选择器和规则,如果上游服务的rpc type是http类型,那么DividerPlugin会对这个请求进行处理。代码如下:
public class DividePlugin extends AbstractSoulPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
// 获取SoulContext信息
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
// 获取规则信息
final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
// 根据选择器,获取上游服务的地址列表,注意:如果某上游服务不可用不在该列表中
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
// 如果空则返回Error,没有上游服务可用
if (CollectionUtils.isEmpty(upstreamList)) {
log.error("divide upstream configuration error: {}", rule.toString());
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 获取 客户端IP目的是为HashLoadBalance使用
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// 选择LoadBalance策略,并选择其中一个可用的上游服务地址
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
// 如果没有可用的上游服务,则返回Error
if (Objects.isNull(divideUpstream)) {
log.error("divide has no upstream");
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 构建domain
String domain = buildDomain(divideUpstream);
// 构建真实的URL(上游服务的URL)
String realURL = buildRealURL(domain, soulContext, exchange);
exchange.getAttributes().put(Constants.HTTP_URL, realURL);
// 设置超时
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
// 设置重试次数
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
// 交由下一个Plugin处理
return chain.execute(exchange);
}
// 判断rpc type如果不是http,则不处理,跳到下一个plugin
@Override
public Boolean skip(final ServerWebExchange exchange) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
return !Objects.equals(Objects.requireNonNull(soulContext).getRpcType(), RpcTypeEnum.HTTP.getName());
}
private String buildDomain(final DivideUpstream divideUpstream) {
// ..
}
private String buildRealURL(final String domain, final SoulContext soulContext, final ServerWebExchange exchange) {
//...
}
}
负载均衡策略
轮询
轮询(带权重)是常见负载均衡策略之一。轮询能保证所有请求均衡分发到每个Server端。如果每个Server端的处理性能有差别,可能设置权重。权重高的将多分担些请求。
public class RoundRobinLoadBalance extends AbstractLoadBalance {
private final int recyclePeriod = 60000;
private final ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<>(16);
private final AtomicBoolean updateLock = new AtomicBoolean();
@Override
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
// url 做为key
String key = upstreamList.get(0).getUpstreamUrl();
// 存储url的权重
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
// 如果key不存在,初始化空map
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
map = methodWeightMap.get(key);
}
// 初始化总权重
int totalWeight = 0;
// 当前权重的最大值
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
DivideUpstream selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (DivideUpstream upstream : upstreamList) {
String rKey = upstream.getUpstreamUrl();
WeightedRoundRobin weightedRoundRobin = map.get(rKey);
// 根据warnup和配置的权重计算出新的权重值
int weight = getWeight(upstream);
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
// 如果key不存在,则将WeightedRoundRobin put
map.putIfAbsent(rKey, weightedRoundRobin);
}
// 如果权重值不要等,则更新权重
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
// 将权重值累加
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
// 如果 当前的权重值 大于 最大值,则选择此url
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = upstream;
selectedWRR = weightedRoundRobin;
}
// 总的权重值累加
totalWeight += weight;
}
// 更新状态为false并且map和上游服务列表不一致,则更新map并设置updateLock为true
if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
// 复制一份map
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
// 如果当前时间和上次更新的时间大于60s,则删除
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
// 放入到methodWeightMap中
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
// 如果被选中,则将该key的权重的值重新置为 -totalWeight。也就是权重最低值
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return upstreamList.get(0);
}
}
总结一下,今天主要是学习Divide插件的负载均衡功能。明天对另外两个策略进行分析。