Sentinel源码解析三(滑动窗口流量统计)

前言

Sentinel的核心功能之一是流量统计,例如我们常用的指标QPS,当前线程数等。上一篇文章中我们已经大致提到了提供数据统计功能的Slot(StatisticSlot)StatisticSlotSentinel的整个体系中扮演了一个非常重要的角色,后续的一系列操作(限流,熔断)等都依赖于StatisticSlot所统计出的数据。

本文所要讨论的重点就是StatisticSlot是如何做的流量统计?

其实在之前介绍常用限流算法常用限流算法的时候已经有提到过一个算法滑动窗口限流,该算法的滑动窗口原理其实跟Sentinel所提供的流量统计原理是一样的,都是基于时间窗口的滑动统计

回到StatisticSlot


public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,

 boolean prioritized, Object... args) throws Throwable {

...

// 当前请求线程数加一

node.increaseThreadNum();

// 新增请求数

node.addPassRequest(count);

...

}

可以看到StatisticSlot主要统计了两种类型的数据

  1. 线程数

  2. 请求数(QPS)

对于线程数的统计比较简单,通过内部维护一个LongAdder来进行当前线程数的统计,每进入一个请求加1,每释放一个请求减1,从而得到当前的线程数

对于请求数QPS的统计则相对比较复杂,其中有用到滑动窗口原理(也是本文的重点),下面根据源码来深入的分析

DefaultNode和StatisticNode


public void addPassRequest(int count) {

  // 调用父类(StatisticNode)来进行统计

 super.addPassRequest(count);

  // 根据clusterNode 汇总统计(背后也是调用父类StatisticNode)

 this.clusterNode.addPassRequest(count);

}

最终都是调用了父类StatisticNodeaddPassRequest方法


/**

* 按秒统计,分成两个窗口,每个窗口500ms,用来统计QPS

 */

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,

 IntervalProperty.INTERVAL);

/**

* 按分钟统计,分成60个窗口,每个窗口 1000ms

 */

private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

public void addPassRequest(int count) {

 rollingCounterInSecond.addPass(count);

 rollingCounterInMinute.addPass(count);

}

代码比较简单,可以知道内部是调用了ArrayMetricaddPass方法来统计的,并且统计了两种不同时间维度的数据(秒级和分钟级)

ArrayMetric


private final LeapArray<MetricBucket> data;

public ArrayMetric(int sampleCount, int intervalInMs) {

 this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);

}

public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {

 if (enableOccupy) {

 this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);

 } else {

 this.data = new BucketLeapArray(sampleCount, intervalInMs);

 }

}

public void addPass(int count) {

  // 1\. 获取当前窗口

 WindowWrap<MetricBucket> wrap = data.currentWindow();

  // 2\. 当前窗口加1

 wrap.value().addPass(count);

}

ArrayMetric其实也是一个包装类,内部通过实例化LeapArray的对应实现类,来实现具体的统计逻辑,LeapArray是一个抽象类,OccupiableBucketLeapArrayBucketLeapArray都是其具体的实现类

OccupiableBucketLeapArray在1.5版本之后才被引入,主要是为了解决一些高优先级的请求在限流触发的时候也能通过(通过占用未来时间窗口的名额来实现) 也是默认使用的LeapArray实现类

而统计的逻辑也比较清楚,分成了两步:

  1. 定位到当前窗口

  2. 获取到当前窗口WindowWrapMetricBucket并执行addPass逻辑

这里我们先看下第二步中的MetricBucket类,看看它做了哪些事情

MetricBucket


/**

 * 存放当前窗口各种类型的统计值(类型包括 PASS BLOCK EXCEPTION 等)

 */

private final LongAdder[] counters;

public MetricBucket() {

 MetricEvent[] events = MetricEvent.values();

 this.counters = new LongAdder[events.length];

 for (MetricEvent event : events) {

 counters[event.ordinal()] = new LongAdder();

 }

 initMinRt();

}

// 统计pass数

public void addPass(int n) {

 add(MetricEvent.PASS, n);

}

// 统计可占用的pass数

public void addOccupiedPass(int n) {

 add(MetricEvent.OCCUPIED_PASS, n);

}

// 统计异常数

public void addException(int n) {

 add(MetricEvent.EXCEPTION, n);

}

// 统计block数

public void addBlock(int n) {

 add(MetricEvent.BLOCK, n);

}

....

MetricBucket通过定义了一个LongAdder数组来存储不同类型的流量统计值,具体的类型则都定义在MetricEvent枚举中。

执行addPass方法对应LongAdder数组索引下表为0的值递增

下面再来看下data.currentWindow()的内部逻辑

OccupiableBucketLeapArray

OccupiableBucketLeapArray继承了抽象类LeapArray,核心逻辑也是在LeapArray


/**

* 时间窗口大小  单位ms

 */

protected int windowLengthInMs;

/**

* 切分的窗口数

 */

protected int sampleCount;

/**

 * 统计的时间间隔 intervalInMs = windowLengthInMs * sampleCount

 */ 

protected int intervalInMs;

/**

 * 窗口数组 数组大小 = sampleCount

 */

protected final AtomicReferenceArray<WindowWrap<T>> array;

/**

 * update lock 更新窗口时需要上锁

 */

private final ReentrantLock updateLock = new ReentrantLock();

/**

 * @param sampleCount 需要划分的窗口数

 * @param intervalInMs 间隔的统计时间

 */

public LeapArray(int sampleCount, int intervalInMs) {

 this.windowLengthInMs = intervalInMs / sampleCount;

 this.intervalInMs = intervalInMs;

 this.sampleCount = sampleCount;

 this.array = new AtomicReferenceArray<>(sampleCount);

}

/**

* 获取当前窗口

 */

public WindowWrap<T> currentWindow() {

 return currentWindow(TimeUtil.currentTimeMillis());

}

以上需要着重理解的是几个参数的含义:

  1. sampleCount 定义的窗口的数

  2. intervalInMs 统计的时间间隔

  3. windowLengthInMs 每个窗口的时间大小 = intervalInMs / sampleCount

sampleCount 比较好理解,就是需要定义几个窗口(默认秒级统计维度的话是两个窗口),intervalInMs 指的就是我们需要统计的时间间隔,例如我们统计QPS的话那就是1000ms,windowLengthInMs 指的每个窗口的大小,是由intervalInMs除以sampleCount得来

类似下图

image.png

理解了上诉几个参数的含义后,我们直接进入到LeapArraycurrentWindow(long time)方法中去看看具体的实现


public WindowWrap<T> currentWindow(long timeMillis) {

 if (timeMillis < 0) {

 return null;

 }

  // 根据当前时间戳计算当前所属的窗口数组索引下标

 int idx = calculateTimeIdx(timeMillis);

  // 计算当前窗口的开始时间戳

 long windowStart = calculateWindowStart(timeMillis);

 /*

 * 从窗口数组中获取当前窗口项,分为三种情况

 *

 * (1) 当前窗口为空还未创建,则初始化一个

 * (2) 当前窗口的开始时间和上面计算出的窗口开始时间一致,表明当前窗口还未过期,直接返回当前窗口

 * (3) 当前窗口的开始时间  小于  上面计算出的窗口开始时间,表明当前窗口已过期,需要替换当前窗口

 */

 while (true) {

 WindowWrap<T> old = array.get(idx);

 if (old == null) {

 /*

 * 第一种情况,新建一个窗口项

 */

 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

 if (array.compareAndSet(idx, null, window)) {

  // Successfully updated, return the created bucket.

 return window;

 } else {

  // Contention failed, the thread will yield its time slice to wait for bucket available.

 Thread.yield();

 }

 } else if (windowStart == old.windowStart()) {

 /*

 * 第二种情况 直接返回

 */

 return old;

 } else if (windowStart > old.windowStart()) {

 /*

 * 第三种情况 替换窗口

 */

 if (updateLock.tryLock()) {

 try {

  // Successfully get the update lock, now we reset the bucket.

 return resetWindowTo(old, windowStart);

 } finally {

 updateLock.unlock();

 }

 } else {

  // Contention failed, the thread will yield its time slice to wait for bucket available.

 Thread.yield();

 }

 } else if (windowStart < old.windowStart()) {

  // 第四种情况,讲道理不会走到这里

 return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

 }

 }

}

/**

* 根据当前时间戳计算当前所属的窗口数组索引下标

 */

private int calculateTimeIdx(/*@Valid*/ long timeMillis) {

 long timeId = timeMillis / windowLengthInMs;

 return (int)(timeId % array.length());

}

/**

* 计算当前窗口的开始时间戳

 */

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {

 return timeMillis - timeMillis % windowLengthInMs;

}

上面的方法就是整个滑动窗口逻辑的核心代码,注释其实也写的比较清晰了,简单概括下可以分为以下几步:

  1. 根据当前时间戳 和 窗口数组大小 获取到当前的窗口数组索引下标idx,如果窗口数是2,那其实idx只有两种值(0 或 1)

  2. 根据当前时间戳(windowStart) 计算得到当前窗口的开始时间戳值。通过calculateWindowStart计算来得到,这个方法还蛮有意思的,通过当前时间戳和窗口时间大小取余来得到 与当前窗口开始时间的 偏移量。比我用定时任务实现高级多了 ... 😆 可以去对比一下我之前文章中的蠢实现 滑动窗口算法定时任务实现

  3. 然后就是根据上面得到的两个值 来获取当前时间窗口,这里其实又分为三种情况

  • 当前窗口为空还未创建,则初始化一个

  • 当前窗口的开始时间和上面计算出的窗口开始时间(windowStart)一致,表明当前窗口还未过期,直接返回当前窗口

  • 当前窗口的开始时间 小于 上面计算出的窗口(windowStart)开始时间,表明当前窗口已过期,需要替换当前窗口

总结

总的来说,currentWindow方法的实现还是非常巧妙的,因为我在看Sentinel的源码前也写过一篇限流算法的文章,恰好其中也实现过一个滑动窗口限流算法,不过相比于Sentinel的实现,我用了定时任务去做窗口的切换更新,显然性能上更差,实现的也不优雅,大家也可以去对比一下。常用限流算法

Sentinel系列

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容