聊聊Apache Sentinel

概述

Sentinel 是一个用于微服务架构的流量控制和熔断保护框架,旨在帮助开发者构建更加稳定和可靠的系统。核心功能包括限流、熔断、降级和系统监控。它不仅支持基于线程数、QPS等多种限流策略,还能实时监控系统状态,帮助开发者快速识别和解决问题。

基础概念

Sentinel 需要对每个资源的访问进行管理和追踪,这就涉及到两个核心概念:EntryContext

  • Entry:用于保护资源访问的核心抽象。每次对某个受保护资源的访问,都会通过创建一个 Entry 来标识这次请求。Entry 记录了资源的名称、类型、调用方信息等,用于后续的统计和控制。
  • Context:代表了调用链的上下文。通过 Context,Sentinel 能够跟踪整个调用链中多个资源的访问情况,并为后续的流量控制、限流、熔断等功能提供支持。

在谈及流量控制时,首先需要解决的问题无疑是数据统计。只有在对请求进行充分的数据统计后,才能实施有效的流量控制。为此,Sentinel 设计了一系列类来进行精确的数据统计,为后续的流量管理提供基础支持。

  • StatisticNode:最为基础的统计节点,用于存储统计数据。
  • DefaultNode:上下文资源维度数据存储。
  • ClusterNode:集群维度数据存储。
  • EntranceNode:上下文维度数据存储,通过遍历上下文下面的所有子节点统计数据。

有了数据统计的类,接下来就要考虑如何采集数据并实施流量控制。这时,Sentinel 中的另一个关键组件——ProcessorSlotChain 就派上用场了。ProcessorSlotChain 通过将不同功能的 Slot 串联在一起,形成一条责任链,负责数据采集和流量控制。每个 Slot 在责任链中承担着不同的任务,例如流量统计、熔断判断和降级处理等。

核心原理

在分析 Sentinel 核心源码之前,先通过一个简单的 demo 展示 Sentinel 的基本使用方法。通过这个例子,我们可以看到如何设置限流规则并保护资源。

实例演示:流量控制

public class SentinelDemo {
    public static void main(String[] args) {
        // 定义流量控制规则
        FlowRule rule = new FlowRule();
        rule.setResource("testResource");  // 设置资源名称
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);  // 按照 QPS 限流
        rule.setCount(1);  // 设置 QPS 限制为 1
       
        // 加载规则
        FlowRuleManager.loadRules(Collections.singletonList(rule));
        
        while (true) {
            // 使用 Entry 保护资源
            try (Entry entry = SphU.entry("testResource")) {
                // 被保护的业务逻辑
                System.out.println("访问资源成功!");
            } catch (BlockException e) {
                // 限流后的处理逻辑
                System.out.println("资源被限流!");
            }
        }
    }
}

在上述代码中,资源 testResource 设置了每秒最多允许一次访问。程序会在资源被限流时打印 "资源被限流!",否则打印 "访问资源成功!"。

代码分析:核心流程解读

这个简单的 demo 展现了 Sentinel 的基本工作流程,接下来我们深入分析其中的核心实现逻辑。

1. SphU.entry():创建资源访问入口

SphU.entry() 是 Sentinel 中保护资源的入口方法。每次调用 entry() 方法时,Sentinel 会为该资源创建一个 Entry,并依赖其背后的责任链机制执行流量控制和熔断判断。

2. ProcessorSlotChain:责任链的核心

ProcessorSlotChain 是 Sentinel 内部用来处理流量控制、统计和熔断的核心组件。每次调用 SphU.entry(),都将触发一系列 Slot 的执行,如流量统计、限流判断等。

public class CtSph implements Sph {
    // 同一资源使用同一个调用链
    private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
        = new HashMap<ResourceWrapper, ProcessorSlotChain>();
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
        Context context = ContextUtil.getContext();
        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        Entry e = new CtEntry(resourceWrapper, chain, context, count, args);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
}

为更好地理解 ProcessorSlotChain 的执行流程,通过一张链路图来直观展示各个 Slot 在责任链中的工作顺序:

ProcessorSlotChain 中的 fireEntry() 方法依次触发不同的 Slot,每个 Slot 完成自己的逻辑后,再调用下一个 Slot。通常这些 Slot 实现限流、统计、熔断、权限控制等功能。每个 Slot 的实现继承自 AbstractLinkedProcessorSlot,负责接收请求并执行特定的逻辑。

NodeSelectorSlot 是责任链中的第一个 Slot,负责根据资源名构建资源调用树,并为每个资源创建 Node,将其与当前 Context 绑定,通过资源树结构,帮助统计和监控整个调用链路的数据。

public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
    // 同一资源在不同上下文被访问时,会创建多个DefaultNode
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }
        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

ClusterBuilderSlot 负责为每个资源构建簇点 (ClusterNode),用于记录资源的全局统计数据(如 QPS、异常次数等),以及按调用来源(IP 或应用)划分的统计数据。

public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    // 同一资源使用同一个ClusterNode
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

    private static final Object lock = new Object();

    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);
                    clusterNodeMap = newMap;
                }
            }
        }
        node.setClusterNode(clusterNode);
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

StatisticSlot 是统计请求数据的核心 Slot,它会为当前资源记录 QPS、响应时间、异常次数等关键信息,为限流和熔断的触发提供数据基础。

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
          // Do some checking.
          fireEntry(context, resourceWrapper, node, count, prioritized, args);
          // Request passed, add thread count and pass count.
          node.increaseThreadNum();
          node.addPassRequest(count);
          if (context.getCurEntry().getOriginNode() != null) {
              // Add count for origin node.
              context.getCurEntry().getOriginNode().increaseThreadNum();
              context.getCurEntry().getOriginNode().addPassRequest(count);
          }
          if (resourceWrapper.getEntryType() == EntryType.IN) {
              // Add count for global inbound entry node for global statistics.
              Constants.ENTRY_NODE.increaseThreadNum();
              Constants.ENTRY_NODE.addPassRequest(count);
          }
          // Handle pass event with registered entry callback handlers.
          for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
              handler.onPass(context, resourceWrapper, node, count, args);
          }
    }
}

SystemSlot 主要负责根据系统的整体状态(如系统负载、CPU 使用率等)进行自适应的限流。它主要防止系统过载,确保在高负载时系统能够平稳运行。

public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        SystemRuleManager.checkSystem(resourceWrapper, count);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

AuthoritySlot 负责基于访问权限进行控制。通过 IP 或应用来源的黑白名单进行限流,确保只有授权的请求才能访问某些资源。

public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        checkBlackWhiteAuthority(resourceWrapper, context);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

FlowSlot 是 Sentinel 中实现限流的核心 Slot,它会根据当前资源的实时数据(如 QPS、并发线程数等)判断是否需要触发限流,并执行相应的限流逻辑。

public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

DegradeSlot 负责实现熔断和降级功能。当某个资源的异常比率或响应时间超过预设的阈值时,触发熔断,暂停该资源的访问。

public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        performChecking(context, resourceWrapper);
        
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

总结

Sentinel 的核心架构基于责任链模式,将各个功能模块(如流量控制、统计、熔断、权限管理等)以 Slot 的形式串联起来。每个 Slot 负责执行特定的任务,整个调用链条从资源访问的入口一直贯穿到流量保护、熔断、降级等逻辑。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容