sentinel统计

系统在触发限流情况下,肯定是有数据在做支撑,通过一些数值配置等参考,最终决定是否限流,所以说必须要有数据支撑。

那么这些数据是什么呢?假设一个请求发起到完成,这段时间都能统计到什么信息呢?首先,能知道是一次请求,假设理解为一次pass,第二,该请求如果成功返回结果,那么成功也需要记录,一次success,然后,在开始到得到最终结果,这段期间耗时,也需要记录,rt时间。当然,如果此次请求发生异常了,记录一次error,又或者本次请求被限流了即阻塞了,任然需要记录一次block。 最后,在一次请求这么一个动作中,需要记录这么多的信息。只有以这些数据为依据,才能为框架提供数据参考,然后保护系统,保证了系统的可靠性和稳定性。

我们基本已经确定了,需要记录的信息,pass-通过请求,succes-请求成功,rt-一次请求的耗时,error-请求异常,block-请求限流。在sentinel中 ArrayMetric 就是来记录这些数据的入口,如果添加记录,查询记录都时通过该类得到。


image.png

在sentinel中,也将这些维度数据抽象为度量事件:


image.png

在记录这些记录这些维度数据时,他们都有一个共同属性,就是当前时间。在做限流的依据时,在一定时期内通过维度数据做判定的,如果维度数据长时间做参考,是很不准确的。做限流,是因为系统在特定的时间内,突然接受到了大流量,而保护系统的。所以在sentinel中,基本时间的间隔在1000ms,即1s。但是请求时,都有当前时间的,他们在1秒间隔中,是散列分布在1秒内,所以引入了滑动窗口的概念,即将这1s平均分割成多个窗口,窗口的时间间隙都是一样的。但是时间是用于在变大,所以他是滑动的。

分析LeapArray类

所以LeapArray 是控制这些华东窗口的创建和重置重要管理类。但是滑动窗口并不是一直随着时间不停的创建的,其实是通过重置窗口方式重新得到当前的窗口。首先了解一下构造器:


public LeapArray(int sampleCount, int intervalInMs) {
    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;
    this.sampleCount = sampleCount;
    this.array =new AtomicReferenceArray<>(sampleCount);
}

构造器中有很重要参数,1.时间间隙,指在一段时间内的可用统计到的数据,2.窗口数量 sampleCount, 这段时间间隔的需要拆分窗口的数量,3.windowLengthInMs 就是窗口的时间长度,单位是毫秒;其中array这个属性,是由原子引用数组类型,并且数量是sampleCount值。所以我们认为滑动窗口概念肯定是在操作array这个原子引用数组。

好的,现在假如由个请求过来,需要记录各种维度数据了,在这个时刻需要从LeapArray中拿出一个匹配当前时刻的窗口进行数据添加,那么在拿到窗口时,有可能不存在,有可能过期,那怎么处理呢?看一下获取方法:

public WindowWrapcurrentWindow(long timeMillis) {
    int idx = calculateTimeIdx(timeMillis); // 计算当前时间时处于那个窗口的位置
    long windowStart = calculateWindowStart(timeMillis); // 得到当前窗口开始时间
    while (true) { // 进入一个循环
      WindowWrap old =array.get(idx);
      if (old ==null) { // 当前窗口不存在,需要生成新的窗口,该窗口包含了时间的长度,窗口的开始时间,和一个空槽
            WindowWrap window =new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        if (array.compareAndSet(idx, null, window)) { // 因为array是原子的,可以保证线程安全,更新成功,就得到了这个窗口了
            return window;
        }else {// 如果没有更新成功,那线程先挂起一定的时间,从新获取
            Thread.yield();
        }
      }else if (windowStart == old.windowStart()) {// 如果开始时间与窗口的开始时间一致,那么直接得到这个窗口
        return old;
      }else if (windowStart > old.windowStart()) { // 窗口的时间已经过期了,那么并没有去重新生成放置到array中,而是重置这个old窗口
        if (updateLock.tryLock()) {
            try {
                return resetWindowTo(old, windowStart);
            }finally {
              updateLock.unlock();
            }
         }else {
            Thread.yield();
        }
    } else if (windowStart < old.windowStart()) {
        return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
    }
  }
} 

了解这段代码,必须清楚,当前时间是处于数组的什么位置,calculateTimeIdx方法,通过该方法得到了当前时间处于哪个槽的位置


long timeId = timeMillis /windowLengthInMs;
return (int)(timeId %array.length());

然后得到在当前时间所属的窗口的开始时间是什么, calculateWindowStart(timeMillis);


return timeMillis - timeMillis %windowLengthInMs; // 取一个整数,该时间点就是窗口的开始时间

画一个滑动窗口图

滑动窗口的分开统计,可以给我们一个设计思路。在针对数据累加的时候,为了线程安全等因素,往往会对某一个属性进行加锁限制,保证原子操作。但是在高频发的情况下,首先是不能因为统计而降低系统的吞吐量。所以可以将时间进行分片,将时间分配在不同的窗口中,进行累加计算(类似空间换时间)。然后在同一窗口中,任然面临着计算压力大的困境,如何有效解决呢?

在sentinel中,引入了并发计数组件Striped64。首先一个滑动窗口中承载数据统计的是MetricBucket类,了解一下构造方法:


public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }

其中,真正记录数据的类是LongAdder 类,他是继承了Striped64 类
他的属性有
Cell[] cells: 一个cell数组,他的长度是2的倍数,并且cell中数据计算,是利用cas方式,保证数据安全;
long base:基本数值,当线程没有竞争时,会进行该值计算,也是利用了cas方式。或者在重置cell时,作为最终计算;
int busy: 表示状态,作用时在cell创建或拓展中使用
增加数值方法:

public void add(long x) {
        Cell[] as;
        long b, v;
        HashCode hc;
        Cell a;
        int n;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            int h = (hc = threadHashCode.get()).code;
            if (as == null || (n = as.length) < 1 ||
                (a = as[(n - 1) & h]) == null ||
                !(uncontended = a.cas(v = a.value, v + x))) { retryUpdate(x, hc, uncontended); }
        }
    }

首先了解在cells为空下优先进行base数值计算,如果cells已经存在,那么尝试取出一个cell进行cas数据交换,并且该cell的位置时通过线程的hashCode得到的。如果最终cell为空,或者没有更新cell成功。最终需要执行最核心的方法 retryUpdate(...) 方法,入参有 需要增加x的数值,hc的hashCode值,和一个状态;


    final void retryUpdate(long x, HashCode hc, boolean wasUncontended) {
        int h = hc.code;
        boolean collide = false;                // True if last slot nonempty
        for (; ; ) {
            Cell[] as;
            Cell a;
            int n;
            long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (busy == 0) {            // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (busy == 0 && casBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs;
                                int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                busy = 0;
                            }
                            if (created) { break; }
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                } else if (!wasUncontended)       // CAS already known to fail
                {
                    wasUncontended = true;      // Continue after rehash
                } else if (a.cas(v = a.value, fn(v, x))) {
                    break;
                } else if (n >= NCPU || cells != as) {
                    collide = false;            // At max size or stale
                } else if (!collide) {
                    collide = true;
                } else if (busy == 0 && casBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i) { rs[i] = as[i]; }
                            cells = rs;
                        }
                    } finally {
                        busy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h ^= h << 13;                   // Rehash
                h ^= h >>> 17;
                h ^= h << 5;
            } else if (busy == 0 && cells == as && casBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    busy = 0;
                }
                if (init) { break; }
            } else if (casBase(v = base, fn(v, x))) {
                break;                          // Fall back on using base
            }
        }
        hc.code = h;                            // Record index for next time
    }

1.他时在一个死循环内,可以肯定的是,retryUpdate方法是一定会更新数值成功;

2.当前cells不为空,并且长度大于一;

a.通过hash,拿到的cell为空,就需要初始化cell,在初始化时,可以看到busy的竞争,初始化失败就需要重新执行,成功就break循环;

b.上次为wasUncontended=false(有争议),则改为无竞争状态,并且重新执行;

c.cell不为空,则cas是否成功,失败继续执行下面的if语句,成功break;

d.当前的n大于等于cpu数量,或者as不等于了cells对象,那么不进行碰撞状态设置;

e.在发现当前存在碰撞,那么得到busy状态,并且进行cells数组扩大1倍,这里有个思考的问题,为什么n值大于等于cpu数量时,认为不存在激烈碰撞呢?可以这么认为,取得cell时通过hash值得到的,而且hash值是随机,认为当前线程得到的cell概率是相同的。当前操作系统n个cpu,则可以确定此时,最多有n个线程在竞争cell。那么最理想状态下,cell数量大于等于cpu数量,就大概率存在,一个线程获取的cell与其他n-1的线程得到cell是不相同的,所以可以认为没有碰撞。碰撞更多可以理解为,多个线程同时在对同一个cell进行竞争;

f.重新rehash值,换个cell尝试一下;

3.当前cells为空,并且需要得到busy状态,初始化数量,该数量起始值为2.

4.最终尝试 通过base字段进行数值计算

获取LongAdder 累计的值,知道LongAdder 增加数值是通过cells和base合作完成的,所以最终想要得到sum值是,也只要通过base值与cells中的值进行累加,就是想要的统计值了。

总结:

各个维度的统计数值是作为限流的最基本参考依据,在当前服务器的请求服务的高频次,高并发下,往往需要保证统计功能的安全可靠,并且计算能力要求更高,尽可能降低影响服务器的性能。在sentinel框架中,解决思想基本是,使用轻量级锁,通过空间换取一定的时间,解决竞争资源问题。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 224,815评论 6 522
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,251评论 3 402
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 171,999评论 0 366
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 60,996评论 1 300
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 69,993评论 6 400
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,477评论 1 314
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,848评论 3 428
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,823评论 0 279
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,361评论 1 324
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,401评论 3 346
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,518评论 1 354
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,119评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,850评论 3 338
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,292评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,424评论 1 275
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,072评论 3 381
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,588评论 2 365