一、引子
该篇是插槽分析的最后一篇。
何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如:
- 商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
- 用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制
热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。
Sentinel 利用 LRU 策略,结合底层的滑动窗口机制来实现热点参数统计。LRU 策略可以统计单位时间内,最近最常访问的热点参数,而滑动窗口机制可以帮助统计每个参数的 QPS。
二、使用
使用热点限流功能,需要引入以下依赖:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
<version>x.y.z</version>
</dependency>
然后为对应的资源配置热点参数限流规则,并在 entry
的时候传入相应的参数,即可使热点参数限流生效。
注:若自行扩展并注册了自己实现的
SlotChainBuilder
,并希望使用热点参数限流功能,则可以在chain
里面合适的地方插入ParamFlowSlot
。
public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException
public static Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException
其中最后的一串 args 就是要传入的参数,有多个就按照次序依次传入。比如要传入两个参数 paramA 和 paramB,则可以:
// paramA in index 0, paramB in index 1.
SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);
三、热点参数规则
热点参数规则(ParamFlowRule
)类似于流量控制规则(FlowRule
):
属性 | 说明 | 默认值 |
---|---|---|
resource | 资源名,必填 | |
count | 限流阈值,必填 | |
grade | 限流模式 | QPS 模式 |
paramIdx | 热点参数的索引,必填,对应 SphU.entry(xxx, args) 中的参数索引位置 |
|
paramFlowItemList | 参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count 阈值的限制。仅支持基本类型
|
可以通过 ParamFlowRuleManager
的 loadRules
方法更新热点参数规则,下面是一个示例:
ParamFlowRule rule = new ParamFlowRule(resourceName)
.setParamIdx(0)
.setCount(5);
// 针对 int 类型的参数 PARAM_B,单独设置限流 QPS 阈值为 10,而不是全局的阈值 5.
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
.setClassType(int.class.getName())
.setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
四、源码分析
1、ParamFlowSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Throwable {
//检查该资源是否存在限流资源
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, args);
return;
}
//限流规则检测
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, args);
}
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args)
throws BlockException {
//若存在限流规则
if (ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
//获取限流规则
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
if (rules == null) {
return;
}
for (ParamFlowRule rule : rules) {
// Initialize the parameter metrics.
//初始化参数统计
initHotParamMetricsFor(resourceWrapper, rule.getParamIdx());
//若热点参数限流符合,则进行限流处理
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
// Here we add the block count.
addBlockCount(resourceWrapper, count, args);
String message = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
message = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), message);
}
}
}
}
通过checkFlow进行限流规则检测:
- 若该资源存在限流规则,并获取该资源的限流规则;否则跳过检测;
- 循环判断规则,并先初始化热点参数统计窗口,如下:
void initHotParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ int index) {
ParameterMetric metric;
// Assume that the resource is valid.
if ((metric = metricsMap.get(resourceWrapper)) == null) {
synchronized (LOCK) {
if ((metric = metricsMap.get(resourceWrapper)) == null) {
metric = new ParameterMetric();
metricsMap.put(resourceWrapper, metric);
RecordLog.info("[ParamFlowSlot] Creating parameter metric for: " + resourceWrapper.getName());
}
}
}
metric.initializeForIndex(index);
}
通过ParameterMetric的initialiazeForIndex初始了一个时间窗口。
- 通过ParamFlowChecker的passCheck方法检测规则。
2、ParamFlowChecker
static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
// 如果参数不存在直接返回
if (args == null) {
return true;
}
int paramIdx = rule.getParamIdx();
//参数的个数小于规则的索引直接返回
if (args.length <= paramIdx) {
return true;
}
Object value = args[paramIdx];
//规则调用
return passLocalCheck(resourceWrapper, rule, count, value);
}
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
try {
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else if (value.getClass().isArray()) {
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else {
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.info("[ParamFlowChecker] Unexpected error", e);
}
return true;
}
在passLocalCheck方法中:
1.依次对Collection类型,Array类型,非数组集合类型进行处理,进入到passSingleValueCheck中;
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//获取当前参数在该资源时间窗口内通过的数量
double curCount = getHotParameters(resourceWrapper).getPassParamQps(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
// Pass check for exclusion items.
int itemQps = rule.getParsedHotItems().get(value);
return curCount + count <= itemQps;
} else if (curCount + count > rule.getCount()) {
if ((curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0) {
return true;
}
return false;
}
}
return true;
}
- 获取rule的解析参数集合HotItems;
- 获取当前参数在该资源时间窗口内通过的数量;
- 如果exclusionItems(参数例外项)包含该值,判断
curCount + count
大于itemQps
,itemQps
是该参数对应的count,若大于则说明超过限流阈值,则返回false,反之返回true; - 如果exclusionItems不包含该值,并且
curCount + count
大于rule.getCount()
时,并判断(curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0
是否满足,若满足则返回true,反之返回false。
3、ParameterMetric
//保存了对应资源的时间窗口数据
private Map<Integer, HotParameterLeapArray> rollingParameters =
new ConcurrentHashMap<Integer, HotParameterLeapArray>();
public Map<Integer, HotParameterLeapArray> getRollingParameters() {
return rollingParameters;
}
public synchronized void clear() {
rollingParameters.clear();
}
// 初始化一个时间窗口
public void initializeForIndex(int index) {
if (!rollingParameters.containsKey(index)) {
synchronized (this) {
// putIfAbsent
if (rollingParameters.get(index) == null) {
rollingParameters.put(index, new HotParameterLeapArray(
1000 / SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL));
}
}
}
}
热点参数统计
//增加通过数
public void addPass(int count, Object... args) {
add(RollingParamEvent.REQUEST_PASSED, count, args);
}
//增加阻塞数
public void addBlock(int count, Object... args) {
add(RollingParamEvent.REQUEST_BLOCKED, count, args);
}
@SuppressWarnings("rawtypes")
private void add(RollingParamEvent event, int count, Object... args) {
if (args == null) {
return;
}
try {
for (int index = 0; index < args.length; index++) {
HotParameterLeapArray param = rollingParameters.get(index);
if (param == null) {
continue;
}
Object arg = args[index];
if (arg == null) {
continue;
}
if (Collection.class.isAssignableFrom(arg.getClass())) {
for (Object value : ((Collection)arg)) {
param.addValue(event, count, value);
}
} else if (arg.getClass().isArray()) {
int length = Array.getLength(arg);
for (int i = 0; i < length; i++) {
Object value = Array.get(arg, i);
param.addValue(event, count, value);
}
} else {
param.addValue(event, count, arg);
}
}
} catch (Throwable e) {
RecordLog.warn("[ParameterMetric] Param exception", e);
}
}
- 可以发现热点参数的统计也是基于滑动时间窗口统计,这个就不具体分析了,滑动时间窗口前面有讲解,见滑动时间窗口。
2.有点不同的是:在限流规则里指标的是通过LongAdder分段统计的,而热点参数的指标是通过AtomicInteger的addAndGet方法统计的。
public ParamMapBucket add(RollingParamEvent event, int count, Object value) {
data[event.ordinal()].putIfAbsent(value, new AtomicInteger());
AtomicInteger counter = data[event.ordinal()].get(value);
counter.addAndGet(count);
return this;
}
3.热点参数获取的实际上是统计范围内一个平均值。如下:
public double getPassParamQps(int index, Object value) {
try {
HotParameterLeapArray parameter = rollingParameters.get(index);
if (parameter == null || value == null) {
return -1;
}
return parameter.getRollingAvg(RollingParamEvent.REQUEST_PASSED, value);
} catch (Throwable e) {
RecordLog.info(e.getMessage(), e);
}
return -1;
}
public double getRollingAvg(RollingParamEvent event, Object value) {
return ((double) getRollingSum(event, value)) / getIntervalInSec();
}
public long getRollingSum(RollingParamEvent event, Object value) {
currentWindow();
long sum = 0;
List<ParamMapBucket> buckets = this.values();
for (ParamMapBucket b : buckets) {
sum += b.get(event, value);
}
return sum;
}
五、我的总结
- 本文介绍热点参数的概念、使用、及部分源码分析。
- 热点参数限流使用需要单独引用Jar包,并设置规则后才会启动限流效果。
- 目前热点参数限流只是支持QPS模式,且还支持额外参数项进行限流。
- 热点参数的参数限流统计也是基于滑动窗口统计的,内部使用了AtomicInteger的原子结构统计及ConcurrentLinkedHashMap结构作为数据存储。
以上内容,如有不当之处,请指正