之前的限流功能都是单机版的,只能统计本地的服务调用次数信息,那么如果是在集群状态下,一个服务被放在了多个服务器上,假设一个集群有5台机器,每台机器单机限流阈值为10qps,理想状态下整个集群的限流阈值就是50qps,不过实际状态下路由到每台机器的流量可能会不均匀,会导致总量没有到的情况下某些机器就开始限流。
每个单机实例只关心自己的阈值,但是对于整个系统的全局阈值大家都漠不关心,当我们希望为某个api设置一个总的Qps时,那么单机模式下的限流就无法满足条件。
单机版是在每个实例中进行统计,而集群版是有一个专门的实例进行统计。
这个专门用来统计数据的称为Sentinel的token server,其他的实例作为Sentinel的token client会向token server去请求token,如果能获取到token,则说明当前的qps还未达到总的阈值,否则就说明已经达到集群的总阈值,当前实例就会被block。
集群限流适合的场景:
1) 在API Gateway处统计某个api的总访问量,并对某个api或者服务的总qps进行限制。
2)Service Mesh中对服务间的调用进行全局流控。
3)集群内对热点商品的总访问频次进行限制。
起点
Sentinel的集群限流是在FlowSlot
中实现的。它会根据资源名找到所有的限流规则FlowRule
,然后依次对每个规则调用canPassCheck
进行判断,是否能够通过该限流规则。
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
如果该规则是应用在集群模式下,则会调用passClusterCheck
方法。
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
try {
TokenService clusterService = pickClusterService(); \\@1
if (clusterService == null) {
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); //@2
}
long flowId = rule.getClusterConfig().getFlowId(); //@3
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized); //@4
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
// If client is absent, then fallback to local mode.
} catch (Throwable ex) {
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
}
// Fallback to local flow control when token client or server for this rule is not available.
// If fallback is not enabled, then directly pass.
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
@1代码处是获取当前节点是Token Client还是Token Server。
1) 如果当前节点的角色是Client,返回的TokenService为DefaultClusterTokenClient
;
2)如果当前节点的角色是Server,则默认返回的TokenService为DefaultTokenService
。
@2代码处: 如果无法获取集群的的TokenService,那么该流量控制规则可以退化为单机限流模式。
private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
return passLocalCheck(rule, context, node, acquireCount, prioritized);
} else {
return true;
}
}
@3代码处获取该流量控制的flowId,在集群模式下,每一个rule都有一个对应的ClusterFlowConfig
。
ClusterFlowConfig类介绍:
public class ClusterFlowConfig {
// 全局唯一id
private Long flowId;
// 有两种阈值类型,一种是单机均摊,一种是集群总体模式
private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL;
//集群不可用时是否回退到单机模式
private boolean fallbackToLocalWhenFail = true;
private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL;
// 集群采样数 10
private int sampleCount = ClusterRuleConstant.DEFAULT_CLUSTER_SAMPLE_COUNT;
// 1000ms,即1秒
private int windowIntervalMs = RuleConstant.DEFAULT_WINDOW_INTERVAL_MS;
@4代码处根据获取的flowId通过TokenService进行申请token。从上面可知,它可能是TokenClient调用的,也可能是ToeknServer调用的。分别对应的类是DefaultClusterTokenClient
和DefaultTokenService
。
下面分别从TokenClient和TokenService两个角色进行解读。
DefaultClusterTokenClient
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
// 如果flowId是无效的,或则count小于等于0
// id == null || id <= 0 || count <= 0;
if (notValidRequest(flowId, acquireCount)) {
return badRequest();
}
// 新建一个请求对象
FlowRequestData data = new FlowRequestData().setCount(acquireCount)
.setFlowId(flowId).setPriority(prioritized);
// 进一步封装为ClusterRequest,消息类型是Flow,
// MSG_TYPE_PING = 0; MSG_TYPE_FLOW = 1; MSG_TYPE_PARAM_FLOW = 2;
ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
try {
// 然后向TokenServer发送请求
TokenResult result = sendTokenRequest(request);
logForResult(result);
return result;
} catch (Exception ex) {
ClusterClientStatLogUtil.log(ex.getMessage());
return new TokenResult(TokenResultStatus.FAIL);
}
}
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
if (transportClient == null) {
RecordLog.warn(
"[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
return clientFail();
}
ClusterResponse response = transportClient.sendRequest(request);
TokenResult result = new TokenResult(response.getStatus());
if (response.getData() != null) {
FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
result.setRemaining(responseData.getRemainingCount())
.setWaitInMs(responseData.getWaitInMs());
}
return result;
}
在客户端启动的时候会创建与TokenServer之间的连接,当发送请求时如何客户端对象为空,就会记录请求失败。
DefaultTokenService
Token Server收到客户端的请求后,会调用FlowRequestProcessor
的processRequest
,最终会调用DefaultTokenService
的requestToken
方法。
@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
@Override
public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
TokenService tokenService = TokenServiceProvider.getService();
long flowId = request.getData().getFlowId();
int count = request.getData().getCount();
boolean prioritized = request.getData().isPriority();
TokenResult result = tokenService.requestToken(flowId, count, prioritized);
return toResponse(result, request);
}
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
// 和上面一样,先进行验证
if (notValidRequest(ruleId, acquireCount)) {
return badRequest();
}
// 从一个Map中进行查找
// private static final Map<Long, FlowRule> FLOW_RULES = new ConcurrentHashMap<>();
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
// 没有该规则
if (rule == null) {
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
// 服务端进行检查,是否发送令牌token
return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
}
因为acquireClusterToken
相对较长,故进行了拆分讲解。
第一步:
Long id = rule.getClusterConfig().getFlowId();
if (!allowProceed(id)) {
return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
} // @1
@1代码处首先判断是否允许本次许可申请,这是因为TokenServe支持嵌入式,即支持在应用节点中嵌入一个TokenServer,为了保证许可申请的请求不对正常业务造成比较大的影响,故对申请许可这个动作进行了限流。
一旦触发了限流,将向客户端返回Too_Many_Request状态码,Sentinel支持按namespace进行限流,具体由GlobalRequestLimiter实现,该类的内部同样是基于滑动窗口进行收集,原理与FlowSlot相似,默认的限流TPS是3W。
static boolean allowProceed(long flowId) {
String namespace = ClusterFlowRuleManager.getNamespace(flowId);
return GlobalRequestLimiter.tryPass(namespace);
}
public static boolean tryPass(String namespace) {
if (namespace == null) {
return false;
}
// private static final Map<String, RequestLimiter> GLOBAL_QPS_LIMITER_MAP = new ConcurrentHashMap<>();
RequestLimiter limiter = GLOBAL_QPS_LIMITER_MAP.get(namespace);
if (limiter == null) {
return true;
}
return limiter.tryPass();
}
canPass
方法是计算当前的通过数+1后是否超过qpsAllowed。
public boolean tryPass() {
if (canPass()) {
add(1);
return true;
}
return false;
}
public void add(int x) {
data.currentWindow().value().add(x);
}
public boolean canPass() {
return getQps() + 1 <= qpsAllowed;
}
public double getQps() {
return getSum() / data.getIntervalInSecond();
}
private final LeapArray<LongAdder> data;
public long getSum() {
data.currentWindow();
long success = 0;
List<LongAdder> list = data.values();
for (LongAdder window : list) {
success += window.sum();
}
return success;
}
从上面可以看出在集群模式下也是利用时间窗口进行统计的。
第二大步:
根据FlowId获取对用的指标采集器metric
。
private static final Map<Long, ClusterMetric> METRIC_MAP = new ConcurrentHashMap<>();
ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
if (metric == null) {
return new TokenResult(TokenResultStatus.FAIL);
}
该Metric具体的是一个ClusterMetricLeapArray
,与之前的OccupiableBucketLeapArray
类似,多了一个记录有关抢占数据的数组。
public class ClusterMetricLeapArray extends LeapArray<ClusterMetricBucket> {
private final LongAdder[] occupyCounter;
private boolean hasOccupied = false;
public ClusterMetricLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
ClusterFlowEvent[] events = ClusterFlowEvent.values();
this.occupyCounter = new LongAdder[events.length];
for (ClusterFlowEvent event : events) {
occupyCounter[event.ordinal()] = new LongAdder();
}
}
第三大步:
获取当前通过的Qps,设置的总许可数和剩余的许可数。
double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
double nextRemaining = globalThreshold - latestQps - acquireCount;
如果是FLOW_THRESHOLD_GLOBAL
,即集群的许可数等于限流规则中配置的count值。
如果是FLOW_THRESHOLD_AVG_LOCAL
,此时限流规则中配置的值只是单机的count值,还要乘以集群中客户端的数量。 上面的getExceedCount
默认是1.0。
private static double calcGlobalThreshold(FlowRule rule) {
double count = rule.getCount();
switch (rule.getClusterConfig().getThresholdType()) {
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
return count;
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
default:
int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
return count * connectedCount;
}
}
第四大步:
如果剩余的许可数大于等于0,更新当前的统计信息。
if (nextRemaining >= 0) {
//增加通过数和通过的请求数
metric.add(ClusterFlowEvent.PASS, acquireCount);
metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
if (prioritized) {
// Add prioritized pass.
// Pass (pre-occupy incoming buckets)
metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
}
// Remaining count is cut down to a smaller integer.
return new TokenResult(TokenResultStatus.OK)
.setRemaining((int) nextRemaining)
.setWaitInMs(0);
}
第五大步:
如果剩余数小于0。
if (prioritized) {
// Try to occupy incoming buckets.
// Waiting due to flow shaping or for next bucket tick.
// 获取当前等待的Qps(以1s为维度,当前等待的请求数量)
double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
// 如果当前等待的Qps低于可借用未来窗口的许可阈值时,可通过,但要设置等待时间
if (occupyAvg <=
// 默认是1.0 , 后面是全局的通过阈值数
ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
// 计算等待的时间
int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
// waitInMs > 0 indicates pre-occupy incoming buckets successfully.
if (waitInMs > 0) {
ClusterServerStatLogUtil.log("flow|waiting|" + id);
return new TokenResult(TokenResultStatus.SHOULD_WAIT)
.setRemaining(0)
.setWaitInMs(waitInMs);
}
// Or else occupy failed, should be blocked.
}
}
// Blocked.
// 发生阻塞,当前请求不能通过,增加与阻塞相关指标的统计数。
metric.add(ClusterFlowEvent.BLOCK, acquireCount);
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
if (prioritized) {
// Add prioritized block.
metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
}
return blockedResult();
}
下面再讲解下如何计算等待时间的。
// event 为 pass
public int tryOccupyNext(ClusterFlowEvent event, int acquireCount, double threshold) {
// 当前的通过数
double latestQps = getAvg(ClusterFlowEvent.PASS);
// 判断是否支持抢占
if (!canOccupy(event, acquireCount, latestQps, threshold)) {
return 0;
}
// 在抢占数组中添加本次的占用数
/**
* public void addOccupyPass(int count) {
occupyCounter[ClusterFlowEvent.PASS.ordinal()].add(count);
occupyCounter[ClusterFlowEvent.PASS_REQUEST.ordinal()].add(1);
this.hasOccupied = true;
}
**/
metric.addOccupyPass(acquireCount);
// 在普通的时间窗口增加等待数
/**
* public void add(ClusterFlowEvent event, long count) {
metric.currentWindow().value().add(event, count);
}
**/
add(ClusterFlowEvent.WAITING, acquireCount);
// 这里有些不懂,sampleCount默认值应该是10,
// 这样的话返回的是一个时间窗口的大小
return 1000 / metric.getSampleCount();
}
从上面可以看出,canOccupy
是一个关键方法。
private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) {
long headPass = metric.getFirstCountOfWindow(event);
/**
* public long getOccupiedCount(ClusterFlowEvent event) {
return occupyCounter[event.ordinal()].sum();
}
**/
// 获得Pass事件下的已占用的数
long occupiedCount = metric.getOccupiedCount(event);
// 已通过的请求数 + 本次需要的请求数 + 占用的请求数 - 第一个统计的时间窗口的数
// 如果小于等于阈值,即可以抢占
return latestQps + (acquireCount + occupiedCount) - headPass <= threshold;
}
public long getFirstCountOfWindow(ClusterFlowEvent event) {
if (event == null) {
return 0;
}
WindowWrap<ClusterMetricBucket> windowWrap = getValidHead();
if (windowWrap == null) {
return 0;
}
return windowWrap.value().get(event);
}
public WindowWrap<T> getValidHead() {
return getValidHead(TimeUtil.currentTimeMillis());
}
WindowWrap<T> getValidHead(long timeMillis) {
// Calculate index for expected head time.
int idx = calculateTimeIdx(timeMillis + windowLengthInMs);
WindowWrap<T> wrap = array.get(idx);
if (wrap == null || isWindowDeprecated(wrap)) {
return null;
}
return wrap;
}
执行到这里,服务端已经判断出是否要发送令牌。
然后再将代码跳回到客户端请求发送的位置。
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
if (transportClient == null) {
RecordLog.warn(
"[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
return clientFail();
}
ClusterResponse response = transportClient.sendRequest(request);
TokenResult result = new TokenResult(response.getStatus());
if (response.getData() != null) {
FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
result.setRemaining(responseData.getRemainingCount())
.setWaitInMs(responseData.getWaitInMs());
}
return result;
}
从上面可以看出,会将返回的结果中的剩余数量,等待时间。
那如何处理得到的结果Result。
private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context,
DefaultNode node,
int acquireCount, boolean prioritized) {
switch (result.getStatus()) {
case TokenResultStatus.OK:
return true;
case TokenResultStatus.SHOULD_WAIT:
// Wait for next tick.
try {
Thread.sleep(result.getWaitInMs());
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
case TokenResultStatus.NO_RULE_EXISTS:
case TokenResultStatus.BAD_REQUEST:
case TokenResultStatus.FAIL:
case TokenResultStatus.TOO_MANY_REQUEST:
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
case TokenResultStatus.BLOCKED:
default:
return false;
}
}
}
从上面可以看出如果状态是OK的话,返回true,允许通过。
如果是在优先级情况下,支持抢占,则根据返回的等待时间进行等待。
如果是其他的状态:则回退到单机模式进行判断。
默认情况下,返回false。
至此,关于关于集群限流就讲解完了,但是在关于计算等待时间的逻辑还是有些不清楚,欢迎大家交流。