Sentinel之集群原理

之前的限流功能都是单机版的,只能统计本地的服务调用次数信息,那么如果是在集群状态下,一个服务被放在了多个服务器上,假设一个集群有5台机器,每台机器单机限流阈值为10qps,理想状态下整个集群的限流阈值就是50qps,不过实际状态下路由到每台机器的流量可能会不均匀,会导致总量没有到的情况下某些机器就开始限流。

每个单机实例只关心自己的阈值,但是对于整个系统的全局阈值大家都漠不关心,当我们希望为某个api设置一个总的Qps时,那么单机模式下的限流就无法满足条件。
单机版是在每个实例中进行统计,而集群版是有一个专门的实例进行统计。

这个专门用来统计数据的称为Sentinel的token server,其他的实例作为Sentinel的token client会向token server去请求token,如果能获取到token,则说明当前的qps还未达到总的阈值,否则就说明已经达到集群的总阈值,当前实例就会被block。

集群限流适合的场景:
1) 在API Gateway处统计某个api的总访问量,并对某个api或者服务的总qps进行限制。
2)Service Mesh中对服务间的调用进行全局流控。
3)集群内对热点商品的总访问频次进行限制。

起点

Sentinel的集群限流是在FlowSlot中实现的。它会根据资源名找到所有的限流规则FlowRule,然后依次对每个规则调用canPassCheck进行判断,是否能够通过该限流规则。

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                    boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

如果该规则是应用在集群模式下,则会调用passClusterCheck方法。

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                            boolean prioritized) {
        try {
            TokenService clusterService = pickClusterService(); \\@1
            if (clusterService == null) {
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);  //@2
            }
            long flowId = rule.getClusterConfig().getFlowId(); //@3
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);  //@4
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
            // If client is absent, then fallback to local mode.
        } catch (Throwable ex) {
            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
        }
        // Fallback to local flow control when token client or server for this rule is not available.
        // If fallback is not enabled, then directly pass.
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }

@1代码处是获取当前节点是Token Client还是Token Server。
1) 如果当前节点的角色是Client,返回的TokenService为DefaultClusterTokenClient
2)如果当前节点的角色是Server,则默认返回的TokenService为DefaultTokenService

@2代码处: 如果无法获取集群的的TokenService,那么该流量控制规则可以退化为单机限流模式。

private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                 boolean prioritized) {
        if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
            return passLocalCheck(rule, context, node, acquireCount, prioritized);
        } else {
            return true;
        }
    }

@3代码处获取该流量控制的flowId,在集群模式下,每一个rule都有一个对应的ClusterFlowConfig

ClusterFlowConfig类介绍:

public class ClusterFlowConfig {
    // 全局唯一id
    private Long flowId;
    // 有两种阈值类型,一种是单机均摊,一种是集群总体模式
    private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL;
    //集群不可用时是否回退到单机模式
    private boolean fallbackToLocalWhenFail = true;
    
    private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL;
    // 集群采样数 10 
    private int sampleCount = ClusterRuleConstant.DEFAULT_CLUSTER_SAMPLE_COUNT;
   // 1000ms,即1秒
    private int windowIntervalMs = RuleConstant.DEFAULT_WINDOW_INTERVAL_MS;

@4代码处根据获取的flowId通过TokenService进行申请token。从上面可知,它可能是TokenClient调用的,也可能是ToeknServer调用的。分别对应的类是DefaultClusterTokenClientDefaultTokenService

下面分别从TokenClient和TokenService两个角色进行解读。

DefaultClusterTokenClient

public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
        // 如果flowId是无效的,或则count小于等于0
        // id == null || id <= 0 || count <= 0;
        if (notValidRequest(flowId, acquireCount)) {
            return badRequest();
        }
        // 新建一个请求对象
        FlowRequestData data = new FlowRequestData().setCount(acquireCount)
            .setFlowId(flowId).setPriority(prioritized);
        // 进一步封装为ClusterRequest,消息类型是Flow,
        //  MSG_TYPE_PING = 0; MSG_TYPE_FLOW = 1; MSG_TYPE_PARAM_FLOW = 2;
        ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
        try {
            // 然后向TokenServer发送请求
            TokenResult result = sendTokenRequest(request);
            logForResult(result);
            return result;
        } catch (Exception ex) {
            ClusterClientStatLogUtil.log(ex.getMessage());
            return new TokenResult(TokenResultStatus.FAIL);
        }
    }
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
        if (transportClient == null) {
            RecordLog.warn(
                "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
            return clientFail();
        }
        ClusterResponse response = transportClient.sendRequest(request);
        TokenResult result = new TokenResult(response.getStatus());
        if (response.getData() != null) {
            FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
            result.setRemaining(responseData.getRemainingCount())
                .setWaitInMs(responseData.getWaitInMs());
        }
        return result;
    }

在客户端启动的时候会创建与TokenServer之间的连接,当发送请求时如何客户端对象为空,就会记录请求失败。

DefaultTokenService

Token Server收到客户端的请求后,会调用FlowRequestProcessorprocessRequest,最终会调用DefaultTokenServicerequestToken方法。

@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
        TokenService tokenService = TokenServiceProvider.getService();
        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        boolean prioritized = request.getData().isPriority();
        TokenResult result = tokenService.requestToken(flowId, count, prioritized);
        return toResponse(result, request);
    }
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        // 和上面一样,先进行验证
        if (notValidRequest(ruleId, acquireCount)) {
            return badRequest();
        }
        // 从一个Map中进行查找 
        // private static final Map<Long, FlowRule> FLOW_RULES = new ConcurrentHashMap<>();
        FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
        // 没有该规则
        if (rule == null) {
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }
        // 服务端进行检查,是否发送令牌token
        return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
    }

因为acquireClusterToken相对较长,故进行了拆分讲解。

第一步:

Long id = rule.getClusterConfig().getFlowId();
if (!allowProceed(id)) {
   return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
} // @1

@1代码处首先判断是否允许本次许可申请,这是因为TokenServe支持嵌入式,即支持在应用节点中嵌入一个TokenServer,为了保证许可申请的请求不对正常业务造成比较大的影响,故对申请许可这个动作进行了限流。

一旦触发了限流,将向客户端返回Too_Many_Request状态码,Sentinel支持按namespace进行限流,具体由GlobalRequestLimiter实现,该类的内部同样是基于滑动窗口进行收集,原理与FlowSlot相似,默认的限流TPS是3W。

static boolean allowProceed(long flowId) {
        String namespace = ClusterFlowRuleManager.getNamespace(flowId);
        return GlobalRequestLimiter.tryPass(namespace);
    }
public static boolean tryPass(String namespace) {
        if (namespace == null) {
            return false;
        }
        // private static final Map<String, RequestLimiter> GLOBAL_QPS_LIMITER_MAP = new ConcurrentHashMap<>();
        RequestLimiter limiter = GLOBAL_QPS_LIMITER_MAP.get(namespace);
        if (limiter == null) {
            return true;
        }
        return limiter.tryPass();
    }

canPass方法是计算当前的通过数+1后是否超过qpsAllowed。

public boolean tryPass() {
        if (canPass()) {
            add(1);
            return true;
        }
        return false;
}

public void add(int x) {
        data.currentWindow().value().add(x);
}

public boolean canPass() {
        return getQps() + 1 <= qpsAllowed;
}

public double getQps() {
        return getSum() / data.getIntervalInSecond();
}
private final LeapArray<LongAdder> data;

public long getSum() {
        data.currentWindow();
        long success = 0;

        List<LongAdder> list = data.values();
        for (LongAdder window : list) {
            success += window.sum();
        }
        return success;
}

从上面可以看出在集群模式下也是利用时间窗口进行统计的。

第二大步:

根据FlowId获取对用的指标采集器metric

private static final Map<Long, ClusterMetric> METRIC_MAP = new ConcurrentHashMap<>();  

ClusterMetric metric = ClusterMetricStatistics.getMetric(id);  
if (metric == null) {
    return new TokenResult(TokenResultStatus.FAIL);
}

该Metric具体的是一个ClusterMetricLeapArray,与之前的OccupiableBucketLeapArray类似,多了一个记录有关抢占数据的数组。

public class ClusterMetricLeapArray extends LeapArray<ClusterMetricBucket> {

    private final LongAdder[] occupyCounter;
    private boolean hasOccupied = false;

    public ClusterMetricLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
        ClusterFlowEvent[] events = ClusterFlowEvent.values();
        this.occupyCounter = new LongAdder[events.length];
        for (ClusterFlowEvent event : events) {
            occupyCounter[event.ordinal()] = new LongAdder();
        }
    }

第三大步:

获取当前通过的Qps,设置的总许可数和剩余的许可数。

double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
double nextRemaining = globalThreshold - latestQps - acquireCount;

如果是FLOW_THRESHOLD_GLOBAL,即集群的许可数等于限流规则中配置的count值。
如果是FLOW_THRESHOLD_AVG_LOCAL,此时限流规则中配置的值只是单机的count值,还要乘以集群中客户端的数量。 上面的getExceedCount默认是1.0。

private static double calcGlobalThreshold(FlowRule rule) {
        double count = rule.getCount();
        switch (rule.getClusterConfig().getThresholdType()) {
            case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
                return count;
            case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
            default:
                int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
                return count * connectedCount;
        }
}

第四大步:

如果剩余的许可数大于等于0,更新当前的统计信息。

if (nextRemaining >= 0) {
     //增加通过数和通过的请求数
     metric.add(ClusterFlowEvent.PASS, acquireCount);
     metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
     if (prioritized) {
           // Add prioritized pass.
           // Pass (pre-occupy incoming buckets)
            metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
      }
      // Remaining count is cut down to a smaller integer.
      return new TokenResult(TokenResultStatus.OK)
                .setRemaining((int) nextRemaining)
                .setWaitInMs(0);
} 

第五大步:

如果剩余数小于0。

if (prioritized) {
       // Try to occupy incoming buckets.
       // Waiting due to flow shaping or for next bucket tick.
       // 获取当前等待的Qps(以1s为维度,当前等待的请求数量)
       double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
       // 如果当前等待的Qps低于可借用未来窗口的许可阈值时,可通过,但要设置等待时间
       if (occupyAvg <= 
        // 默认是1.0 , 后面是全局的通过阈值数
        ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
        // 计算等待的时间
        int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
                    // waitInMs > 0 indicates pre-occupy incoming buckets successfully.
                    if (waitInMs > 0) {
                        ClusterServerStatLogUtil.log("flow|waiting|" + id);
                        return new TokenResult(TokenResultStatus.SHOULD_WAIT)
                            .setRemaining(0)
                            .setWaitInMs(waitInMs);
           }
     // Or else occupy failed, should be blocked.
     }
}
// Blocked.
// 发生阻塞,当前请求不能通过,增加与阻塞相关指标的统计数。 
metric.add(ClusterFlowEvent.BLOCK, acquireCount);
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
if (prioritized) {
      // Add prioritized block.
      metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
      ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
}

     return blockedResult();
}

下面再讲解下如何计算等待时间的。

// event 为 pass
public int tryOccupyNext(ClusterFlowEvent event, int acquireCount, double threshold) {
        // 当前的通过数
        double latestQps = getAvg(ClusterFlowEvent.PASS);
        // 判断是否支持抢占
        if (!canOccupy(event, acquireCount, latestQps, threshold)) {
            return 0;
        }
        
        // 在抢占数组中添加本次的占用数
        /**
        *  public void addOccupyPass(int count) {
        occupyCounter[ClusterFlowEvent.PASS.ordinal()].add(count);
        occupyCounter[ClusterFlowEvent.PASS_REQUEST.ordinal()].add(1);
        this.hasOccupied = true;
         }
        **/
        metric.addOccupyPass(acquireCount);
        
        // 在普通的时间窗口增加等待数
        /**
        *  public void add(ClusterFlowEvent event, long count) {
        metric.currentWindow().value().add(event, count);
    }
        **/
        add(ClusterFlowEvent.WAITING, acquireCount);
        // 这里有些不懂,sampleCount默认值应该是10,
        // 这样的话返回的是一个时间窗口的大小
        return 1000 / metric.getSampleCount();
}

从上面可以看出,canOccupy是一个关键方法。

private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) {
        long headPass = metric.getFirstCountOfWindow(event);
        /**
        *  public long getOccupiedCount(ClusterFlowEvent event) {
        return occupyCounter[event.ordinal()].sum();
    }
        **/
        // 获得Pass事件下的已占用的数  
        long occupiedCount = metric.getOccupiedCount(event);
        // 已通过的请求数 + 本次需要的请求数 + 占用的请求数  - 第一个统计的时间窗口的数 
        // 如果小于等于阈值,即可以抢占
        return latestQps + (acquireCount + occupiedCount) - headPass <= threshold;
    }
public long getFirstCountOfWindow(ClusterFlowEvent event) {
        if (event == null) {
            return 0;
        }
        WindowWrap<ClusterMetricBucket> windowWrap = getValidHead();
        if (windowWrap == null) {
            return 0;
        }
        return windowWrap.value().get(event);
}
public WindowWrap<T> getValidHead() {
        return getValidHead(TimeUtil.currentTimeMillis());
}
WindowWrap<T> getValidHead(long timeMillis) {
        // Calculate index for expected head time.
        int idx = calculateTimeIdx(timeMillis + windowLengthInMs);
        WindowWrap<T> wrap = array.get(idx);
        if (wrap == null || isWindowDeprecated(wrap)) {
            return null;
        }

        return wrap;
}

执行到这里,服务端已经判断出是否要发送令牌。

然后再将代码跳回到客户端请求发送的位置。

private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
        if (transportClient == null) {
            RecordLog.warn(
                "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
            return clientFail();
        }
        ClusterResponse response = transportClient.sendRequest(request);
        TokenResult result = new TokenResult(response.getStatus());
        if (response.getData() != null) {
            FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
            result.setRemaining(responseData.getRemainingCount())
                .setWaitInMs(responseData.getWaitInMs());
        }
        return result;
    }

从上面可以看出,会将返回的结果中的剩余数量,等待时间。

那如何处理得到的结果Result。

private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context,
                                                         DefaultNode node,
                                                         int acquireCount, boolean prioritized) {
        switch (result.getStatus()) {
            case TokenResultStatus.OK:   
                return true;
            case TokenResultStatus.SHOULD_WAIT:
                // Wait for next tick.
                try {
                    Thread.sleep(result.getWaitInMs());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return true;
            case TokenResultStatus.NO_RULE_EXISTS:
            case TokenResultStatus.BAD_REQUEST:
            case TokenResultStatus.FAIL:
            case TokenResultStatus.TOO_MANY_REQUEST:
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            case TokenResultStatus.BLOCKED:
            default:
                return false;
        }
    }
}

从上面可以看出如果状态是OK的话,返回true,允许通过。

如果是在优先级情况下,支持抢占,则根据返回的等待时间进行等待。

如果是其他的状态:则回退到单机模式进行判断。

默认情况下,返回false。

至此,关于关于集群限流就讲解完了,但是在关于计算等待时间的逻辑还是有些不清楚,欢迎大家交流。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352