本人小白,这个文章是本人的阅读笔记,不是权威解读,需要自己甄别对错
大致流程跑完了,现在分析下IRule相关实现和机制
整个流程跑下来,我们知道Ribbon中有三个核心组件,ILoadBalancer,IRule,IPing,其中IRule是起到做负载均衡算法作用的,在前文中我们从AllList中选择一个Server出来,就是使用的IRule中的算法。
首先我们看看IRule组件的初始化
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}
这段初始化说的意思是如果我们配置了自定义的IRule策略就使用我们定义的,如果没有定义就走默认的,前文我们已经看过了,ZoneAvoidanceRule的策略实现是走父类PredicateBasedRule的轮询策略
那么我们看看常见可配置的负载均衡策略在源码里提供了多少实现:
- RoundRobinRule
- AvailabilityFilteringRule
- BestAvailableRule
- RandomRule
- RetryRule
- WeightedResponseTimeRule
- ZoneAvoidanceRule
大概就这些吧,其余的不眼熟,也不常用,就不做分析了,老师傅对我建议是,生产环境下,别瞎捣鼓,用默认的就行 -_-!
RoundRobinRule --轮询算法
首先看看这个,为什么先这个呢,后面的许多均衡策略用该算法为基础算法的,先上核心方法看看:
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
//获取状态可用的服务列表
List<Server> reachableServers = lb.getReachableServers();
//获取所有注册服务的列表
List<Server> allServers = lb.getAllServers();
//可用服务列表大小
int upCount = reachableServers.size();
//注册服务列表大小
int serverCount = allServers.size();
//可用服务或者注册表大小为0,返回null
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
//获取下一个下标,使用的死循环和CAS 就是自旋锁那一套玩意
int nextServerIndex = incrementAndGetModulo(serverCount);
//获取下标对应的服务
server = allServers.get(nextServerIndex);
// 服务为空,歇会,交出计算资源,继续
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
//如果服务存活可用
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
看了一遍,不就是轮询算法么???那么和默认的AbstractServerPredicate中的轮询有毛不同么,我的天,请原谅我的无知,不懂!
先看看什么个逻辑,大致逻辑都加了注释,嗯,看完的感受:毛玩意!!!我先吐槽下,我是真不懂,既然你获取了可用服务列表,为什么还用注册服务列表做轮询???难道是怕自旋锁锁住的时间内,服务节点状态改变了???算了,回头在研究
大概的意思就是轮询,选择一个可用服务节点进行返回,如果没有,就寻找10次,最后还没有,就是返回NULL了,中间如果发现寻找到服务为NULL,将进行线程yield操作,从这里看出来,这个比默认的轮询算法加了一个可用服务过滤。
AvailabilityFilteringRule
直接干核心方法吧
@Override
public Server choose(Object key) {
int count = 0;
//使用roundRobinRule选择一个服务出来
Server server = roundRobinRule.choose(key);
//总共循环10次
while (count++ <= 10) {
//判读是否满足条件
if (predicate.apply(new PredicateKey(server))) {
return server;
}
//选择下一个
server = roundRobinRule.choose(key);
}
//调用父类的轮询策略
return super.choose(key);
}
调用轮询算法找到一个可用服务节点,之后判断是否满足一个条件,之后满足就返回,如果没有满足,就下一个。
那这个条件是啥?跟进去看看:
@Override
public boolean apply(@Nullable PredicateKey input) {
//获取状态收集器 自己起的名字
LoadBalancerStats stats = getLBStats();
if (stats == null) {
return true;
}
//这步是真正用的判断逻辑
return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
private boolean shouldSkipServer(ServerStats stats) {
if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
return true;
}
return false;
}
就是判断当前的服务不是处于熔断器开启状态,且当前的服务连接数小于设置的并发数量,默认限制并发数是Integer.MAX_VALUE,没有这种机器能支持这个并发!!!这个一般配合hystrix使用比较好吧,接着看choose()方法
总之,大概的流程就是选择一个可用的,且并发数小于阈值的服务,如果10次还没找到,直接使用父类的轮询选择的节点
BestAvailableRule
核心方法:
@Override
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
//获取所有服务
List<Server> serverList = getLoadBalancer().getAllServers();
int minimalConcurrentConnections = Integer.MAX_VALUE;
long currentTime = System.currentTimeMillis();
Server chosen = null;
//遍历所有节点
for (Server server: serverList) {
//获取服务状态
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
//判断当前服务是否是熔断状态
if (!serverStats.isCircuitBreakerTripped(currentTime)) {
// 找到最小连接数的服务
int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
}
}
//调用父类轮询算法
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}
差不多的意思,找到最小连接数且服务不是熔断状态的服务,感觉没啥可说的,但是每次选取都是走一遍遍历,找到最小连接数的Server,和AvailabilityFilteringRule是由很大区别的
RandomRule
随机选择的,懒得看了,一般不用
RetryRule
看名字就知道是重试规则,看看核心方法吧
public Server choose(ILoadBalancer lb, Object key) {
//获取当前发起时间
long requestTime = System.currentTimeMillis();
//获取中止时间
long deadline = requestTime + maxRetryMillis;
Server answer = null;
//轮询选择一个Server
answer = subRule.choose(key);
//如果选取的服务无效或者服务不可用 并且当前时间小于中止时间
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
//设置一个中断调度任务 执行时间deadline - System.currentTimeMillis()
InterruptTask task = new InterruptTask(deadline
- System.currentTimeMillis());
//如果当前线程未被中断
while (!Thread.interrupted()) {
//再轮询选择一个Server
answer = subRule.choose(key);
// 一样的判断规则
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
// 让出CPU
Thread.yield();
} else {
break;
}
}
//中断任务取消
task.cancel();
}
if ((answer == null) || (!answer.isAlive())) {
return null;
} else {
return answer;
}
}
大致的流程:
- 轮询选择一个Server
- 如果当前服务可用直接返回
- 如果当前服务不可用,启动一个线程中断任务,并且循环轮询服务节点,知道找到可用或者超过中止时间
大致的代码都加上了注解,好处就是不会无限循环,有重试超时时间,超时指的是找到可用节点的时间超时
WeightedResponseTimeRule
基于权重的选取规则,这个权重和我们在Nginx配置的权重还是有区别的,WeightedResponseTimeRule的权重是基于响应时间,响应时间越长权重越小
该选取规则在对象初始化时候执行了initialize()方法
void initialize(ILoadBalancer lb) {
if (serverWeightTimer != null) {
serverWeightTimer.cancel();
}
//初始了一个定时调度器
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
+ name, true);
//设置执行方法和间隔
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
serverWeightTaskTimerInterval);
// do a initial run
//先走一遍初始计算
ServerWeight sw = new ServerWeight();
sw.maintainWeights();
//线程中止时候取消调度任务
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
logger
.info("Stopping NFLoadBalancer-serverWeightTimer-"
+ name);
serverWeightTimer.cancel();
}
}));
}
这个初始化方法就是初始化了一个调度任务,30S执行一次,第一次计算了相关值,看看调度任务干了啥
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}
try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
//这一步是计算注册服务的所有响应时间之和
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;
// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
//遍历所有注册服务,计算权重,权重计算规则是所有服务响应时间-当前服务响应时间
//那么响应时间越短的权重越大
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
30S执行一次权重计算,服务权重 = 所有服务响应时间之和 - 当前服务响应时间
看看核心方法吧
while (server == null) {
// get hold of the current reference in case it is changed from the other thread
List<Double> currentWeights = accumulatedWeights;
//线程中断返回null
if (Thread.interrupted()) {
return null;
}
//获取所有服务
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
//获取最后节点的响应时间
// last one in the list is the sum of all weights
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
//如maxTotalWeight小于0.001,或者服务列表和权重计算集合数量不匹配
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
//返回轮询选取的节点
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
//生成 0 到 maxTotalWeight的随机权重的
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
//找到第一个比随机权重大的服务下标
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}
//获取该服务
server = allList.get(serverIndex);
}
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Next.
server = null;
}
看了一遍,随机性很大,但是权重大选取的概率是比较大,选取的maxTotalWeight是最后一个服务节点的平均响应时间,不是真正的最大响应时间啊,我找了一圈也没发现排序之类的东西,并且如果真排序了,那么和服务列表也对应不上了。
为什么说随机性很大呢,最后一个节点的平均响应时间不知道处在什么什么水平,之后又是取了0-maxTotalWeight的随机数,但是权重越大,d >= randomWeight越容易成立,越容易选取,说明一点不是说你的权重最大,你就一定被选取。
ZoneAvoidanceRule
默认的轮询算法,没啥可说的,也是取模+CAS那一套,代码放出来看看吧
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}