Sentinel之Slots插槽源码分析热点参数限流(七)

一、引子

该篇是插槽分析的最后一篇。
何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如:

  • 商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
  • 用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制

热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。

Sentinel 利用 LRU 策略,结合底层的滑动窗口机制来实现热点参数统计。LRU 策略可以统计单位时间内,最近最常访问的热点参数,而滑动窗口机制可以帮助统计每个参数的 QPS。

二、使用

使用热点限流功能,需要引入以下依赖:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-parameter-flow-control</artifactId>
    <version>x.y.z</version>
</dependency>

然后为对应的资源配置热点参数限流规则,并在 entry的时候传入相应的参数,即可使热点参数限流生效。

注:若自行扩展并注册了自己实现的 SlotChainBuilder,并希望使用热点参数限流功能,则可以在 chain 里面合适的地方插入ParamFlowSlot

public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException
public static Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException

其中最后的一串 args 就是要传入的参数,有多个就按照次序依次传入。比如要传入两个参数 paramA 和 paramB,则可以:

// paramA in index 0, paramB in index 1.
SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);

三、热点参数规则

热点参数规则(ParamFlowRule)类似于流量控制规则(FlowRule):

属性 说明 默认值
resource 资源名,必填
count 限流阈值,必填
grade 限流模式 QPS 模式
paramIdx 热点参数的索引,必填,对应 SphU.entry(xxx, args) 中的参数索引位置
paramFlowItemList 参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count 阈值的限制。仅支持基本类型

可以通过 ParamFlowRuleManagerloadRules 方法更新热点参数规则,下面是一个示例:

ParamFlowRule rule = new ParamFlowRule(resourceName)
    .setParamIdx(0)
    .setCount(5);
// 针对 int 类型的参数 PARAM_B,单独设置限流 QPS 阈值为 10,而不是全局的阈值 5.
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
    .setClassType(int.class.getName())
    .setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));

ParamFlowRuleManager.loadRules(Collections.singletonList(rule));

四、源码分析

1、ParamFlowSlot

    @Override
 public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
        throws Throwable {

        //检查该资源是否存在限流资源
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            fireEntry(context, resourceWrapper, node, count, args);
            return;
        }

        //限流规则检测
        checkFlow(resourceWrapper, count, args);
        fireEntry(context, resourceWrapper, node, count, args);
    }

 void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args)
        throws BlockException {
        //若存在限流规则
        if (ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
           //获取限流规则
            List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
            if (rules == null) {
                return;
            }

            for (ParamFlowRule rule : rules) {
                // Initialize the parameter metrics.
                //初始化参数统计
                initHotParamMetricsFor(resourceWrapper, rule.getParamIdx());
                //若热点参数限流符合,则进行限流处理
                if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {

                    // Here we add the block count.
                    addBlockCount(resourceWrapper, count, args);

                    String message = "";
                    if (args.length > rule.getParamIdx()) {
                        Object value = args[rule.getParamIdx()];
                        message = String.valueOf(value);
                    }
                    throw new ParamFlowException(resourceWrapper.getName(), message);
                }
            }
        }
    }

通过checkFlow进行限流规则检测:

  1. 若该资源存在限流规则,并获取该资源的限流规则;否则跳过检测;
  2. 循环判断规则,并先初始化热点参数统计窗口,如下:
  void initHotParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ int index) {
        ParameterMetric metric;
        // Assume that the resource is valid.
        if ((metric = metricsMap.get(resourceWrapper)) == null) {
            synchronized (LOCK) {
                if ((metric = metricsMap.get(resourceWrapper)) == null) {
                    metric = new ParameterMetric();
                    metricsMap.put(resourceWrapper, metric);
                    RecordLog.info("[ParamFlowSlot] Creating parameter metric for: " + resourceWrapper.getName());
                }
            }
        }
        metric.initializeForIndex(index);
    }

通过ParameterMetric的initialiazeForIndex初始了一个时间窗口。

  1. 通过ParamFlowChecker的passCheck方法检测规则。

2、ParamFlowChecker

  static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
                             Object... args) {
        // 如果参数不存在直接返回
        if (args == null) {
            return true;
        }

        int paramIdx = rule.getParamIdx();
        //参数的个数小于规则的索引直接返回
        if (args.length <= paramIdx) {
            return true;
        }

        Object value = args[paramIdx];
        
        //规则调用
        return passLocalCheck(resourceWrapper, rule, count, value);
    }

  private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
        try {
            if (Collection.class.isAssignableFrom(value.getClass())) {
                for (Object param : ((Collection)value)) {
                    if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                        return false;
                    }
                }
            } else if (value.getClass().isArray()) {
                int length = Array.getLength(value);
                for (int i = 0; i < length; i++) {
                    Object param = Array.get(value, i);
                    if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                        return false;
                    }
                }
            } else {
                return passSingleValueCheck(resourceWrapper, rule, count, value);
            }
        } catch (Throwable e) {
            RecordLog.info("[ParamFlowChecker] Unexpected error", e);
        }

        return true;
    }

在passLocalCheck方法中:
1.依次对Collection类型,Array类型,非数组集合类型进行处理,进入到passSingleValueCheck中;


    static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
        Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
         //获取当前参数在该资源时间窗口内通过的数量
            double curCount = getHotParameters(resourceWrapper).getPassParamQps(rule.getParamIdx(), value);

            if (exclusionItems.contains(value)) {
                // Pass check for exclusion items.
                int itemQps = rule.getParsedHotItems().get(value);
                return curCount + count <= itemQps;
            } else if (curCount + count > rule.getCount()) {
                if ((curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0) {
                    return true;
                }
                return false;
            }
        }

        return true;
    }
  1. 获取rule的解析参数集合HotItems;
  2. 获取当前参数在该资源时间窗口内通过的数量;
  3. 如果exclusionItems(参数例外项)包含该值,判断curCount + count 大于itemQpsitemQps是该参数对应的count,若大于则说明超过限流阈值,则返回false,反之返回true;
  4. 如果exclusionItems不包含该值,并且curCount + count 大于rule.getCount()时,并判断(curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0是否满足,若满足则返回true,反之返回false。

3、ParameterMetric

   //保存了对应资源的时间窗口数据
   private Map<Integer, HotParameterLeapArray> rollingParameters =
        new ConcurrentHashMap<Integer, HotParameterLeapArray>();

    public Map<Integer, HotParameterLeapArray> getRollingParameters() {
        return rollingParameters;
    }

    public synchronized void clear() {
        rollingParameters.clear();
    }

    // 初始化一个时间窗口
    public void initializeForIndex(int index) {
        if (!rollingParameters.containsKey(index)) {
            synchronized (this) {
                // putIfAbsent
                if (rollingParameters.get(index) == null) {
                    rollingParameters.put(index, new HotParameterLeapArray(
                        1000 / SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL));
                }
            }
        }
    }

热点参数统计

//增加通过数
 public void addPass(int count, Object... args) {
        add(RollingParamEvent.REQUEST_PASSED, count, args);
    }
//增加阻塞数
public void addBlock(int count, Object... args) {
        add(RollingParamEvent.REQUEST_BLOCKED, count, args);
}

@SuppressWarnings("rawtypes")
private void add(RollingParamEvent event, int count, Object... args) {
        if (args == null) {
            return;
        }
        try {
            for (int index = 0; index < args.length; index++) {
                HotParameterLeapArray param = rollingParameters.get(index);
                if (param == null) {
                    continue;
                }

                Object arg = args[index];
                if (arg == null) {
                    continue;
                }
                if (Collection.class.isAssignableFrom(arg.getClass())) {
                    for (Object value : ((Collection)arg)) {
                        param.addValue(event, count, value);
                    }
                } else if (arg.getClass().isArray()) {
                    int length = Array.getLength(arg);
                    for (int i = 0; i < length; i++) {
                        Object value = Array.get(arg, i);
                        param.addValue(event, count, value);
                    }
                } else {
                    param.addValue(event, count, arg);
                }

            }
        } catch (Throwable e) {
            RecordLog.warn("[ParameterMetric] Param exception", e);
        }
    }
  1. 可以发现热点参数的统计也是基于滑动时间窗口统计,这个就不具体分析了,滑动时间窗口前面有讲解,见滑动时间窗口
    2.有点不同的是:在限流规则里指标的是通过LongAdder分段统计的,而热点参数的指标是通过AtomicInteger的addAndGet方法统计的。
 public ParamMapBucket add(RollingParamEvent event, int count, Object value) {
        data[event.ordinal()].putIfAbsent(value, new AtomicInteger());
        AtomicInteger counter = data[event.ordinal()].get(value);
        counter.addAndGet(count);
        return this;
    }

3.热点参数获取的实际上是统计范围内一个平均值。如下:

 public double getPassParamQps(int index, Object value) {
        try {
            HotParameterLeapArray parameter = rollingParameters.get(index);
            if (parameter == null || value == null) {
                return -1;
            }
            return parameter.getRollingAvg(RollingParamEvent.REQUEST_PASSED, value);
        } catch (Throwable e) {
            RecordLog.info(e.getMessage(), e);
        }

        return -1;
    }

public double getRollingAvg(RollingParamEvent event, Object value) {
        return ((double) getRollingSum(event, value)) / getIntervalInSec();
 }

public long getRollingSum(RollingParamEvent event, Object value) {
        currentWindow();

        long sum = 0;

        List<ParamMapBucket> buckets = this.values();
        for (ParamMapBucket b : buckets) {
            sum += b.get(event, value);
        }

        return sum;
    }

五、我的总结

  1. 本文介绍热点参数的概念、使用、及部分源码分析。
  2. 热点参数限流使用需要单独引用Jar包,并设置规则后才会启动限流效果。
  3. 目前热点参数限流只是支持QPS模式,且还支持额外参数项进行限流。
  4. 热点参数的参数限流统计也是基于滑动窗口统计的,内部使用了AtomicInteger的原子结构统计及ConcurrentLinkedHashMap结构作为数据存储。

以上内容,如有不当之处,请指正

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容