Sentinel源码分析----流控规则与FlowSlot

FlowSlot主要是用来进行流控规则的处理,直接看下代码

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        // 获取流控规则
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
       //通过资源名称来获取规则列表
        List<FlowRule> rules = flowRules.get(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                // 遍历规则进行处理
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    // 如果规则校验不通过,那么抛出FlowException异常
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

    boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count, boolean prioritized) {
        // 交由FlowRuleChecker进行逻辑处理
        return FlowRuleChecker.passCheck(rule, context, node, count, prioritized);
    }

这里的flowRules是一个全量的规则列表,例如我在控制台配置了如下的规则:


image.png

那么flowRules中就有两个元素,key分别是test和hello,对应的值是一个集合,集合里只有一个元素,就是实际的规则实体FlowRule,具体值与我们配置相关,看下FlowRule中有哪些字段

public class FlowRule extends AbstractRule {

    public FlowRule() {
        super();
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }

    public FlowRule(String resourceName) {
        super();
        setResource(resourceName);
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }
    private int grade = RuleConstant.FLOW_GRADE_QPS;
    private double count;
    private int strategy = RuleConstant.STRATEGY_DIRECT;
    private String refResource;
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

    private int warmUpPeriodSec = 10;

    private int maxQueueingTimeMs = 500;

    private boolean clusterMode;

    private ClusterFlowConfig clusterConfig;

    private TrafficShapingController controller;
}
  • limitApp:对应新增流控规则页面的来源应用
  • resource:对应新增流控规则页面的资源名
  • grade:对应新增流控规则页面的阈值类型
  • count:如果页面配置的是qps类型,字段则代表qps的值;如果配置的是线程数类型,字段则代表线程数
  • strategy:对应新增流控规则页面的流控模式
  • refResource:对应流控策略为关联情况下,出现的关联资源 或 对应流控策略为链路情况下,出现的入口资源
  • controlBehavior:对应新增流控规则页面的流控效果
  • warmUpPeriodSec:对应流控效果为Warm Up情况下,出现的预热时长
  • maxQueueingTimeMs:对应流控效果为排队等待情况下,出现的超时时间
  • clusterMode:对应新增流控规则页面的是否集群
  • ClusterFlowConfig:集群流控的相关配置
  • TrafficShapingController:流量整形的实现,不同流控效果有不同算法

FlowRule和页面配置的规则一一对应,通过控制台配置后可以将这些值推送到机器上生成对应的FlowRule

接下来看下FlowRuleChecker.passCheck对具体规则的处理

    static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {// 1
            return true;
        }

        if (rule.isClusterMode()) {//2
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }

        return passLocalCheck(rule, context, node, acquireCount, prioritized);//3
    }

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);//4
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount);//5
    }
  • 标记1:limitApp是页面上的来源应用,默认是default,表示代表所有的应用,这里如果为空则默认通过,因为代码中约定了default是代表所有应用,所以空值为非法值,这里想了一下为什么不把default或者空值当做代表所有应用的限流,可能是因为空值还包括规则字段丢失的情况,应该算作异常情况
  • 标记2:集群模式特殊处理,这里暂不考虑,后续分析
  • 标记3:本地限流逻辑实现
  • 标记4:根据不同情况选择不同Node(这里会涉及上篇文章的知识点)
  • 标记5:根据不同情况调用不同TrafficShapingController实现进行判断

节点选择

上篇文章中分析了Sentinel的各种Node的含义,为什么要设计那么多种类型呢?下面就会看到,对于不同的流控规则而言,需要去拿不同的Node来获取统计的数据,具体看代码(对于各种Node的知识点这里不再详细分析,具体看下上篇文章)

    static Node selectNodeByRequesterAndStrategy(FlowRule rule, Context context, DefaultNode node) {
        // The limit app should not be empty.
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();

        if (limitApp.equals(origin) && filterOrigin(origin)) {//1
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();// 2
            }

            return selectReferenceNode(rule, context, node);//3
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {//4
            if (strategy == RuleConstant.STRATEGY_DIRECT) {//5
                return node.getClusterNode();
            }

            return selectReferenceNode(rule, context, node);//6
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {//7
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();//8
            }

            return selectReferenceNode(rule, context, node);//9
        }

        return null;
    }
  • 标记1:如果流控规则配置了来源应用且不是"default"或者"other"这种特殊值,那么这种时候该规则就只对配置的来源应用生效,例如:配置了A应用对test资源qps为10,那么就要先取到当前A应用的qps看看是否超过10
  • 标记2:如果是直接限流类型,那么也就是上面举的栗子,获取A应用的统计数据,即A应用对应的OriginNode进行判断
  • 标记:3/6/9:selectReferenceNode方法是对流控模式为关联或者链路的处理
  • 标记4:这种情况limitApp是"default",代表针对所有应用
  • 标记5:如果是直接限流类型,因为不是针对某个应用进行限流,所以就需要取当前资源的ClusterNode节点,因为ClusterNode表示所有应用对该资源的所有请求情况
  • 标记7:这个是"other"值的处理,假设当前请求来源不在当前规则的limitApp中,则进行下面的处理
  • 标记8:如果是直接限流类型,则返回OriginNode

关于7的应用,具体栗子,假设一个资源有如下规则,属性如下

ruleName limitApp
rule1 A
rule2 default
rule3 C
rule4 other

那么rule4只会处理来源应用非A、C、default的应用,例如D,E等统一使用rule4这个规则,这种情况实际应用场景是:假设有非常多的来源应用,但是又不能统一使用某个规则,因为可能某个来源应用的请求量很大,统一使用某个规则会导致请求量小的应用被影响;又不能每个来源应用配置一个规则,那这样会配到手抖,那么可以为ABC分别配置一个规则(假设ABC是请求量非常大的,和其他的差别很大),然后再配置一个other,这样其他请求量小的就可以使用这个规则了

流控模式

关联与链路这两种模式在wiki的介绍中,统一被称为基于调用关系的流量控制

流控模式:关联

当两个资源之间具有资源争抢或者依赖关系的时候,这两个资源便具有了关联。比如对数据库同一个字段的读操作和写操作存在争抢,读的速度过高会影响写得速度,写的速度过高会影响读的速度。如果放任读写操作争抢资源,则争抢本身带来的开销会降低整体的吞吐量。可使用关联限流来避免具有关联关系的资源之间过度的争抢,举例来说,read_db 和 write_db 这两个资源分别代表数据库读写,我们可以给 read_db 设置限流规则来达到写优先的目的:设置 FlowRule.strategy 为 RuleConstant.RELATE 同时设置 FlowRule.ref_identity 为 write_db。这样当写库操作过于频繁时,读数据的请求会被限流。

gayhub的wiki上描述如上,也就是read_db的请求量会被write_db影响,假设read_db配置的规则如下:


image.png
  • 这种情况下,假设write_db没有被执行,那么read_db最大能到多少的qps?

看下代码

    static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
        String refResource = rule.getRefResource();
        int strategy = rule.getStrategy();

        if (StringUtil.isEmpty(refResource)) {
            return null;
        }

        if (strategy == RuleConstant.STRATEGY_RELATE) {// 1
            return ClusterBuilderSlot.getClusterNode(refResource);
        }

        //....链路模式的处理
        return null;
    }

看到标记1的地方,关联流控模式是使用关联资源即refResource去获取资源的ClusterNode,以write_db和read_db为例,当read_db请求的时候,是把write_db的ClusterNode与规则进行比较,那么上面的问题就会有答案了,假设write_db一直没有请求,那么read_db就没有限制,因为write_db的ClusterNode数据为空

流控模式:链路

                  machine-root
                    /       \
                   /         \
             Entrance1     Entrance2
                /             \
               /               \
      DefaultNode(nodeA)   DefaultNode(nodeA)

如上所示的Node分布情况,资源nodeA分别在两个上下文Entrance1和Entrance2下进行调用,假设在上下文Entrance1的调用量很大,而上下文Entrance2的调用量很小,我们想针对Entrance1上下文的nodeA调用进行限流,那么可以使用链路限流模式,配置如下:

image.png

那么在上下文Entrance2下对nodeA的调用就没有影响,看下代码

    static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
        String refResource = rule.getRefResource();
        int strategy = rule.getStrategy();

        if (StringUtil.isEmpty(refResource)) {
            return null;
        }

        //....关联模式的处理

        if (strategy == RuleConstant.STRATEGY_CHAIN) {//2
            if (!refResource.equals(context.getName())) {
                return null;
            }
            return node;
        }
        return null;
    }

发现当前上下文(context.getName())如果和配置(refResource)的不一样,则返回null,外部如果返回的Node为null,则直接返回true了,那么Entrance2在这种情况下就直接通过了

流控效果

当节点选择完毕后,调用rule.getRater().canPass(selectedNode, acquireCount)开始执行判断,getRater()返回的是TrafficShapingController的实现类,根据不同流控效果有不同的实现

    //com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil#generateRater
    private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            switch (rule.getControlBehavior()) {
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        return new DefaultController(rule.getCount(), rule.getGrade());
    }

快速失败

快速失败这种情况,使用的是DefaultController,也是最简单的一个

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            return false;
        }
        return true;
    }
    private int avgUsedTokens(Node node) {
        if (node == null) {
            return -1;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps();
    }

获取当前token数,和当前请求的数量相加,看看是否大于规则配置的值

排队等待

当页面流控效果选择排队等待的时候,会出现超时时间的选项,该效果是让请求匀速的通过,可用于消息队列在消费的时候对流量的控制,对应的是漏桶算法,算法实现的代码是RateLimiterController

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        if (acquireCount <= 0) {
            return true;
        }
        if (count <= 0) {
            return false;
        }

        long currentTime = TimeUtil.currentTimeMillis();

        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);//1

        long expectedTime = costTime + latestPassedTime.get();//2

        if (expectedTime <= currentTime) {//3
            latestPassedTime.set(currentTime);//4
            return true;
        } else {//5
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();//6
            if (waitTime > maxQueueingTimeMs) {//7
                return false;
            } else {//8
                long oldTime = latestPassedTime.addAndGet(costTime);//9
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();//10
                    if (waitTime > maxQueueingTimeMs) {//11
                        latestPassedTime.addAndGet(-costTime);//12
                        return false;
                    }
                    if (waitTime > 0) {//13
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

这里将上面几个时间用变量表示

  1. 下一次能获取到token通过的时间为t1
  2. 每次请求花费时间为t2
  3. 上一次获取到token通过的时间为t3

代码每个标记意义如下:

  • 1:即当前请求token数(默认是1)/qps1000=1/qps1000=一个请求需要花费多少时间,假设设置的qps为10,即一秒允许有10个请求通过,那么每个请求的时间就是1/10*1000=100毫秒
  • 2:t3+t2=t1
  • 3:如果当前时间已经在t1后面了,那么请求可以被通过
  • 4:通过的时候需要重设一下t3
  • 5:代表当前请求到来的时候,还没到达能够t1
  • 6:t2+t3-当前时间=代表离t1还差多少时间
  • 7:如果请求很多,每个都需要进行排队,那么会导致越后面的请求等待的时候会更久,那么当时间超过设置的最大间隔,则返回false直接失败
  • 8:这里表示未到达最大的间隔
  • 9:更新t3并返回最新值(这个的变量命名感觉有点问题,addAndGet返回的是最新值)
  • 10~11:在7已经判断过一次了,这里又判断一次的原因是因为可能有多个线程并发执行的时候,在7的时候还未超过最大的间隔时间,而经过latestPassedTime.addAndGet的处理之后,可能有某些线程已经超过了这个时间,所以这里又判断了一遍
  • 12:和7不一样,7只是进行运算,这里是先更新了t3了,所以需要减回去
  • 13:得到等待时间后,使用Sleep是线程睡眠一定时间

Warm Up

还有一种流控效果是Warm Up,该算法类似于令牌桶算法,其代码与Guava的RateLimiter原理类似,限于个人能力,没能看懂其中原理。。。。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容