最后再来说一下Predicate这个类。
负载均衡使用这个类用来过滤服务。
- 1接口结构如下
Predicate
public interface Predicate<T> {
boolean apply(@Nullable T input);
@Override
boolean equals(@Nullable Object object);
}
从接口的结构上来看比较简单,只有一个apply返回boolean类型的结果,以及一个equals方法。
下面看看相关实现类

相关实现类
- 2相关类型
- 2.1 AbstractServerPredicate
这个抽象类其实定义了基本的一个过滤流程
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
对服务列表进行过滤,通过子类的apply方法
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
}
- 2.2 相关实现类
- 2.2.1 CompositePredicate
复合过滤
public class CompositePredicate extends AbstractServerPredicate {
//delegate 优先的过滤规则
private AbstractServerPredicate delegate;
//服务列表不满足要求的时候,用次要的过滤规则重新匹配
private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
private int minimalFilteredServers = 1;
private float minimalFilteredPercentage = 0;
@Override
public boolean apply(@Nullable PredicateKey input) {
使用优先的过滤规则进行过滤
return delegate.apply(input);
}
public static class Builder {
private CompositePredicate toBuild;
Builder(AbstractServerPredicate primaryPredicate) {
toBuild = new CompositePredicate();
toBuild.delegate = primaryPredicate;
}
Builder(AbstractServerPredicate ...primaryPredicates) {
toBuild = new CompositePredicate();
Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);
toBuild.delegate = AbstractServerPredicate.ofKeyPredicate(chain);
}
public Builder addFallbackPredicate(AbstractServerPredicate fallback) {
toBuild.fallbacks.add(fallback);
return this;
}
public Builder setFallbackThresholdAsMinimalFilteredNumberOfServers(int number) {
toBuild.minimalFilteredServers = number;
return this;
}
public Builder setFallbackThresholdAsMinimalFilteredPercentage(float percent) {
toBuild.minimalFilteredPercentage = percent;
return this;
}
public CompositePredicate build() {
return toBuild;
}
}
public static Builder withPredicates(AbstractServerPredicate ...primaryPredicates) {
return new Builder(primaryPredicates);
}
public static Builder withPredicate(AbstractServerPredicate primaryPredicate) {
return new Builder(primaryPredicate);
}
@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
Iterator<AbstractServerPredicate> i = fallbacks.iterator();
服务列表未满足要求的时候,按次要的过滤规则进行重新过滤
while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
&& i.hasNext()) {
AbstractServerPredicate predicate = i.next();
result = predicate.getEligibleServers(servers, loadBalancerKey);
}
return result;
}
}
- 2.2.2 ZoneAvoidancePredicate
源码如下
public class ZoneAvoidancePredicate extends AbstractServerPredicate {
@Override
public boolean apply(@Nullable PredicateKey input) {
if (!ENABLED.get()) {
return true;
}
String serverZone = input.getServer().getZone();
没有分区直接返回
if (serverZone == null) {
return true;
}
LoadBalancerStats lbStats = getLBStats();
if (lbStats == null) {
没有统计信息直接返回
return true;
}
if (lbStats.getAvailableZones().size() <= 1) {
可靠的分区数<=1,直接返回
return true;
}
通过ZoneAvoidanceRule获取分区的统计快照
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
if (!zoneSnapshot.keySet().contains(serverZone)) {
return true;
}
拿到可靠的分区集合,通过分区的统计数据来分析
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
如果可靠的分区中包括该分区则返回true,否则返回false
if (availableZones != null) {
return availableZones.contains(input.getServer().getZone());
} else {
return false;
}
}
}
- 2.2.3 AvailabilityPredicate
用于判断服务是否熔断,或者压力比较大,超过限制。
源码如下
public class AvailabilityPredicate extends AbstractServerPredicate {
@Override
public boolean apply(@Nullable PredicateKey input) {
LoadBalancerStats stats = getLBStats();
if (stats == null) {
return true;
}
return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
private boolean shouldSkipServer(ServerStats stats) {
如果服务熔断,或者活跃请求数超过限制了,则跳过
if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
return true;
}
return false;
}
}