系统在触发限流情况下,肯定是有数据在做支撑,通过一些数值配置等参考,最终决定是否限流,所以说必须要有数据支撑。
那么这些数据是什么呢?假设一个请求发起到完成,这段时间都能统计到什么信息呢?首先,能知道是一次请求,假设理解为一次pass,第二,该请求如果成功返回结果,那么成功也需要记录,一次success,然后,在开始到得到最终结果,这段期间耗时,也需要记录,rt时间。当然,如果此次请求发生异常了,记录一次error,又或者本次请求被限流了即阻塞了,任然需要记录一次block。 最后,在一次请求这么一个动作中,需要记录这么多的信息。只有以这些数据为依据,才能为框架提供数据参考,然后保护系统,保证了系统的可靠性和稳定性。
我们基本已经确定了,需要记录的信息,pass-通过请求,succes-请求成功,rt-一次请求的耗时,error-请求异常,block-请求限流。在sentinel中 ArrayMetric 就是来记录这些数据的入口,如果添加记录,查询记录都时通过该类得到。
在sentinel中,也将这些维度数据抽象为度量事件:
在记录这些记录这些维度数据时,他们都有一个共同属性,就是当前时间。在做限流的依据时,在一定时期内通过维度数据做判定的,如果维度数据长时间做参考,是很不准确的。做限流,是因为系统在特定的时间内,突然接受到了大流量,而保护系统的。所以在sentinel中,基本时间的间隔在1000ms,即1s。但是请求时,都有当前时间的,他们在1秒间隔中,是散列分布在1秒内,所以引入了滑动窗口的概念,即将这1s平均分割成多个窗口,窗口的时间间隙都是一样的。但是时间是用于在变大,所以他是滑动的。
分析LeapArray类
所以LeapArray 是控制这些华东窗口的创建和重置重要管理类。但是滑动窗口并不是一直随着时间不停的创建的,其实是通过重置窗口方式重新得到当前的窗口。首先了解一下构造器:
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array =new AtomicReferenceArray<>(sampleCount);
}
构造器中有很重要参数,1.时间间隙,指在一段时间内的可用统计到的数据,2.窗口数量 sampleCount, 这段时间间隔的需要拆分窗口的数量,3.windowLengthInMs 就是窗口的时间长度,单位是毫秒;其中array这个属性,是由原子引用数组类型,并且数量是sampleCount值。所以我们认为滑动窗口概念肯定是在操作array这个原子引用数组。
好的,现在假如由个请求过来,需要记录各种维度数据了,在这个时刻需要从LeapArray中拿出一个匹配当前时刻的窗口进行数据添加,那么在拿到窗口时,有可能不存在,有可能过期,那怎么处理呢?看一下获取方法:
public WindowWrapcurrentWindow(long timeMillis) {
int idx = calculateTimeIdx(timeMillis); // 计算当前时间时处于那个窗口的位置
long windowStart = calculateWindowStart(timeMillis); // 得到当前窗口开始时间
while (true) { // 进入一个循环
WindowWrap old =array.get(idx);
if (old ==null) { // 当前窗口不存在,需要生成新的窗口,该窗口包含了时间的长度,窗口的开始时间,和一个空槽
WindowWrap window =new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) { // 因为array是原子的,可以保证线程安全,更新成功,就得到了这个窗口了
return window;
}else {// 如果没有更新成功,那线程先挂起一定的时间,从新获取
Thread.yield();
}
}else if (windowStart == old.windowStart()) {// 如果开始时间与窗口的开始时间一致,那么直接得到这个窗口
return old;
}else if (windowStart > old.windowStart()) { // 窗口的时间已经过期了,那么并没有去重新生成放置到array中,而是重置这个old窗口
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
}finally {
updateLock.unlock();
}
}else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
了解这段代码,必须清楚,当前时间是处于数组的什么位置,calculateTimeIdx方法,通过该方法得到了当前时间处于哪个槽的位置
long timeId = timeMillis /windowLengthInMs;
return (int)(timeId %array.length());
然后得到在当前时间所属的窗口的开始时间是什么, calculateWindowStart(timeMillis);
return timeMillis - timeMillis %windowLengthInMs; // 取一个整数,该时间点就是窗口的开始时间
画一个滑动窗口图
滑动窗口的分开统计,可以给我们一个设计思路。在针对数据累加的时候,为了线程安全等因素,往往会对某一个属性进行加锁限制,保证原子操作。但是在高频发的情况下,首先是不能因为统计而降低系统的吞吐量。所以可以将时间进行分片,将时间分配在不同的窗口中,进行累加计算(类似空间换时间)。然后在同一窗口中,任然面临着计算压力大的困境,如何有效解决呢?
在sentinel中,引入了并发计数组件Striped64。首先一个滑动窗口中承载数据统计的是MetricBucket类,了解一下构造方法:
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
其中,真正记录数据的类是LongAdder 类,他是继承了Striped64 类
他的属性有
Cell[] cells: 一个cell数组,他的长度是2的倍数,并且cell中数据计算,是利用cas方式,保证数据安全;
long base:基本数值,当线程没有竞争时,会进行该值计算,也是利用了cas方式。或者在重置cell时,作为最终计算;
int busy: 表示状态,作用时在cell创建或拓展中使用
增加数值方法:
public void add(long x) {
Cell[] as;
long b, v;
HashCode hc;
Cell a;
int n;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
int h = (hc = threadHashCode.get()).code;
if (as == null || (n = as.length) < 1 ||
(a = as[(n - 1) & h]) == null ||
!(uncontended = a.cas(v = a.value, v + x))) { retryUpdate(x, hc, uncontended); }
}
}
首先了解在cells为空下优先进行base数值计算,如果cells已经存在,那么尝试取出一个cell进行cas数据交换,并且该cell的位置时通过线程的hashCode得到的。如果最终cell为空,或者没有更新cell成功。最终需要执行最核心的方法 retryUpdate(...) 方法,入参有 需要增加x的数值,hc的hashCode值,和一个状态;
final void retryUpdate(long x, HashCode hc, boolean wasUncontended) {
int h = hc.code;
boolean collide = false; // True if last slot nonempty
for (; ; ) {
Cell[] as;
Cell a;
int n;
long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (busy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (busy == 0 && casBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs;
int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
busy = 0;
}
if (created) { break; }
continue; // Slot is now non-empty
}
}
collide = false;
} else if (!wasUncontended) // CAS already known to fail
{
wasUncontended = true; // Continue after rehash
} else if (a.cas(v = a.value, fn(v, x))) {
break;
} else if (n >= NCPU || cells != as) {
collide = false; // At max size or stale
} else if (!collide) {
collide = true;
} else if (busy == 0 && casBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i) { rs[i] = as[i]; }
cells = rs;
}
} finally {
busy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h ^= h << 13; // Rehash
h ^= h >>> 17;
h ^= h << 5;
} else if (busy == 0 && cells == as && casBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
busy = 0;
}
if (init) { break; }
} else if (casBase(v = base, fn(v, x))) {
break; // Fall back on using base
}
}
hc.code = h; // Record index for next time
}
1.他时在一个死循环内,可以肯定的是,retryUpdate方法是一定会更新数值成功;
2.当前cells不为空,并且长度大于一;
a.通过hash,拿到的cell为空,就需要初始化cell,在初始化时,可以看到busy的竞争,初始化失败就需要重新执行,成功就break循环;
b.上次为wasUncontended=false(有争议),则改为无竞争状态,并且重新执行;
c.cell不为空,则cas是否成功,失败继续执行下面的if语句,成功break;
d.当前的n大于等于cpu数量,或者as不等于了cells对象,那么不进行碰撞状态设置;
e.在发现当前存在碰撞,那么得到busy状态,并且进行cells数组扩大1倍,这里有个思考的问题,为什么n值大于等于cpu数量时,认为不存在激烈碰撞呢?可以这么认为,取得cell时通过hash值得到的,而且hash值是随机,认为当前线程得到的cell概率是相同的。当前操作系统n个cpu,则可以确定此时,最多有n个线程在竞争cell。那么最理想状态下,cell数量大于等于cpu数量,就大概率存在,一个线程获取的cell与其他n-1的线程得到cell是不相同的,所以可以认为没有碰撞。碰撞更多可以理解为,多个线程同时在对同一个cell进行竞争;
f.重新rehash值,换个cell尝试一下;
3.当前cells为空,并且需要得到busy状态,初始化数量,该数量起始值为2.
4.最终尝试 通过base字段进行数值计算
获取LongAdder 累计的值,知道LongAdder 增加数值是通过cells和base合作完成的,所以最终想要得到sum值是,也只要通过base值与cells中的值进行累加,就是想要的统计值了。
总结:
各个维度的统计数值是作为限流的最基本参考依据,在当前服务器的请求服务的高频次,高并发下,往往需要保证统计功能的安全可靠,并且计算能力要求更高,尽可能降低影响服务器的性能。在sentinel框架中,解决思想基本是,使用轻量级锁,通过空间换取一定的时间,解决竞争资源问题。