Series
- Introduction to the Sentinel workflow
- Sentinel's resource node tree
- Sentinel's sliding window
- Sentinel flow control
- Sentinel's chain-of-responsibility slots
- Sentinel circuit breaking and degradation
- Communication between the Sentinel Dashboard and applications
- The Sentinel console
Cluster Flow Control Architecture
- In Sentinel's cluster flow control architecture, each client node sends a request (carrying the number of tokens it wants to acquire) to a central token server, which does the counting and returns whether the request may pass. A minimal sketch of enabling cluster mode on a rule follows.
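As a hedged illustration of the client side of this architecture (the resource name, flowId, and threshold below are assumptions for the example, not values from this post), a rule is marked as cluster mode via ClusterFlowConfig; the flowId is the globally unique ID the token server later uses to look the rule up:

import java.util.Collections;

import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

public class ClusterFlowRuleDemo {
    public static void main(String[] args) {
        // Hypothetical resource name and flowId, for illustration only
        FlowRule rule = new FlowRule("demo-resource");
        rule.setCount(100); // cluster-wide total QPS under FLOW_THRESHOLD_GLOBAL
        rule.setClusterMode(true);

        ClusterFlowConfig clusterConfig = new ClusterFlowConfig();
        clusterConfig.setFlowId(1001L); // must be unique across the whole cluster
        clusterConfig.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL);
        rule.setClusterConfig(clusterConfig);

        FlowRuleManager.loadRules(Collections.singletonList(rule));
    }
}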
Cluster Flow Control Source Code
Sentinel introduced the cluster flow control module in version 1.4.0. It mainly consists of the following parts:
- sentinel-cluster-common-default: the common module, containing the shared interfaces and entities
- sentinel-cluster-client-default: the default cluster flow control client module; it uses Netty for communication and provides interfaces for extending the serialization protocol (see the wiring sketch after this list)
- sentinel-cluster-server-default: the default cluster flow control server module; it also uses Netty for communication and provides interfaces for extending the serialization protocol, plus an extension point (TokenService) for plugging in the concrete rule-checking implementation, whose default implementation reuses the logic of sentinel-core
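As a hedged sketch of how an application might wire itself up as a cluster flow control client (the server address, port, and timeout are illustrative assumptions):

import com.alibaba.csp.sentinel.cluster.ClusterStateManager;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientAssignConfig;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager;

public class ClusterClientBootstrap {
    public static void main(String[] args) {
        // Timeout for token requests, in milliseconds (illustrative value)
        ClusterClientConfig clientConfig = new ClusterClientConfig();
        clientConfig.setRequestTimeout(200);
        ClusterClientConfigManager.applyNewConfig(clientConfig);

        // Address of the token server this client connects to (illustrative)
        ClusterClientAssignConfig assignConfig = new ClusterClientAssignConfig("127.0.0.1", 11111);
        ClusterClientConfigManager.applyNewAssignConfig(assignConfig);

        // Switch this instance into cluster client mode
        ClusterStateManager.applyState(ClusterStateManager.CLUSTER_CLIENT);
    }
}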
Client
public class FlowRuleChecker {

    public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }
        // @1 In cluster mode, passClusterCheck decides whether the request can pass
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

    private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                            boolean prioritized) {
        try {
            TokenService clusterService = pickClusterService();
            if (clusterService == null) {
                // No cluster token service available: fall back to a local check or pass directly
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            }
            long flowId = rule.getClusterConfig().getFlowId();
            // @2 Send a token acquisition request to the token server
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        } catch (Throwable ex) {
            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
        }
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }
}
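fallbackToLocalOrPass is elided from the snippet above; paraphrased as a sketch of the same class (not the verbatim source), it degrades to the local check only when the rule is configured to do so:

private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node,
                                             int acquireCount, boolean prioritized) {
    if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
        // Degrade to standalone (per-machine) flow control when the token server is unreachable
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    } else {
        // No local fallback configured: pass the request directly
        return true;
    }
}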
// @3 DefaultClusterTokenClient uses Netty for the underlying communication
public class DefaultClusterTokenClient implements ClusterTokenClient {

    @Override
    public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
        if (notValidRequest(flowId, acquireCount)) {
            return badRequest();
        }
        // Build the flow request payload and wrap it in a cluster request of type MSG_TYPE_FLOW
        FlowRequestData data = new FlowRequestData().setCount(acquireCount)
            .setFlowId(flowId).setPriority(prioritized);
        ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
        try {
            TokenResult result = sendTokenRequest(request);
            logForResult(result);
            return result;
        } catch (Exception ex) {
            ClusterClientStatLogUtil.log(ex.getMessage());
            return new TokenResult(TokenResultStatus.FAIL);
        }
    }
}
- @1 In cluster mode, passClusterCheck decides whether the request is allowed to pass.
- @2 DefaultClusterTokenClient sends a token acquisition request to the token server; the response is mapped back to a pass/block decision by applyTokenResult, sketched below.
- @3 DefaultClusterTokenClient uses Netty for the underlying communication.
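applyTokenResult is also elided above; paraphrased as a sketch of FlowRuleChecker (not the verbatim source), it translates the TokenResult status into a local decision:

private static boolean applyTokenResult(TokenResult result, FlowRule rule, Context context,
                                        DefaultNode node, int acquireCount, boolean prioritized) {
    switch (result.getStatus()) {
        case TokenResultStatus.OK:
            return true;
        case TokenResultStatus.SHOULD_WAIT:
            // Prioritized request occupying the next window: wait, then pass
            try {
                Thread.sleep(result.getWaitInMs());
            } catch (InterruptedException e) {
                // ignored for brevity
            }
            return true;
        case TokenResultStatus.NO_RULE_EXISTS:
        case TokenResultStatus.BAD_REQUEST:
        case TokenResultStatus.FAIL:
        case TokenResultStatus.TOO_MANY_REQUEST:
            // Server-side anomalies degrade to the local fallback
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        case TokenResultStatus.BLOCKED:
        default:
            return false;
    }
}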
Server
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {

    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
        // Delegate the token request to the TokenService (DefaultTokenService by default)
        TokenService tokenService = TokenServiceProvider.getService();
        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        boolean prioritized = request.getData().isPriority();
        TokenResult result = tokenService.requestToken(flowId, count, prioritized);
        return toResponse(result, request);
    }
}
public class DefaultTokenService implements TokenService {

    @Override
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        if (notValidRequest(ruleId, acquireCount)) {
            return badRequest();
        }
        // Look up the cluster flow rule by its flowId
        FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
        if (rule == null) {
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }
        // @1 Delegate the token acquisition request to ClusterFlowChecker
        return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
    }
}
final class ClusterFlowChecker {

    static TokenResult acquireClusterToken(FlowRule rule, int acquireCount, boolean prioritized) {
        Long id = rule.getClusterConfig().getFlowId();
        // Fetch the per-rule metric (this declaration is elided in the original excerpt)
        ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
        if (metric == null) {
            return new TokenResult(TokenResultStatus.FAIL);
        }
        // @2 Remaining = global threshold - tokens already granted (current pass QPS) - tokens requested now
        double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
        double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
        double nextRemaining = globalThreshold - latestQps - acquireCount;
        // @3 A non-negative remainder means the request can pass
        if (nextRemaining >= 0) {
            metric.add(ClusterFlowEvent.PASS, acquireCount);
            metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
            if (prioritized) {
                metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
            }
            return new TokenResult(TokenResultStatus.OK)
                .setRemaining((int) nextRemaining)
                .setWaitInMs(0);
        } else {
            if (prioritized) {
                // A prioritized request may occupy tokens of the next window and wait for it
                double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
                if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
                    int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
                    if (waitInMs > 0) {
                        ClusterServerStatLogUtil.log("flow|waiting|" + id);
                        return new TokenResult(TokenResultStatus.SHOULD_WAIT)
                            .setRemaining(0)
                            .setWaitInMs(waitInMs);
                    }
                }
            }
            // Block the request (rate limit)
            metric.add(ClusterFlowEvent.BLOCK, acquireCount);
            metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
            ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount);
            ClusterServerStatLogUtil.log("flow|block_request|" + id, 1);
            if (prioritized) {
                metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
                ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
            }
            return blockedResult();
        }
    }
}
- FlowRequestProcessor.processRequest handles cluster flow control token requests on the server side.
- DefaultTokenService then delegates the request to ClusterFlowChecker.acquireClusterToken.
- ClusterFlowChecker performs the actual token grant: it subtracts the tokens already granted and the tokens requested this time from the cluster-wide total; if a non-negative remainder is left, the request is not limited (with special handling that lets a prioritized request through even when the remainder is insufficient), otherwise it is blocked. For example, with a global threshold of 100, a current pass QPS of 95, and an acquireCount of 10, nextRemaining = 100 - 95 - 10 = -5, so the request is blocked unless it is prioritized and allowed to wait for the next window. The threshold itself comes from calcGlobalThreshold, sketched below.
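calcGlobalThreshold (used at @2 but elided above) converts the configured count into a cluster-wide threshold; paraphrased as a sketch (not the verbatim source):

private static double calcGlobalThreshold(FlowRule rule) {
    double count = rule.getCount();
    switch (rule.getClusterConfig().getThresholdType()) {
        case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
            // The configured count is already the cluster-wide total
            return count;
        case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
        default:
            // The configured count is per client: multiply by the number of connected clients
            int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
            return count * connectedCount;
    }
}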
Cluster Flow Control Server Deployment
- Standalone mode (Alone): the token server starts as an independent process and is deployed separately. Isolation is good, but extra deployment work is required. Standalone mode is suitable as a Global Rate Limiter that provides flow control for a whole cluster (see the startup sketch after this list).
- If the standalone token server goes down, the token clients degrade to local flow control, i.e. per-machine flow control, so this deployment style requires keeping the token server highly available.
- Embedded mode: the token server starts inside the same process as the service itself. In this mode all instances in the cluster are peers, and any instance can switch between token server and token client at any time, so no separate deployment is needed and flexibility is good. Isolation is poor, however, so the token server's total QPS must be capped to avoid impacting the application itself. Embedded mode is suitable for flow control within a single application cluster.
- In embedded mode, if the token server goes down, another token client can be promoted to token server.
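As a hedged sketch of starting a standalone-mode token server (the port, idle time, and namespace are illustrative assumptions, loosely following the official cluster demo):

import java.util.Collections;

import com.alibaba.csp.sentinel.cluster.server.ClusterTokenServer;
import com.alibaba.csp.sentinel.cluster.server.SentinelDefaultTokenServer;
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig;

public class TokenServerBootstrap {
    public static void main(String[] args) throws Exception {
        // Standalone (Alone) mode: run the token server as its own process
        ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();
        // Listen port and connection idle time (illustrative values)
        ClusterServerConfigManager.loadGlobalTransportConfig(
            new ServerTransportConfig().setPort(11111).setIdleSeconds(600));
        // Namespaces (typically app names) this token server serves
        ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton("demo-app"));
        tokenServer.start();
    }
}

In embedded mode the same process would instead switch roles at runtime, e.g. by calling ClusterStateManager.applyState(ClusterStateManager.CLUSTER_SERVER) to promote a client to token server.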