一、概述
上篇文章介绍过了NodeSelectorSlot和ClusterBuilderSlot两种插槽,接下来我们沿着默认的插槽链继续分析LogSlot和StatisticSlot。
二、LogSlot
我们看代码:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
throw e;
} catch (Throwable e) {
RecordLog.info("Entry exception", e);
}
}
可以发现,如果在接下来的插槽链中发生BlockException异常的话,LogSlot会记录日志信息。
继续看EagleEyeLogUtil类:
public class EagleEyeLogUtil {
public static final String FILE_NAME = "sentinel-block.log";
private static StatLogger statLogger;
static {
String path = LogBase.getLogBaseDir() + FILE_NAME;
statLogger = EagleEye.statLoggerBuilder("sentinel-block-log")
.intervalSeconds(1)
.entryDelimiter('|')
.keyDelimiter(',')
.valueDelimiter(',')
.maxEntryCount(6000)
.configLogFilePath(path)
.maxFileSizeMB(300)
.maxBackupIndex(3)
.buildSingleton();
}
public static void log(String resource, String exceptionName, String ruleLimitApp, String origin, int count) {
statLogger.stat(resource, exceptionName, ruleLimitApp, origin).count(count);
}
}
可以发现block日志是记录在sentienl-block.log中,这里block日志的格式化及本地文件的写入功能在eagleeye包中,如图:
个人觉得这个异常日志记录相对于sentienl的record日志开发有点过于复杂,鉴于不是sentienl的核心功能,我也只大概看了里面的代码。
大致就是在StatEntry中调用count的方法,然后在StatRollingData中调用StatLogController中scheduleWriteTask方法进而创建一个定时任务线程池,任务是StatLogWriteTask,在这个任务中写入日志到本地文件中。
三、StatisticSlot
StatisticSlot是sentienl的指标数据统计插槽,也是sentienl种非常重要的一个模块,sentienl后续的限流,降级,熔断都是根据这一阶段的统计数据进行。
前面文章在介绍sentinel的滑动时间窗口时,已经知道sentienl的指标统计是基于滑动时间窗口的,可以看文章 Sentinel之滑动时间窗口设计。
下面主要看在StatisticSlot插槽中,sentinel做了哪些数据的统计的。
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest();
}
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockQps();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps();
}
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps();
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected error, set error to current entry.
context.getCurEntry().setError(e);
// This should not happen.
node.increaseExceptionQps();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps();
}
throw e;
}
}
在方法entry中,这里是先调用fireEntry方法继续资源保护检查。
请求通过
增加node的线程数increaseThreadNum。
在DefaultNode类中increaseThreadNum方法中:
@Override
public void increaseThreadNum() {
super.increaseThreadNum();
this.clusterNode.increaseThreadNum();
}
然后调用StatisticNode中increaseThreadNum方法,在increaseThreadNum方法中执行curThreadNum.incrementAndGet()。
这里线程数增加是通过定义一个原子变量的,curThreadNum是一个AtomicInteger原子变量,初始值是0 ,执行incrementAndGet方法加1,并返回当前值。
增加node的通过请求数addPassRequest。
在DefaultNode类中addPassRequest方法中:
@Override
public void addPassRequest() {
super.addPassRequest();
this.clusterNode.addPassRequest();
}
在StatisticNode中的addPassRequest方法:
@Override
public void addPassRequest() {
rollingCounterInSecond.addPass();
rollingCounterInMinute.addPass();
}
根据滑动时间窗口统计了两个时间窗口的数据。
- rollingCounterInSecond:时间窗口是1s
- rollingCounterInMinute:时间窗口是1分钟
源节点originNode
如果originNode存在,则也需要增加originNode的线程数和请求通过数。
ENTRY_NODE
如果资源包装类型是IN的话,则需要ENTRY_NODE的线程数和请求通过数。
ENTRY_NODE是sentinel全局的统计节点,用于后续系统规则检查。
ProcessorSlotEntryCallback
然后再循环处理注册了ProcessorSlotEntryCallback的StatisticSlot。
StatisticSlotCallbackRegistry是一个StatisticSlot回调注册器,目前只有两种回调支持,ProcessorSlotEntryCallback和ProcessorSlotExitCallback。
ProcessorSlotEntryCallback是在资源正常调用时处理,ProcessorSlotExitCallback是在发送BlockException异常或者资源退出时调用。
目前只有热点限流的统计把ParamFlowStatisticSlotCallbackInit注册到ProcessorSlotEntryCallback中,看下面代码。
public class ParamFlowStatisticSlotCallbackInit implements InitFunc {
@Override
public void init() {
StatisticSlotCallbackRegistry.addEntryCallback(ParamFlowStatisticEntryCallback.class.getName(),
new ParamFlowStatisticEntryCallback());
}
}
这里系统若是引用了热点参数限流模块,则客户端系统会在启动时把ProcessorSlotEntryCallback注册到StatisticSlotCallbackRegistry中。
在ProcessorSlotEntryCallback中
public class ParamFlowStatisticEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> {
@Override
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Exception {
// The "hot spot" parameter metric is present only if parameter flow rules for the resource exist.
ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper);
if (parameterMetric != null) {
parameterMetric.addPass(count, args);
}
}
@Override
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param,
int count, Object... args) {
// Here we don't add block count here because checking the type of block exception can affect performance.
// We add the block count when throwing the ParamFlowException instead.
}
}
onPass方法就是用来热点参数的指标的,热点参数限流可以专门一个模块来学习,这里后面会再专门讲解。
抛出BlockException
SetError
这里会设置Context的CurEntry的Error属性,error属性可以在资源退出的时候判断使用,若不存在error,会统计响应时间rt等。
增加block线程数
在DefaultNode类中increaseBlockQps方法中:
@Override
public void increaseBlockQps() {
super.increaseBlockQps();
this.clusterNode.increaseBlockQps();
}
在StatisticNode中的increaseBlockQps方法:
@Override
public void increaseBlockQps() {
rollingCounterInSecond.addBlock();
rollingCounterInMinute.addBlock();
}
根据滑动时间窗口统计了两个时间窗口的数据。
rollingCounterInSecond:时间窗口是1s
rollingCounterInMinute:时间窗口是1分钟
源节点originNode
如果originNode存在,则也需要增加originNode请求Block数。
ENTRY_NODE
如果资源包装类型是IN的话,则需要ENTRY_NODE的Block数。
然后再进行ProcessorSlotEntryCallback的onBlocke方法,这个也是热点参数限流才会有。
抛出Throwable
先设置curEntry的error
增加node的异常数
node.increaseExceptionQps();
DefaultNode中:
@Override
public void increaseExceptionQps() {
super.increaseExceptionQps();
this.clusterNode.increaseExceptionQps();
}
然后判断originNode和包装类型,增加对应的exception数。
在Exit中
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
// Calculate response time (max RT is TIME_DROP_VALVE).
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
// Record response time and success count.
node.rt(rt);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().rt(rt);
}
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.rt(rt);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
// Handle exit event with registered exit callback handlers.
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
fireExit(context, resourceWrapper, count);
}
这里主要统计这次请求的响应时间rt,
public final static int TIME_DROP_VALVE = 4900;
如果rt大于TIME_DROP_VALVE,则设置rt为TIME_DROP_VALVE。
- 设置node的rt:node.rt(rt);
- 设置originNode的rt:context.getCurEntry().getOriginNode().rt(rt);
- 减掉node的线程数 node.decreaseThreadNum();
- 减掉originNode线程数 context.getCurEntry().getOriginNode().decreaseThreadNum();
- Constants.ENTRY_NODE的设置
- 最后处理ProcessorSlotExitCallback的的onExit方法。
四、我的总结
1、介绍LogSlot和StatisticSlot插槽的代码设计,其中LogSlot没有详细深入。
2、LogSlot代码设计较为复杂,个人感觉功能可以重新设计。
3、StatisticSlot的统计是根据滑动时间窗口,但是线程数的统计是设置一个AtomicInteger原子变量。
3、如果StatisticSlotCallbackRegistry中注册ProcessorSlotEntryCallback和ProcessorSlotExitCallback回调器,则在回调器中也会统计数据,目前只有热点参数限流会使用。
4、Sentinel后续的插槽就是根据StatisticSlot统计的数据进行资源的保护的。
以上内容若有不当之处,请指正,谢谢!