各个ProcesserSlot分析
• NodeSelectorSlot:
选择一个统计Node节点放置当前的curEntry中,entry方法
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
}
// Build invocation tree
((DefaultNode)context.getLastNode()).addChild(node);
}
}
// context的curEntry的当前节点设置成该node
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
DefaultNode是通过context中name得到,然后放入到map中。之前说过,一个ResourceWapper对应一个Chain对象,也说明了一个ResourceWapper对应一个NodeSelectorSlot对象,由于context创建可以设置不同的name,所以导致了一个ResourceWapper可能得到不同的DefaultNode对象,不同的context文本设置的curEntry的Node也是不一样的。在执行fireEntry方法时,将obj对象设置成了DefaultNode了。
• ClusterBuilderSlot:entry方法
// 创建一个 集群节点
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode();
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
// origin有值,则需要对他创建 统计node,并且放入到clusterNode的originCountMap中
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
// 将当前的entry设置origin节点
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
clusterNode是ClusterBuilderSlot的属性,所以一个ResourceWapper对应一个clusterNode对象,并且设置了DefaultNode对象中的clusterNode对象,如果存在origin来源标识,那么通过clusterNode去创建一个与origin关联的Node。从这里看出clusterNode作用,他是记录同一个resource的所有统计信息,与context没有关系。
• StatisticSlot:记录事件功能
它是各个事件统计的入口,例如通过各个processorSlot后,增加线程,增加qps,或者出现block异常时,增加blockQps。在退出方法中,增加相应耗时记录,减少线程等。部分entry方法代码:
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// 针对当前entry对象对应的origin节点进行增加
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getType() == EntryType.IN) {
// 增对系统会输入状态的节点进行增加
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
} catch(BlockException e) {
// .....
}
首先StatisticSlot是先去调用其他ProcessorSlot结束后,才开始执行自身的逻辑。该统计,不仅仅只是使用当前node进行记录,有curEntry中对应的OriginNode也会进行记录,还有一个Constants.ENTRY_NODE 该节点是整个服务器的记录节点,记录整个服务器的请求信息,为什么需要他?可以认为系统也需要限流,保证系统的稳定。
在exit方法中
if (context.getCurEntry().getError() == null) {
// Calculate response time (max RT is TIME_DROP_VALVE).
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
// Record response time and success count.
node.addRtAndSuccess(rt, count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
}
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
在退出方法中,也是相关节点记录的也是需要记录,并且在没有异常的情况下,例如Block异常是不需要进行统计在里面的。这里可能会产生误区,该error异常,是通过processorSlot执行过程中产生的,所以对于业务逻辑出现异常,他是不关心的。该记录还是需要记录。因为业务逻辑产生的异常,无法控制,不过可以通过抓取异常,进行降级处理,也是不错的方案。
• SystemSlot:系统处理槽
监控系统情况,主要通过SystemRuleManager的checkSystem方法,通过监控系统的负载,cpu使用情况等,进行check系统本身,如果不通过就抛出SystemBlockException异常。当然,sentinel设计该检测服务器状态,不仅仅应该操作系统的本身负载,cpu等信息。还会通过Constants.ENTRY_NODE 该节点的统计信息,也作为参考指标。该设计原因,github,作者也有阐述:
https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81
• AuthoritySlot:黑白名单限制
主要通过AuthorityRule规则,设置白名单或者黑名单状态,并且通过context得到origin请求源,进行限定。主要是在消费者,生产者中关系最为常见。
• DegradeSlot:降级处理槽
主要通过DegradeRule规则进行限定,有响应耗时,异常比例,异常数量等等模式去控制。并且通过当前resource获取ClusterNode节点统计信息,作为参考数值。当被限制时,会将cut状态设置为true,将后面的调用都拒绝。但是cut的状态会通过重置线程,一定的时间后被重置。保证了之后的调用不会被直接拒绝。DegradeRule中passCheck方法部分代码:
......
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
// Sentinel will degrade the service only if count exceeds.
if (passCount.incrementAndGet() < RT_MAX_EXCEED_N) {
return true;
}
}
.....
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
在平均响应时间做参考时,到达阈值时并没有拒绝,而是任然可以通过RT_MAX_EXCEED_N数量的调用。resetTask主要是重置passCount和cut状态,他是被放在一个线程池中延迟启动线程。
• FlowSlot 流式处理槽
当qps,success,pass,block有这些数据时候,怎么通过这些数据去提供限流。当然check pass的依据是通过 FlowRule中规则限定的。基本上FlowRule中依据的规则主要是qps和thread方式做限制的。但是这些qps,succes等维度统计数据都记录在Node中,发现StatisticNode中相关的具体Node有好多形式,例如与Origin关联的Node,与当前文本关联的DefaultNode,还有与resource关联的ClusterNode。那么限流肯定会取其中一个Node做依据,所以FlowRule中一个属性strategy,来去选择某一个Node。
/**
* Flow control strategy based on invocation chain.
*
* {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin);
* {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource);
* {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource).
*/
private int strategy = RuleConstant.STRATEGY_DIRECT;
从注释中,可以解读:
STRATEGY_DIRECT 直接通过origin相关的Node做控制;
STRATEGY_RELATE 通过refResource属性获取Node做关联控制;
STRATEGY_CHAIN 就是通过入口的Node进行控制;
至于如何代码实现逻辑可以通过 FlowRuleChecker类中静态方法selectNodeByRequesterAndStrategy()得到Node。
总结:
sentinel如何限流基本可以略知一二,主要在调用时,需要声明资源名字,对该资源依附自身的StatisticNode,经过一些列的ProcessorSlot的entry,包括系统限定,降级限定,还有流式限定等等。如果无法通过时,Block异常,那么就不会真正执行业务方法,保护了资源对应的系统功能,间接保证服务的稳定。