Sentinel滑动窗口介绍

系列

开篇

  • sentinel 处理流程是基于slot链(ProcessorSlotChain)来完成的,如限流熔断等,其中重要的一个slot就是StatisticSlot,它是做各种数据统计的,而限流熔断的数据判断来源就是StatisticSlot。
  • StatisticSlot的各种数据统计都是基于滑动窗口来完成的,因此本文就重点分析StatisticSlot的滑动窗口统计机制。
  • StatisticSlot的滑动窗口需要了解统计指标的数据结构、滑动窗口的窗口定位,指标保存等概念。


StatisticNode

public class StatisticNode implements Node {
    // 对每秒指标统计
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);
    // 每分钟指标统计
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    private LongAdder curThreadNum = new LongAdder();
    private long lastFetchTime = -1;


    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }
}
  • 采集指标的统计节点,负责统计相关的采集指标。
  • StatisticNode包含rollingCounterInSecond和rollingCounterInMinute。
  • rollingCounterInSecond是对每秒指标的统计。
  • rollingCounterInMinute是对每分钟指标的统计。
  • rollingCounterInSecond和rollingCounterInMinute是ArrayMetric,负责保存统计指标。


统计指标

  • 统计指标使用ArrayMetric进行承载。
  • ArrayMetric内部是滑动窗口LeapArray对象。
  • LeapArray的每个元素为WindowWrap。
  • WindowWrap内部包含MetricBucket。


ArrayMetric

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
}
  • ArrayMetric作为保存指标的数组,通过滑动窗口LeapArray保存MetricBucket。
  • MetricBucket代表统计指标,LeapArray代表滑动窗口,滑动窗口的每个窗口是MetricBucket对象。


LeapArray

public class BucketLeapArray extends LeapArray<MetricBucket> {
    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }
}

public abstract class LeapArray<T> {
    protected int windowLengthInMs;
    protected int sampleCount;
    protected int intervalInMs;
    private double intervalInSecond;
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    public LeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
}
  • LeapArray作为滑动窗口,BucketLeapArray作为其一种具体的实现。
  • LeapArray通过AtomicReferenceArray<WindowWrap<T>> array来实现滑动窗口。
  • 滑动窗口的统计指标MetricBucket通过WindowWrap进行包装。


WindowWrap

public class WindowWrap<T> {

    private final long windowLengthInMs; // 时间窗口的长度
    private long windowStart; // 时间窗口开始时间
    private T value; // MetricBucket对象,保存各个指标数据

    public WindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
    }
}
  • WindowWrap作为滑动窗口的每个元素的承载,内部保存MetricBucket。


MetricBucket

public class MetricBucket {

    private final LongAdder[] counters;
    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
}

public enum MetricEvent {
    PASS, // 正常通过
    BLOCK, // 阻塞
    EXCEPTION, // 异常
    SUCCESS, // 成功
    RT, // RT统计
    OCCUPIED_PASS // 抢占通过
}
  • MetricBucket内部保存各个统计指标MetricEvent的LongAdder数组。
  • MetricEvent的枚举值代表各个采集指标。


滑动窗口定位

public abstract class LeapArray<T> {

    protected int windowLengthInMs; // 时间窗口的长度
    protected int sampleCount; // 时间窗口的个数
    protected int intervalInMs;
    private double intervalInSecond;
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }

    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 根据当前时间和时间窗口的长度进行计算获取窗口下标
        int idx = calculateTimeIdx(timeMillis);
        // 获取指定下标的时间窗口的开始时间
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                // 1.为空表示当前时间窗口为初始化过,创建WindowWrap并cas设置到array中
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                // 2.获取的时间窗口正好对应当前时间,直接返回
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                // 3.获取的时间窗口为老的,进行窗口reset操作复用
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // 4.时间回拨了,正常情况下不会走到这里
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
}


public class BucketLeapArray extends LeapArray<MetricBucket> {

    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }

    @Override
    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
    }

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // 重置窗口的开始时间和对应的统计值
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }
}
  • 1.为空表示当前时间窗口为初始化过,创建WindowWrap并cas设置到array中
  • 2.获取的时间窗口正好对应当前时间,直接返回
  • 3.获取的时间窗口为老的,进行窗口reset操作复用。reset操作负责重置时间窗口的开始时间和窗口统计值。
  • 4.时间回拨了正常情况下不会走到这里


指标保存

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
}

public class MetricBucket {
    private final LongAdder[] counters;

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }
}
  • currentWindow返回当前时间对应的滑动窗口。
  • addPass通过add指定类型的MetricEvent指标到LongAdder当中。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容