引子
在上篇文章中我们介绍了sentinel中的滑动窗口算法,发现限流的准确度依赖于划分的子窗口数量。而在很多情况下,我们的限流更多的是需要限制到参数级别,比如我们需要限制每个用户调用接口的频次,这种就需要精确到接口参数级别。而如果用滑动窗口来实现,就需要统计每个窗口内不同参数对应的请求数量,这样过于复杂,对于这种热点参数的限流,sentinel根据限流效果的不同,分别使用了漏桶算法和令牌桶算法来进行实现。
漏桶算法原理
主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。看下漏桶算法的原理图
请求先进入到漏桶里,漏桶以一定的速度出水,当水请求过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
sentinel中的漏桶算法实现
@Spi(order = -3000)
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode>{
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {
return;
}
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
//获取热点参数配置
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
//初始化热点参数
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
//passCheck 校验请求是否通过
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
}
public final class ParamFlowChecker{
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
if (args == null) {
return true;
}
int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx) {
return true;
}
// Get parameter value.
Object value = args[paramIdx];
// Assign value with the result of paramFlowKey method
if (value instanceof ParamFlowArgument) {
value = ((ParamFlowArgument) value).paramFlowKey();
}
// If value is null, then pass
if (value == null) {
return true;
}
//集群参数流控
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
return passClusterCheck(resourceWrapper, rule, count, value);
}
//单机参数流控
return passLocalCheck(resourceWrapper, rule, count, value);
}
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
//参数类型如果为Collection,循环校验每个参数
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
}
//参数类型如果为Collection,循环校验每个参数
else if (value.getClass().isArray()) {
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
}
//单个元素校验
else {
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
}
return true;
}
重点来了,passThrottleLocalCheck方法就是漏桶算法的具体实现,主要原理就是:根据rule配置的每多少秒可以通过多少请求来计算出一个请求需要多少毫秒costTime,取出上次请求通过的时间+costTime,就能得到我们期望此次请求到来的时间(expectedTime),如果请求到达时间大于期望时间,则放行;如果小于期望时间,说明流量过于密集,需要限制其速率,等待一段时间后(expectedTime-now)再放行。
/**
* 匀速排队限流
* @param resourceWrapper
* @param rule
* @param acquireCount
* @param value
* @return
*/
static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
if (timeRecorderMap == null) {
return true;
}
// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
//根据rule配置的每多少秒可以通过多少请求来计算出一个请求需要多少毫秒
long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
if (timeRecorder == null) {
return true;
}
//AtomicLong timeRecorder = timeRecorderMap.get(value);
//上次请求通过的时间
long lastPassTime = timeRecorder.get();
//期望的请求到达时间
long expectedTime = lastPassTime + costTime;
/**
* 实际请求到达时间如果大于期望的请求到达时间 并且 多出来的这部分时间大于设置的等待时间,则限流
* 否则请求
*/
if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
long waitTime = expectedTime - currentTime;
/**
* 等待时间大于0,说明请求在期望时间之前到达,流量太过密集,等待之后放行;小于0,说明请求在期望时间之后到达,直接放行
*/
if (waitTime > 0) {
lastPastTimeRef.set(expectedTime);
try {
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
}
}
return true;
} else {
Thread.yield();
}
} else {
return false;
}
}
}
}
令牌桶算法原理
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
sentinel-令牌桶算法实现
public final class ParamFlowChecker {
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
//令牌计数器
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
//时间计数器
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null) {
return true;
}
// Calculate max token count (threshold)
//针对参数例外项单独限流
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
//最大令牌数 = 设置的阈值 + 额外允许的突发流量数
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
return false;
}
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
//获取上一次令牌更新时间
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
//表示系统启动后的第一次请求,直接返回校验通过
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
//计算在时间窗口内有多少次请求可以通过
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
// Calculate the time duration since last token was added.
//计算此次请求距离上一次请求(这里的上一次请求指的是一个统计窗口周期内的第一个请求)过去了多久
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
if (passTime > rule.getDurationInSec() * 1000) {
//一个统计时间窗口已经过去,会重新计算在此时间窗口内有多少次请求可以通过
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
//获取请求通过时剩余令牌数
long restQps = oldQps.get();
/** * 举例说明,比如1s 允许通过100个请求,当前时间是1900ms,
* 上一次请求时间是800ms * 那么totalCount = 1100 * 100 / 1 * 1000 = 110
*/
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
/**
* 最新的qps需要之前剩余的加上间隔时间内新增的,但是不能超过最大值
*/
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
//这里需要更新lastAddTokenTime
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
} else {
//未超过时间窗口,获取剩余令牌数
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
//这里不需要更新lastAddTokenTime,因为还在一个统计窗口时间周期内
return true;
}
} else {
return false;
}
}
Thread.yield();
}
}
}
}
代码较为冗长,我们来看一下整体的流程图:
令牌桶算法最重要的就是关注令牌生成的部分。而在sentinel的实现中,令牌生成是在 当前请求时间距离上一次请求时间已经大于设置的窗口时间。那么该算法是怎么支持突发流量的呢。
我们以一个具体示例来看:
假定我们设置一分钟请求不能超过1000次。时间窗口durationInSec = 60s
,tokenCount = 1000;允许突发的流量为burstCount = 2000; maxCount = tokenCount + burstCount = 3000;
假设第1s来了一个请求,因为首次请求,通过,剩余令牌数为 2999。在第10s-20s 流量增大陆续来了2900次请求,因为都在一个窗口时间内且令牌数足够,所有请求均通过,剩余令牌数为9。随后教长一段时间没有请求,一直到第90s请求陆续到达,这时90s距离上一次请求时间20s已经过去了一个时间窗口,我们需要计算在这 70s内我们需要新增多少个令牌数
toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000)
=70s * 1000ms * 1000 / 60s * 1000ms = 1166.67
newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
上次剩余的令牌数 restQps = 9
newQps = 1166.67 + 9 -1 = 1174.67 < maxCount
所有在接下来的 60s-120s 时间段内,请求总数不可以超过1174.67。
但是随着时间的流逝,只要在一段时间内没有请求过来,令牌又会达到maxCount个,理论上只要3分钟内没有请求到达,令牌数最少又会恢复到maxCount=3000个。
我们发现,sentinel应对突发流量,有个缓冲的效果,首次流量突发之后,如果再有流量突发的情况,必须给系统足够的缓冲时间,否则依然会被限流。
总结:
漏桶算法和令牌桶算法两者主要区别在于“漏桶算法”能够强行限制数据的传输速率,而“令牌桶算法”在能够限制数据的平均传输速率外,还允许某种程度的突发传输。在“令牌桶算法”中,只要令牌桶中存在令牌,那么就允许突发地传输数据直到达到用户配置的门限,所以它适合于具有突发特性的流量