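Cluster Flow Control Source Code Analysis (Part 2)
Next, we analyze the server package in the sentinel-cluster-server-default module.
1. Source directory structure

2. Source code analysis

2.1 Codec (encoding and decoding)
2.1.1 ServerEntityCodecProvider
A provider utility class for the codec, used to obtain the RequestEntityDecoder and the ResponseEntityWriter.
Source: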
public final class ServerEntityCodecProvider {
    private static RequestEntityDecoder requestEntityDecoder = null;
    private static ResponseEntityWriter responseEntityWriter = null;
    static {
        resolveInstance();
    }
    private static void resolveInstance() {
        ResponseEntityWriter writer = SpiLoader.loadFirstInstance(ResponseEntityWriter.class);
        if (writer == null) {
            RecordLog.warn("[ServerEntityCodecProvider] No existing response entity writer, resolve failed");
        } else {
            responseEntityWriter = writer;
            RecordLog.info(
                "[ServerEntityCodecProvider] Response entity writer resolved: " + responseEntityWriter.getClass()
                    .getCanonicalName());
        }
        RequestEntityDecoder decoder = SpiLoader.loadFirstInstance(RequestEntityDecoder.class);
        if (decoder == null) {
            RecordLog.warn("[ServerEntityCodecProvider] No existing request entity decoder, resolve failed");
        } else {
            requestEntityDecoder = decoder;
            RecordLog.info(
                "[ServerEntityCodecProvider] Request entity decoder resolved: " + requestEntityDecoder.getClass()
                    .getCanonicalName());
        }
    }
    public static RequestEntityDecoder getRequestEntityDecoder() {
        return requestEntityDecoder;
    }
    public static ResponseEntityWriter getResponseEntityWriter() {
        return responseEntityWriter;
    }
}
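Static fields
- requestEntityDecoder: decoder for client requests
- responseEntityWriter: encoder for server responses
Static methods
- resolveInstance: loads the implementations of RequestEntityDecoder and ResponseEntityWriter through the SPI mechanism and stores them in the static fields. It is called once, from the static initializer.
- The implementations are DefaultRequestEntityDecoder and DefaultResponseEntityWriter respectively; they are declared in provider configuration files under the META-INF/services directory.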
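The "load the first SPI implementation" idea can be sketched with the JDK's own java.util.ServiceLoader. This is only an illustration, not Sentinel's SpiLoader: the class SpiSketch, the helper loadFirst and the Greeter interface are hypothetical names, and no provider is registered for Greeter, so the lookup yields null, just as resolveInstance logs a warning when nothing is found.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Hypothetical sketch; Sentinel's SpiLoader is a separate class with its own logic.
class SpiSketch {
    // Hypothetical service interface; no provider file exists for it.
    interface Greeter {
        String greet();
    }

    // Returns the first provider declared under META-INF/services, or null if there is none.
    static <T> T loadFirst(Class<T> service) {
        Iterator<T> it = ServiceLoader.load(service).iterator();
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        Greeter greeter = loadFirst(Greeter.class);
        System.out.println(greeter == null ? "no provider, resolve failed" : greeter.greet());
    }
}
```

A provider would be registered by placing a file named after the fully qualified interface under META-INF/services, containing the implementation class name.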
2.1.2 DefaultRequestEntityDecoder
The default decoder for ClusterRequest objects.
The wire format is as follows:
- +--------+---------+---------+
- | xid(4) | type(1) | data... |
- +--------+---------+---------+
Source:
@Override
public ClusterRequest decode(ByteBuf source) {
    // 1. Check that at least 5 bytes are readable: xid (4) + type (1)
    if (source.readableBytes() >= 5) {
        // 2. Read the xid: 4 bytes
        int xid = source.readInt();
        // 3. Read the type: 1 byte. There are three types, PING, FLOW and PARAM_FLOW, covered later
        int type = source.readByte();
        // 4. Look up the decoder for this type in the registry
        EntityDecoder<ByteBuf, ?> dataDecoder = RequestDataDecodeRegistry.getDecoder(type);
        if (dataDecoder == null) {
            RecordLog.warn("Unknown type of request data decoder: {0}", type);
            return null;
        }
        Object data;
        // 5. Check whether any body bytes remain
        if (source.readableBytes() == 0) {
            data = null;
        } else {
            // 6. Decode the body
            data = dataDecoder.decode(source);
        }
        return new ClusterRequest<>(xid, type, data);
    }
    return null;
}
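To make the header layout concrete, here is a minimal sketch that parses the same xid(4) + type(1) prefix. It uses java.nio.ByteBuffer instead of Netty's ByteBuf so that it runs without dependencies; FrameSketch and Header are hypothetical names, not Sentinel classes.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the request header layout: | xid(4) | type(1) | data... |
class FrameSketch {
    static final class Header {
        final int xid;
        final int type;
        final int bodyLength; // number of body bytes left after the header

        Header(int xid, int type, int bodyLength) {
            this.xid = xid;
            this.type = type;
            this.bodyLength = bodyLength;
        }
    }

    // Returns null when fewer than 5 bytes are available, like the decoder above.
    static Header decodeHeader(ByteBuffer source) {
        if (source.remaining() < 5) {
            return null;
        }
        int xid = source.getInt(); // 4 bytes, big-endian by default
        int type = source.get();   // 1 byte
        return new Header(xid, type, source.remaining());
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(9);
        buf.putInt(42).put((byte) 1).putInt(7); // xid=42, type=1, 4 body bytes
        buf.flip();
        Header h = decodeHeader(buf);
        System.out.println(h.xid + " " + h.type + " " + h.bodyLength); // prints "42 1 4"
    }
}
```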
2.1.3 DefaultResponseEntityWriter
The default encoder for ClusterResponse objects.
Source:
@Override
public void writeTo(ClusterResponse response, ByteBuf out) {
    // 1. Get the response type and the matching response data writer
    int type = response.getType();
    EntityWriter<Object, ByteBuf> responseDataWriter = ResponseDataWriterRegistry.getWriter(type);
    if (responseDataWriter == null) {
        writeHead(response.setStatus(ClusterConstants.RESPONSE_STATUS_BAD), out);
        RecordLog.warn("[NettyResponseEncoder] Cannot find matching writer for type <{0}>", response.getType());
        return;
    }
    // 2. Write the header
    writeHead(response, out);
    // 3. Write the body
    responseDataWriter.writeTo(response.getData(), out);
}
private void writeHead(Response response, ByteBuf out) {
    out.writeInt(response.getId());
    out.writeByte(response.getType());
    // Status byte; an error status such as RESPONSE_STATUS_BAD is written here
    out.writeByte(response.getStatus());
}
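The response header written by writeHead is therefore id (4 bytes), type (1 byte) and status (1 byte), followed by the body. A small sketch of that layout with java.nio.ByteBuffer follows; ResponseHeadSketch is a hypothetical name and the STATUS_BAD value is an assumption chosen for illustration, not taken from ClusterConstants.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the response header layout: | id(4) | type(1) | status(1) | data... |
class ResponseHeadSketch {
    static final int STATUS_BAD = -1; // assumed value, for illustration only

    static void writeHead(int id, int type, int status, ByteBuffer out) {
        out.putInt(id);
        out.put((byte) type);
        out.put((byte) status);
    }

    public static void main(String[] args) {
        ByteBuffer out = ByteBuffer.allocate(6);
        writeHead(7, 1, STATUS_BAD, out);
        out.flip();
        System.out.println(out.remaining() + " header bytes");
    }
}
```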
2.1.3 registry
- RequestDataDecodeRegistry: registry of request data decoders
- ResponseDataWriterRegistry: registry of response data writers
Source:
public final class RequestDataDecodeRegistry {
    // Map holding the EntityDecoders, keyed by request type
    private static final Map<Integer, EntityDecoder<ByteBuf, ?>> DECODER_MAP = new HashMap<>();
    public static boolean addDecoder(int type, EntityDecoder<ByteBuf, ?> decoder) {
        if (DECODER_MAP.containsKey(type)) {
            return false;
        }
        DECODER_MAP.put(type, decoder);
        return true;
    }
    public static EntityDecoder<ByteBuf, Object> getDecoder(int type) {
        return (EntityDecoder<ByteBuf, Object>)DECODER_MAP.get(type);
    }
    public static boolean removeDecoder(int type) {
        return DECODER_MAP.remove(type) != null;
    }
}
public final class ResponseDataWriterRegistry {
    // Map holding the EntityWriters, keyed by response type
    private static final Map<Integer, EntityWriter<Object, ByteBuf>> WRITER_MAP = new HashMap<>();
    public static <T> boolean addWriter(int type, EntityWriter<T, ByteBuf> writer) {
        if (WRITER_MAP.containsKey(type)) {
            return false;
        }
        WRITER_MAP.put(type, (EntityWriter<Object, ByteBuf>)writer);
        return true;
    }
    public static EntityWriter<Object, ByteBuf> getWriter(int type) {
        return WRITER_MAP.get(type);
    }
    public static boolean remove(int type) {
        return WRITER_MAP.remove(type) != null;
    }
}
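The source is simple: each registry is essentially a utility class wrapping a map. Where are the decoders and writers put into these maps? That is covered later, in the analysis of the init package.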
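The registry pattern used by both classes can be sketched as follows. This is a simplified stand-in, not the Sentinel code: DecoderRegistrySketch is a hypothetical name, decoders are modelled as Function<byte[], Object>, and the sketch swaps the HashMap with containsKey/put for ConcurrentHashMap.putIfAbsent, which gives the same "first registration wins" result in one atomic call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a type -> decoder registry.
class DecoderRegistrySketch {
    private static final Map<Integer, Function<byte[], Object>> DECODERS = new ConcurrentHashMap<>();

    // Returns false if a decoder is already registered for the type.
    static boolean addDecoder(int type, Function<byte[], Object> decoder) {
        return DECODERS.putIfAbsent(type, decoder) == null;
    }

    // Returns null for an unknown type; the caller is expected to handle that.
    static Function<byte[], Object> getDecoder(int type) {
        return DECODERS.get(type);
    }

    static boolean removeDecoder(int type) {
        return DECODERS.remove(type) != null;
    }

    public static void main(String[] args) {
        System.out.println(addDecoder(1, bytes -> Integer.valueOf(bytes.length))); // true
        System.out.println(addDecoder(1, bytes -> "ignored"));                     // false
        System.out.println(getDecoder(1).apply(new byte[3]));                      // 3
        System.out.println(getDecoder(9));                                         // null
    }
}
```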
2.1.4 netty
This package contains the handler classes that are added to the ChannelPipeline, as can be seen in the NettyTransportServer startup class.
Source analysis
public class NettyRequestDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 1. Get the RequestEntityDecoder
        RequestEntityDecoder<ByteBuf, Request> requestDecoder = ServerEntityCodecProvider.getRequestEntityDecoder();
        if (requestDecoder == null) {
            RecordLog.warn("[NettyRequestDecoder] Cannot resolve the global request entity decoder, "
                + "dropping the request");
            return;
        }
        // TODO: handle decode error here.
        // 2. Decode the request
        Request request = requestDecoder.decode(in);
        if (request != null) {
            out.add(request);
        }
    }
}
public class NettyResponseEncoder extends MessageToByteEncoder<ClusterResponse> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ClusterResponse response, ByteBuf out) throws Exception {
        // 1. Get the ResponseEntityWriter
        ResponseEntityWriter<ClusterResponse, ByteBuf> responseEntityWriter = ServerEntityCodecProvider.getResponseEntityWriter();
        if (responseEntityWriter == null) {
            RecordLog.warn("[NettyResponseEncoder] Cannot resolve the global response entity writer, reply bad status");
            // Write a header carrying the bad status
            writeBadStatusHead(response, out);
            return;
        }
        // 2. Write the response
        responseEntityWriter.writeTo(response, out);
    }
    private void writeBadStatusHead(Response response, ByteBuf out) {
        out.writeInt(response.getId());
        out.writeByte(ClusterConstants.RESPONSE_STATUS_BAD);
        out.writeByte(response.getStatus());
    }
}
- NettyRequestDecoder extends ByteToMessageDecoder, which in turn extends ChannelInboundHandlerAdapter. As readers familiar with Netty will know, extending ChannelInboundHandlerAdapter is how a custom inbound handler is written.
- NettyResponseEncoder extends MessageToByteEncoder, which extends ChannelOutboundHandlerAdapter, so it handles outbound data.
2.1.5 data
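This package holds the implementations of EntityWriter and EntityDecoder. The EntityDecoder implementations are FlowRequestDataDecoder, ParamFlowRequestDataDecoder and PingRequestDataDecoder; the EntityWriter implementations are FlowResponseDataWriter and PingResponseDataWriter.
2.1.5.1 Decoding request data
The relevant classes are FlowRequestDataDecoder, ParamFlowRequestDataDecoder and PingRequestDataDecoder.
They decode the data carried by a ClusterRequest.
- FlowRequestDataDecoder
 Decoder for ordinary flow control request data; the decoded object is a FlowRequestData. Read it together with FlowRequestDataWriter.
| flow ID (8) | count (4) | priority flag (1) |
Source: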
public class FlowRequestDataDecoder implements EntityDecoder<ByteBuf, FlowRequestData> {
    @Override
    public FlowRequestData decode(ByteBuf source) {
        // Length check: at least 12 bytes are required (flow ID 8 + count 4)
        if (source.readableBytes() >= 12) {
            FlowRequestData requestData = new FlowRequestData()
                .setFlowId(source.readLong())
                .setCount(source.readInt());
            // The priority flag is optional; read it only if a byte remains
            if (source.readableBytes() >= 1) {
                requestData.setPriority(source.readBoolean());
            }
            return requestData;
        }
        return null;
    }
}
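The same body layout can be exercised without Netty. The sketch below mirrors the decoder with java.nio.ByteBuffer; FlowRequestSketch and FlowRequest are hypothetical stand-ins for the decoder and FlowRequestData, and the priority byte is treated as true when non-zero.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the body layout: | flow ID (8) | count (4) | priority flag (1, optional) |
class FlowRequestSketch {
    static final class FlowRequest {
        long flowId;
        int count;
        boolean priority;
    }

    static FlowRequest decode(ByteBuffer source) {
        if (source.remaining() < 12) {
            return null; // not enough bytes for flow ID + count
        }
        FlowRequest request = new FlowRequest();
        request.flowId = source.getLong();
        request.count = source.getInt();
        if (source.remaining() >= 1) {
            request.priority = source.get() != 0; // optional trailing flag
        }
        return request;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(13);
        buf.putLong(1001L).putInt(1).put((byte) 1);
        buf.flip();
        FlowRequest r = decode(buf);
        System.out.println(r.flowId + " " + r.count + " " + r.priority); // prints "1001 1 true"
    }
}
```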
- ParamFlowRequestDataDecoder
 Decoder for hot-parameter flow control requests; the decoded object is a ParamFlowRequestData. Read it together with ParamFlowRequestDataWriter.
| flow ID (8) | count (4) | param count (4) |
Source:
public class ParamFlowRequestDataDecoder implements EntityDecoder<ByteBuf, ParamFlowRequestData> {
    @Override
    public ParamFlowRequestData decode(ByteBuf source) {
        // The fixed part is 16 bytes long
        //| flow ID (8) | count (4) | param count (4) |
        if (source.readableBytes() >= 16) {
            ParamFlowRequestData requestData = new ParamFlowRequestData()
                .setFlowId(source.readLong())
                .setCount(source.readInt());
            // Number of hot parameters
            int amount = source.readInt();
            if (amount > 0) {
                List<Object> params = new ArrayList<>(amount);
                for (int i = 0; i < amount; i++) {
                    // Decode one hot parameter
                    decodeParam(source, params);
                }
                requestData.setParams(params);
                return requestData;
            }
        }
        return null;
    }
    private boolean decodeParam(ByteBuf source, List<Object> params) {
        // Type tag of the hot parameter
        byte paramType = source.readByte();
        switch (paramType) {
            //int
            case ClusterConstants.PARAM_TYPE_INTEGER:
                params.add(source.readInt());
                return true;
            //string
            case ClusterConstants.PARAM_TYPE_STRING:
                // Read the byte length first
                int length = source.readInt();
                byte[] bytes = new byte[length];
                // Then read the bytes
                source.readBytes(bytes);
                // TODO: take care of charset?
                params.add(new String(bytes));
                return true;
            
            //boolean
            case ClusterConstants.PARAM_TYPE_BOOLEAN:
                params.add(source.readBoolean());
                return true;
            
            //double
            case ClusterConstants.PARAM_TYPE_DOUBLE:
                params.add(source.readDouble());
                return true;
            //long
            case ClusterConstants.PARAM_TYPE_LONG:
                params.add(source.readLong());
                return true;
            //float
            case ClusterConstants.PARAM_TYPE_FLOAT:
                params.add(source.readFloat());
                return true;
            //byte
            case ClusterConstants.PARAM_TYPE_BYTE:
                params.add(source.readByte());
                return true;
            //short
            case ClusterConstants.PARAM_TYPE_SHORT:
                params.add(source.readShort());
                return true;
            default:
                return false;
        }
    }
}
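Each parameter is thus a one-byte type tag followed by a type-specific payload, and strings are length-prefixed. A round-trip sketch for two of the types is shown below, using java.nio.ByteBuffer. ParamCodecSketch is a hypothetical name and the tag values are assumptions for illustration, not the ClusterConstants values. Unlike the decoder above, the sketch names UTF-8 explicitly, which is one way to address the charset TODO.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of tag-prefixed parameter encoding and decoding.
class ParamCodecSketch {
    static final byte TYPE_INT = 0;    // assumed tag value
    static final byte TYPE_STRING = 1; // assumed tag value

    static void writeParam(ByteBuffer out, Object param) {
        if (param instanceof Integer) {
            out.put(TYPE_INT).putInt((Integer) param);
        } else if (param instanceof String) {
            byte[] bytes = ((String) param).getBytes(StandardCharsets.UTF_8);
            out.put(TYPE_STRING).putInt(bytes.length).put(bytes); // length prefix, then bytes
        } else {
            throw new IllegalArgumentException("unsupported param type: " + param.getClass());
        }
    }

    static List<Object> readParams(ByteBuffer in, int amount) {
        List<Object> params = new ArrayList<>(amount);
        for (int i = 0; i < amount; i++) {
            byte type = in.get();
            switch (type) {
                case TYPE_INT:
                    params.add(in.getInt());
                    break;
                case TYPE_STRING:
                    byte[] bytes = new byte[in.getInt()];
                    in.get(bytes);
                    params.add(new String(bytes, StandardCharsets.UTF_8));
                    break;
                default:
                    throw new IllegalArgumentException("unknown param type: " + type);
            }
        }
        return params;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        writeParam(buf, 42);
        writeParam(buf, "abc");
        buf.flip();
        System.out.println(readParams(buf, 2)); // prints "[42, abc]"
    }
}
```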
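- PingRequestDataDecoder
 Handles ping (test) data.
2.1.5.2 Encoding response data
The writers are FlowResponseDataWriter and PingResponseDataWriter.
- FlowResponseDataWriter: the server-side response data writer; read it together with FlowResponseDataDecoder.
Source: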
public class FlowResponseDataWriter implements EntityWriter<FlowTokenResponseData, ByteBuf> {
    @Override
    public void writeTo(FlowTokenResponseData entity, ByteBuf out) {
        // Remaining token count
        out.writeInt(entity.getRemainingCount());
        // Wait time in milliseconds
        out.writeInt(entity.getWaitInMs());
    }
}
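- PingResponseDataWriter
 Handles ping (test) data.
2.2 Configuration: config
This package holds the server configuration, including constants and dynamic configuration.
2.2.1 ServerTransportConfigObserver interface
Source: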
public interface ServerTransportConfigObserver {
    /**
     * Callback on server transport config (e.g. port) change.
     *
     * @param config new server transport config
     */
     // The single callback, invoked when the transport config changes
    void onTransportConfigChange(ServerTransportConfig config);
}
2.2.2 ServerTransportConfig
Instance fields
- port: the server port
- idleSeconds: idle timeout in seconds
2.2.3 ServerFlowConfig
Server-side flow control configuration and its default values.
Source:
    public static final double DEFAULT_EXCEED_COUNT = 1.0d;
    public static final double DEFAULT_MAX_OCCUPY_RATIO = 1.0d;
    public static final int DEFAULT_INTERVAL_MS = 1000;
    public static final int DEFAULT_SAMPLE_COUNT= 10;
    public static final double DEFAULT_MAX_ALLOWED_QPS= 30000;
    private final String namespace;
    // Exceed count
    private double exceedCount = DEFAULT_EXCEED_COUNT;
    // Maximum occupy ratio
    private double maxOccupyRatio = DEFAULT_MAX_OCCUPY_RATIO;
    // Statistics interval in milliseconds
    private int intervalMs = DEFAULT_INTERVAL_MS;
    // Number of sample buckets in the interval
    private int sampleCount = DEFAULT_SAMPLE_COUNT;
    // Maximum allowed QPS
    private double maxAllowedQps = DEFAULT_MAX_ALLOWED_QPS;
2.2.4 ClusterServerConfigManager
The configuration manager for the cluster token server.
Static fields and their default values
private static boolean embedded = false;
    /**
     * Server global transport and scope config.
     */
    private static volatile int port = ClusterConstants.DEFAULT_CLUSTER_SERVER_PORT;
    private static volatile int idleSeconds = ServerTransportConfig.DEFAULT_IDLE_SECONDS;
    private static volatile Set<String> namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE);
    /**
     * Server global flow config.
     */
    private static volatile double exceedCount = ServerFlowConfig.DEFAULT_EXCEED_COUNT;
    private static volatile double maxOccupyRatio = ServerFlowConfig.DEFAULT_MAX_OCCUPY_RATIO;
    private static volatile int intervalMs = ServerFlowConfig.DEFAULT_INTERVAL_MS;
    private static volatile int sampleCount = ServerFlowConfig.DEFAULT_SAMPLE_COUNT;
    private static volatile double maxAllowedQps = ServerFlowConfig.DEFAULT_MAX_ALLOWED_QPS;
Dynamic configuration initialization
    /**
     * Namespace-specific flow config for token server.
     * Format: (namespace, config).
     */
    // Per-namespace flow config of the token server
    private static final Map<String, ServerFlowConfig> NAMESPACE_CONF = new ConcurrentHashMap<>();
    // Observers of the server transport config
    private static final List<ServerTransportConfigObserver> TRANSPORT_CONFIG_OBSERVERS = new ArrayList<>();
    /**
     * Property for cluster server global transport configuration.
     */
    // Dynamic property for the transport config
    private static SentinelProperty<ServerTransportConfig> transportConfigProperty = new DynamicSentinelProperty<>();
    /**
     * Property for cluster server namespace set.
     */
    // Dynamic property for the namespace set
    private static SentinelProperty<Set<String>> namespaceSetProperty = new DynamicSentinelProperty<>();
    /**
     * Property for cluster server global flow control configuration.
     */
    // Dynamic property for the global flow config
    private static SentinelProperty<ServerFlowConfig> globalFlowProperty = new DynamicSentinelProperty<>();
    // Property listeners
    private static final PropertyListener<ServerTransportConfig> TRANSPORT_PROPERTY_LISTENER
        = new ServerGlobalTransportPropertyListener();
    private static final PropertyListener<ServerFlowConfig> GLOBAL_FLOW_PROPERTY_LISTENER
        = new ServerGlobalFlowPropertyListener();
    private static final PropertyListener<Set<String>> NAMESPACE_SET_PROPERTY_LISTENER
        = new ServerNamespaceSetPropertyListener();
    // Runs when the class is loaded: attach the listeners
    static {
        transportConfigProperty.addListener(TRANSPORT_PROPERTY_LISTENER);
        globalFlowProperty.addListener(GLOBAL_FLOW_PROPERTY_LISTENER);
        namespaceSetProperty.addListener(NAMESPACE_SET_PROPERTY_LISTENER);
    }
    // Some code omitted

    // Register an observer for transport config changes
    public static void addTransportConfigChangeObserver(ServerTransportConfigObserver observer) {
        AssertUtil.notNull(observer, "observer cannot be null");
        TRANSPORT_CONFIG_OBSERVERS.add(observer);
    }
Listener inner classes
// Namespace set listener
private static class ServerNamespaceSetPropertyListener implements PropertyListener<Set<String>> {
    @Override
    public synchronized void configLoad(Set<String> set) {
        if (set == null || set.isEmpty()) {
            RecordLog.warn("[ClusterServerConfigManager] WARN: empty initial server namespace set");
            return;
        }
        // Apply the change
        applyNamespaceSetChange(set);
    }
    @Override
    public synchronized void configUpdate(Set<String> set) {
        // TODO: should debounce?
        applyNamespaceSetChange(set);
    }
}
private static void applyNamespaceSetChange(Set<String> newSet) {
    if (newSet == null) {
        return;
    }
    RecordLog.info("[ClusterServerConfigManager] Server namespace set will be update to: " + newSet);
    if (newSet.isEmpty()) {
        ClusterServerConfigManager.namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE);
        return;
    }
    newSet = new HashSet<>(newSet);
    // Always add the `default` namespace to the namespace set.
    newSet.add(ServerConstants.DEFAULT_NAMESPACE);
    if (embedded) {
        // In embedded server mode, the server itself is also a part of service,
        // so it should be added to namespace set.
        // By default, the added namespace is the appName.
        // So embedded mode adds the server's own namespace as well
        newSet.add(ConfigSupplierRegistry.getNamespaceSupplier().get());
    }
    // Replace the old set
    Set<String> oldSet = ClusterServerConfigManager.namespaceSet;
    if (oldSet != null && !oldSet.isEmpty()) {
        for (String ns : oldSet) {
            // Remove the cluster rule property for deprecated namespace set.
            if (!newSet.contains(ns)) {
                ClusterFlowRuleManager.removeProperty(ns);
                ClusterParamFlowRuleManager.removeProperty(ns);
            }
        }
    }
    ClusterServerConfigManager.namespaceSet = newSet;
    // Register the rule properties; explained later
    for (String ns : newSet) {
        // Register the rule property if needed.
        ClusterFlowRuleManager.registerPropertyIfAbsent(ns);
        ClusterParamFlowRuleManager.registerPropertyIfAbsent(ns);
        // Initialize the global QPS limiter for the namespace.
        GlobalRequestLimiter.initIfAbsent(ns);
    }
} 
// Global transport config listener
private static class ServerGlobalTransportPropertyListener implements PropertyListener<ServerTransportConfig> {
    @Override
    public void configLoad(ServerTransportConfig config) {
        if (config == null) {
            RecordLog.warn("[ClusterServerConfigManager] Empty initial server transport config");
            return;
        }
        applyConfig(config);
    }
    @Override
    public void configUpdate(ServerTransportConfig config) {
        // Apply the config
        applyConfig(config);
    }
    private synchronized void applyConfig(ServerTransportConfig config) {
        // Validate the config
        if (!isValidTransportConfig(config)) {
            RecordLog.warn(
                "[ClusterServerConfigManager] Invalid cluster server transport config, ignoring: " + config);
            return;
        }
        RecordLog.info("[ClusterServerConfigManager] Updating new server transport config: " + config);
        if (config.getIdleSeconds() != idleSeconds) {
            idleSeconds = config.getIdleSeconds();
        }
        // Update the token server (its port)
        updateTokenServer(config);
    }
}
private static void updateTokenServer(ServerTransportConfig config) {
    int newPort = config.getPort();
    AssertUtil.isTrue(newPort > 0, "token server port should be valid (positive)");
    if (newPort == port) {
        return;
    }
    ClusterServerConfigManager.port = newPort;
    for (ServerTransportConfigObserver observer : TRANSPORT_CONFIG_OBSERVERS) {
        // Notify each registered observer (observer pattern)
        observer.onTransportConfigChange(config);
    }
}
// Global flow config listener
private static class ServerGlobalFlowPropertyListener implements PropertyListener<ServerFlowConfig> {
    @Override
    public void configUpdate(ServerFlowConfig config) {
        // Apply the update
        applyGlobalFlowConfig(config);
    }
    @Override
    public void configLoad(ServerFlowConfig config) {
        applyGlobalFlowConfig(config);
    }
    private synchronized void applyGlobalFlowConfig(ServerFlowConfig config) {
        // Validate the config
        if (!isValidFlowConfig(config)) {
            RecordLog.warn(
                "[ClusterServerConfigManager] Invalid cluster server global flow config, ignoring: " + config);
            return;
        }
        RecordLog.info("[ClusterServerConfigManager] Updating new server global flow config: " + config);
        // Update only the values that changed
        if (config.getExceedCount() != exceedCount) {
            exceedCount = config.getExceedCount();
        }
        if (config.getMaxOccupyRatio() != maxOccupyRatio) {
            maxOccupyRatio = config.getMaxOccupyRatio();
        }
        if (config.getMaxAllowedQps() != maxAllowedQps) {
            maxAllowedQps = config.getMaxAllowedQps();
            // Propagate the QPS change to GlobalRequestLimiter
            GlobalRequestLimiter.applyMaxQpsChange(maxAllowedQps);
        }
        int newIntervalMs = config.getIntervalMs();
        int newSampleCount = config.getSampleCount();
        if (newIntervalMs != intervalMs || newSampleCount != sampleCount) {
            if (newIntervalMs <= 0 || newSampleCount <= 0 || newIntervalMs % newSampleCount != 0) {
                RecordLog.warn("[ClusterServerConfigManager] Ignoring invalid flow interval or sample count");
            } else {
                intervalMs = newIntervalMs;
                sampleCount = newSampleCount;
                // Reset all the metrics.
                // The window layout changed, so the old statistics are discarded
                ClusterMetricStatistics.resetFlowMetrics();
                ClusterParamMetricStatistics.resetFlowMetrics();
            }
        }
    }
}
public static boolean isValidTransportConfig(ServerTransportConfig config) {
    return config != null && config.getPort() > 0 && config.getPort() <= 65535;
}
public static boolean isValidFlowConfig(ServerFlowConfig config) {
    return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0
        && config.getMaxAllowedQps() >= 0
        && FlowRuleUtil.isWindowConfigValid(config.getSampleCount(), config.getIntervalMs());
}
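The interval and sample count rule deserves a concrete example: the statistics window of intervalMs is split into sampleCount buckets, so the interval has to divide evenly. The sketch below reproduces the inline check from applyGlobalFlowConfig; the body of FlowRuleUtil.isWindowConfigValid is not shown in this article, so treating it as the same check is an assumption. WindowConfigSketch is a hypothetical name.

```java
// Hypothetical sketch of the window configuration check.
class WindowConfigSketch {
    // Valid when both values are positive and the interval splits into equal buckets.
    static boolean isWindowConfigValid(int sampleCount, int intervalMs) {
        return sampleCount > 0 && intervalMs > 0 && intervalMs % sampleCount == 0;
    }

    public static void main(String[] args) {
        // Defaults from ServerFlowConfig: 1000 ms split into 10 buckets of 100 ms each.
        System.out.println(isWindowConfigValid(10, 1000)); // true
        // 1000 ms cannot be split into 3 equal buckets.
        System.out.println(isWindowConfigValid(3, 1000));  // false
    }
}
```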
2.3 Connections: connection
This package manages cluster connections and connection groups.
2.3.1 Connection interface
Source:
public interface Connection extends AutoCloseable {
    // Local socket address
    SocketAddress getLocalAddress();
    // Remote port
    int getRemotePort();
    // Remote IP
    String getRemoteIP();
    // Update the last read time
    void refreshLastReadTime(long lastReadTime);
    // Get the last read time
    long getLastReadTime();
    // Get the connection key
    String getConnectionKey();
}
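Because Connection extends AutoCloseable, an instance is released automatically when it is used in a try-with-resources statement. AutoCloseable was introduced in JDK 1.7, and the JDK's file streams implement it as well.
The interface declares six methods of its own, which implementations must provide together with close().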
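A minimal sketch of what AutoCloseable buys: the try-with-resources statement calls close() when the block exits, whether normally or through an exception. DemoConnection is a hypothetical class used only for this illustration.

```java
// Hypothetical sketch of try-with-resources with an AutoCloseable type.
class AutoCloseSketch {
    static final class DemoConnection implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true; // a real connection would release its channel here
        }
    }

    public static void main(String[] args) {
        DemoConnection conn = new DemoConnection();
        try (DemoConnection c = conn) {
            System.out.println("inside block, closed=" + c.closed);  // closed=false
        }
        System.out.println("after block, closed=" + conn.closed);    // closed=true
    }
}
```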
2.3.2 NettyConnection
Instance fields
- remoteIp: remote IP
- remotePort: remote port
- channel: the Netty Channel
- lastReadTime: time of the last read refresh
- pool: the connection pool
Source:
// Netty-based connection
public class NettyConnection implements Connection {
    private String remoteIp;
    private int remotePort;
    private Channel channel;
    private long lastReadTime;
    private ConnectionPool pool;
    // Constructor; takes the channel and the pool. ConnectionPool is covered below
    public NettyConnection(Channel channel, ConnectionPool pool) {
        this.channel = channel;
        this.pool = pool;
        // Resolve the remote socket address
        InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
        this.remoteIp = socketAddress.getAddress().getHostAddress();
        this.remotePort = socketAddress.getPort();
        this.lastReadTime = System.currentTimeMillis();
    }
     @Override
    public SocketAddress getLocalAddress() {
        return channel.localAddress();
    }
    //省略部分代码
    @Override
    public String getConnectionKey() {
        //ip:port
        return remoteIp + ":" + remotePort;
    }
    @Override
    // Implements AutoCloseable.close(), so the connection works with try-with-resources
    public void close() {
        // Remove from connection pool.
        pool.remove(channel);
        // Close the connection.
        if (channel != null && channel.isActive()){
            channel.close();
        }
    }
}
2.3.3 ConnectionPool
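A general-purpose connection pool that manages connections.
Fields
- TIMER: a static scheduled executor with 2 core threads
- CONNECTION_MAP: stores the connections. Format: ("ip:port", connection)
- scanTaskFuture: the periodic scan task
Source: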
public class ConnectionPool {
    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2);
    /**
     * Format: ("ip:port", connection)
     */
    private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<String, Connection>();
    /**
     * Periodic scan task.
     */
    private ScheduledFuture scanTaskFuture = null;
    // Create a connection
    public void createConnection(Channel channel) {
        if (channel != null) {
            // Build it through the constructor
            Connection connection = new NettyConnection(channel, this);
            // Compute the connection key and store the connection in CONNECTION_MAP
            String connKey = getConnectionKey(channel);
            CONNECTION_MAP.put(connKey, connection);
        }
    }
    /**
     * Start the scan task for long-idle connections.
     */
    // Start the periodic scan (first run after 10 s, then every 30 s); the task itself is explained below
    private synchronized void startScan() {
        if (scanTaskFuture == null
            || scanTaskFuture.isCancelled()
            || scanTaskFuture.isDone()) {
            scanTaskFuture = TIMER.scheduleAtFixedRate(
                new ScanIdleConnectionTask(this), 10, 30, TimeUnit.SECONDS);
        }
    }
    /**
     * Format to "ip:port".
     *
     * @param channel channel
     * @return formatted key
     */
    private String getConnectionKey(Channel channel) {
        InetSocketAddress socketAddress = (InetSocketAddress)channel.remoteAddress();
        String remoteIp = socketAddress.getAddress().getHostAddress();
        int remotePort = socketAddress.getPort();
        return remoteIp + ":" + remotePort;
    }
    private String getConnectionKey(String ip, int port) {
        return ip + ":" + port;
    }
    // Refresh the last read time to the current time
    public void refreshLastReadTime(Channel channel) {
        if (channel != null) {
            String connKey = getConnectionKey(channel);
            Connection connection = CONNECTION_MAP.get(connKey);
            if (connection != null) {
                connection.refreshLastReadTime(System.currentTimeMillis());
            }
        }
    }
    // Get a connection directly from the map
    public Connection getConnection(String remoteIp, int remotePort) {
        String connKey = getConnectionKey(remoteIp, remotePort);
        return CONNECTION_MAP.get(connKey);
    }
    public void remove(Channel channel) {
        String connKey = getConnectionKey(channel);
        CONNECTION_MAP.remove(connKey);
    }
    public List<Connection> listAllConnection() {
        List<Connection> connections = new ArrayList<Connection>(CONNECTION_MAP.values());
        return connections;
    }
    public int count() {
        return CONNECTION_MAP.size();
    }
    public void clear() {
        CONNECTION_MAP.clear();
    }
    // Close all connections
    public void shutdownAll() throws Exception {
        for (Connection c : CONNECTION_MAP.values()) {
            c.close();
        }
    }
    // Restart the idle scan task
    public void refreshIdleTask() {
        if (scanTaskFuture == null || scanTaskFuture.cancel(false)) {
            startScan();
        } else {
            RecordLog.info("The result of canceling scanTask is error.");
        }
    }
}
2.3.4 ScanIdleConnectionTask
ScanIdleConnectionTask is the Runnable that the scheduled scan executes.
Source:
public class ScanIdleConnectionTask implements Runnable {
    private final ConnectionPool connectionPool;
    // Constructor
    public ScanIdleConnectionTask(ConnectionPool connectionPool) {
        this.connectionPool = connectionPool;
    }
    @Override
    public void run() {
        try {
            
            // Get the idle timeout; 600 s by default
            int idleSeconds = ClusterServerConfigManager.getIdleSeconds();
            long idleTimeMillis = idleSeconds * 1000;
            if (idleTimeMillis < 0) {
                idleTimeMillis = ServerTransportConfig.DEFAULT_IDLE_SECONDS * 1000;
            }
            long now = System.currentTimeMillis();
            // Take a snapshot of all connections
            List<Connection> connections = connectionPool.listAllConnection();
            for (Connection conn : connections) {
                // If now minus the last read time exceeds the idle timeout, the connection can be closed
                if ((now - conn.getLastReadTime()) > idleTimeMillis) {
                    RecordLog.info(
                        String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. "
                            + "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds)
                    );
                    // Close the connection
                    conn.close();
                }
            }
        } catch (Throwable t) {
            RecordLog.warn("[ScanIdleConnectionTask] Failed to clean-up idle tasks", t);
        }
    }
}
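The core of the scan is a single comparison per connection. The sketch below isolates it as a pure function so the rule is easy to test; IdleScanSketch and findIdle are hypothetical names, and connections are reduced to a map from connection key to last read time. Note that the sketch multiplies by 1000L so the conversion to milliseconds is done in long arithmetic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the idle check used by the scan task.
class IdleScanSketch {
    // Returns the keys of all connections whose last read is older than the idle timeout.
    static List<String> findIdle(Map<String, Long> lastReadTimes, long nowMs, int idleSeconds) {
        long idleMs = idleSeconds * 1000L; // long arithmetic avoids int overflow
        List<String> idle = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastReadTimes.entrySet()) {
            if (nowMs - entry.getValue() > idleMs) {
                idle.add(entry.getKey());
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        Map<String, Long> lastReads = new LinkedHashMap<>();
        lastReads.put("10.0.0.1:5000", now - 700_000L); // idle for 700 s
        lastReads.put("10.0.0.2:5000", now - 5_000L);   // idle for 5 s
        System.out.println(findIdle(lastReads, now, 600)); // prints "[10.0.0.1:5000]"
    }
}
```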
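2.3.5 Connection group: ConnectionGroup
A connection group can be understood as the set of connections that belong to one namespace.
Instance fields
- namespace: the namespace
- connectionSet: a set of ConnectionDescriptor; ConnectionDescriptor has the fields address and host.
- connectedCount: the number of connections, held in an AtomicInteger
Source analysis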
public class ConnectionGroup {
    private final String namespace;
    private final Set<ConnectionDescriptor> connectionSet = Collections.synchronizedSet(new HashSet<ConnectionDescriptor>());
    private final AtomicInteger connectedCount = new AtomicInteger();
    // Constructor with a namespace
    public ConnectionGroup(String namespace) {
        AssertUtil.notEmpty(namespace, "namespace cannot be empty");
        this.namespace = namespace;
    }
    // No-arg constructor; uses the default namespace
    public ConnectionGroup() {
        this(ServerConstants.DEFAULT_NAMESPACE);
    }
    // Add a connection
    public ConnectionGroup addConnection(String address) {
        AssertUtil.notEmpty(address, "address cannot be empty");
        // Extract the host: for an "ip:port" address only the ip part is kept
        String[] ip = address.split(":");
        String host;
        if (ip != null && ip.length >= 1) {
            host = ip[0];
        } else {
            host = address;
        }
        
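        // ConnectionDescriptor overrides equals and hashCode, so the set rejects duplicates
        boolean newAdded = connectionSet.add(new ConnectionDescriptor().setAddress(address).setHost(host));
        // Increment the count only if the descriptor was actually added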
        if (newAdded) {
            connectedCount.incrementAndGet();
        }
        return this;
    }
    // Remove a connection
    public ConnectionGroup removeConnection(String address) {
        AssertUtil.notEmpty(address, "address cannot be empty");
        if (connectionSet.remove(new ConnectionDescriptor().setAddress(address))) {
            connectedCount.decrementAndGet();
        }
        return this;
    }
}
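The bookkeeping in ConnectionGroup boils down to "count only what the set actually accepted or removed". A reduced sketch follows, with plain address strings in place of ConnectionDescriptor; ConnectionGroupSketch and its method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of set-backed connection counting.
class ConnectionGroupSketch {
    private final Set<String> addresses = Collections.synchronizedSet(new HashSet<String>());
    private final AtomicInteger connectedCount = new AtomicInteger();

    // For "ip:port" returns "ip"; an address without a colon is returned unchanged.
    static String hostOf(String address) {
        return address.split(":")[0];
    }

    void add(String address) {
        if (addresses.add(address)) { // false for a duplicate, so the count stays put
            connectedCount.incrementAndGet();
        }
    }

    void remove(String address) {
        if (addresses.remove(address)) {
            connectedCount.decrementAndGet();
        }
    }

    int count() {
        return connectedCount.get();
    }

    public static void main(String[] args) {
        ConnectionGroupSketch group = new ConnectionGroupSketch();
        group.add("10.0.0.1:8080");
        group.add("10.0.0.1:8080"); // duplicate, ignored
        System.out.println(group.count());           // 1
        System.out.println(hostOf("10.0.0.1:8080")); // 10.0.0.1
        group.remove("10.0.0.1:8080");
        System.out.println(group.count());           // 0
    }
}
```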
2.3.6 Connection management: ConnectionManager
As the name suggests, ConnectionManager manages connections.
Static fields
- CONN_MAP: connection map (namespace, connection group)
- NAMESPACE_MAP: namespace map (address, namespace)
Source analysis
public final class ConnectionManager {
    /**
     * Connection map (namespace, connection).
     */
    private static final Map<String, ConnectionGroup> CONN_MAP = new ConcurrentHashMap<>();
    /**
     * namespace map (address, namespace).
     */
    private static final Map<String, String> NAMESPACE_MAP = new ConcurrentHashMap<>();
    /**
     * Get connected count for specific namespace.
     *
     * @param namespace namespace to check
     * @return connected count for specific namespace
     */
    // Get the number of connections in a namespace
    public static int getConnectedCount(String namespace) {
        AssertUtil.notEmpty(namespace, "namespace should not be empty");
        ConnectionGroup group = CONN_MAP.get(namespace);
        return group == null ? 0 : group.getConnectedCount();
    }
    // Get the connection group, creating it if absent. Note: no connection is created here
    public static ConnectionGroup getOrCreateGroup(String namespace) {
        AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
        ConnectionGroup group = CONN_MAP.get(namespace);
        if (group == null) {
            // Lock and re-check to prevent concurrent creation (double-checked locking)
            synchronized (CREATE_LOCK) {
                if ((group = CONN_MAP.get(namespace)) == null) {
                    // Create the group and store it in CONN_MAP for later lookups
                    group = new ConnectionGroup(namespace);
                    CONN_MAP.put(namespace, group);
                }
            }
        }
        return group;
    }
    // Remove a connection by address
    public static void removeConnection(String address) {
        AssertUtil.assertNotBlank(address, "address should not be empty");
        String namespace = NAMESPACE_MAP.get(address);
        if (namespace != null) {
            ConnectionGroup group = CONN_MAP.get(namespace);
            if (group == null) {
                return;
            }
            
            // Delegate to ConnectionGroup.removeConnection (covered above), which removes the descriptor from its set
            group.removeConnection(address);
            RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace);
        }
        // Remove the address from NAMESPACE_MAP
        NAMESPACE_MAP.remove(address);
    }
    // Remove with both arguments supplied, which skips the namespace lookup
    public static void removeConnection(String namespace, String address) {
        AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
        AssertUtil.assertNotBlank(address, "address should not be empty");
        ConnectionGroup group = CONN_MAP.get(namespace);
        if (group == null) {
            return;
        }
        group.removeConnection(address);
        NAMESPACE_MAP.remove(address);
        RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace);
    }
    // Add a connection: record the address in NAMESPACE_MAP and increment the group's connected count
    public static ConnectionGroup addConnection(String namespace, String address) {
        AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
        AssertUtil.assertNotBlank(address, "address should not be empty");
        ConnectionGroup group = getOrCreateGroup(namespace);
        group.addConnection(address);
        NAMESPACE_MAP.put(address, namespace);
        RecordLog.info("[ConnectionManager] Client <{0}> registered with namespace <{1}>", address, namespace);
        return group;
    }
    // Get or create the connection group (no connection is added)
    public static ConnectionGroup getOrCreateConnectionGroup(String namespace) {
        AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
        ConnectionGroup group = getOrCreateGroup(namespace);
        return group;
    }
    // Get the connection group, or null if none exists for the namespace
    public static ConnectionGroup getConnectionGroup(String namespace) {
        AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
        ConnectionGroup group = CONN_MAP.get(namespace);
        return group;
    }
    static void clear() {
        CONN_MAP.clear();
        NAMESPACE_MAP.clear();
    }
    private static final Object CREATE_LOCK = new Object();
    private ConnectionManager() {}
}
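The two-map bookkeeping above can be modelled in a few lines. The following is a minimal sketch rather than Sentinel's code: the class ConnectionRegistrySketch and its method names are hypothetical, a group is reduced to a plain set of addresses, and ConcurrentHashMap.computeIfAbsent stands in for the double-checked synchronized block used by getOrCreateGroup.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of ConnectionManager's bookkeeping (not Sentinel code).
final class ConnectionRegistrySketch {
    // namespace -> addresses connected under that namespace (plays the role of CONN_MAP)
    private final Map<String, Set<String>> groups = new ConcurrentHashMap<>();
    // address -> namespace (plays the role of NAMESPACE_MAP), so removal needs only the address
    private final Map<String, String> namespaceOf = new ConcurrentHashMap<>();

    void add(String namespace, String address) {
        // computeIfAbsent creates the group atomically, so no explicit lock is needed
        groups.computeIfAbsent(namespace, k -> ConcurrentHashMap.newKeySet()).add(address);
        namespaceOf.put(address, namespace);
    }

    void remove(String address) {
        String namespace = namespaceOf.remove(address);
        if (namespace == null) {
            return; // unknown address: nothing to do
        }
        Set<String> group = groups.get(namespace);
        if (group != null) {
            group.remove(address);
        }
    }

    int connectedCount(String namespace) {
        Set<String> group = groups.get(namespace);
        return group == null ? 0 : group.size();
    }

    public static void main(String[] args) {
        ConnectionRegistrySketch registry = new ConnectionRegistrySketch();
        registry.add("app-a", "10.0.0.1:5000");
        registry.add("app-a", "10.0.0.2:5000");
        registry.remove("10.0.0.1:5000");
        System.out.println(registry.connectedCount("app-a"));
    }
}
```

The reverse map is what makes the single-argument removeConnection(address) possible: a disconnect event carries only the remote address, so the namespace has to be looked up first.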
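2.4 Processors (processor package)
This package mainly contains the request processors for flow control: one for ordinary flow control and one for hot-parameter flow control.
2.4.1 The RequestProcessor interface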
public interface RequestProcessor<T, R> {
    /**
     * Process the cluster request.
     *
     * @param request Sentinel cluster request
     * @return the response after processed
     */
    // Process the request.
    // There are two implementations: FlowRequestProcessor and ParamFlowRequestProcessor.
    ClusterResponse<R> processRequest(ClusterRequest<T> request);
}
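2.4.2 RequestProcessorProvider
Provider of request processors, similar to a factory class.
Static fields
- PROCESSOR_MAP: maps a request type to its request processor
- SERVICE_LOADER: loads the RequestProcessor implementations through ServiceLoader
Source code analysis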
public final class RequestProcessorProvider {
    private static final Map<Integer, RequestProcessor> PROCESSOR_MAP = new ConcurrentHashMap<>();
    // The implementations configured by default are FlowRequestProcessor and ParamFlowRequestProcessor
    private static final ServiceLoader<RequestProcessor> SERVICE_LOADER = ServiceLoaderUtil.getServiceLoader(
        RequestProcessor.class);
    // Static initializer block: runs when the class is initialized
    static {
        loadAndInit();
    }
    private static void loadAndInit() {
        for (RequestProcessor processor : SERVICE_LOADER) {
            Integer type = parseRequestType(processor);
            if (type != null) {
                // Put the processor into the map, keyed by request type
                PROCESSOR_MAP.put(type, processor);
            }
        }
    }
    // Get the request type of a processor
    private static Integer parseRequestType(RequestProcessor processor) {
        // The type is declared through the @RequestType annotation on the class
        RequestType requestType = processor.getClass().getAnnotation(RequestType.class);
        if (requestType != null) {
            return requestType.value();
        } else {
            return null;
        }
    }
    // Get the RequestProcessor registered for the given type
    public static RequestProcessor getProcessor(int type) {
        return PROCESSOR_MAP.get(type);
    }
    static void addProcessorIfAbsent(int type, RequestProcessor processor) {
        // TBD: use putIfAbsent in JDK 1.8.
        if (PROCESSOR_MAP.containsKey(type)) {
            return;
        }
        PROCESSOR_MAP.put(type, processor);
    }
    static void addProcessor(int type, RequestProcessor processor) {
        AssertUtil.notNull(processor, "processor cannot be null");
        PROCESSOR_MAP.put(type, processor);
    }
    private RequestProcessorProvider() {}
}
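The annotation-driven registration in loadAndInit() and parseRequestType() can be illustrated in isolation. This is a minimal sketch with hypothetical stand-ins (MsgType for @RequestType, Handler for RequestProcessor); handlers are registered by hand instead of being discovered through ServiceLoader, and register() uses putIfAbsent, the one-call equivalent of the containsKey/put pair in addProcessorIfAbsent.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @RequestType (not the Sentinel annotation).
@Retention(RetentionPolicy.RUNTIME) // must be RUNTIME, otherwise getAnnotation() returns null
@Target(ElementType.TYPE)
@interface MsgType {
    int value();
}

// Hypothetical stand-in for RequestProcessor.
interface Handler {
    String handle(String payload);
}

@MsgType(1)
final class EchoHandler implements Handler {
    @Override
    public String handle(String payload) {
        return "echo:" + payload;
    }
}

// Carries no annotation, so the registry skips it (the "type == null" branch).
final class UntypedHandler implements Handler {
    @Override
    public String handle(String payload) {
        return payload;
    }
}

final class HandlerRegistrySketch {
    private final Map<Integer, Handler> handlers = new ConcurrentHashMap<>();

    // Returns true if the handler was registered under its annotated type.
    boolean register(Handler handler) {
        MsgType type = handler.getClass().getAnnotation(MsgType.class);
        if (type == null) {
            return false;
        }
        return handlers.putIfAbsent(type.value(), handler) == null;
    }

    Handler get(int type) {
        return handlers.get(type);
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register(new EchoHandler());
        registry.register(new UntypedHandler()); // skipped: no @MsgType
        System.out.println(registry.get(1).handle("ping"));
    }
}
```

Keeping the type on the class as an annotation means a new processor only has to be listed in the SPI configuration file; the registry itself does not need to change.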
2.4.3 FlowRequestProcessor
Source code analysis
@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
        // Get the TokenService; the configured implementation is DefaultTokenService
        TokenService tokenService = TokenServiceProvider.getService();
        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        boolean prioritized = request.getData().isPriority();
        // Request a token
        TokenResult result = tokenService.requestToken(flowId, count, prioritized);
        // Convert the result into a response
        return toResponse(result, request);
    }
    private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
        return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
            new FlowTokenResponseData()
                .setRemainingCount(result.getRemaining())
                .setWaitInMs(result.getWaitInMs())
        );
    }
}
2.4.4 ParamFlowRequestProcessor
Source code analysis
@RequestType(ClusterConstants.MSG_TYPE_PARAM_FLOW)
public class ParamFlowRequestProcessor implements RequestProcessor<ParamFlowRequestData, FlowTokenResponseData> {
    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<ParamFlowRequestData> request) {
        TokenService tokenService = TokenServiceProvider.getService();
        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        Collection<Object> args = request.getData().getParams();
        // Request a token for the hot parameters
        TokenResult result = tokenService.requestParamToken(flowId, count, args);
        // Convert the result into a response
        return toResponse(result, request);
    }
    private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
        return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
            new FlowTokenResponseData()
                .setRemainingCount(result.getRemaining())
                .setWaitInMs(0)
        );
    }
}
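The flow-control logic itself (the TokenService) is explained in the next part.
2.5 Startup initializer: DefaultClusterServerInitFunc
Source code
// Implements InitFunc; InitFunc implementations are loaded when the system starts up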
public class DefaultClusterServerInitFunc implements InitFunc {
    @Override
    public void init() throws Exception {
        // Register the decoders
        initDefaultEntityDecoders();
        // Register the writers
        initDefaultEntityWriters();
        // Initialize the processors
        initDefaultProcessors();
        // Eagerly-trigger the SPI pre-load of token service.
        // By this point the TokenService has been loaded
        TokenServiceProvider.getService();
        RecordLog.info("[DefaultClusterServerInitFunc] Default entity codec and processors registered");
    }
    private void initDefaultEntityWriters() {
        // Ping
        ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PING, new PingResponseDataWriter());
        // Flow-control writer
        ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_FLOW, new FlowResponseDataWriter());
        // Hot-parameter flow control uses the same writer as ordinary flow control
        ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PARAM_FLOW, new FlowResponseDataWriter());
    }
    private void initDefaultEntityDecoders() {
        // Ping
        RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PING, new PingRequestDataDecoder());
        // Ordinary flow control
        RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_FLOW, new FlowRequestDataDecoder());
        // Hot-parameter flow control
        RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PARAM_FLOW, new ParamFlowRequestDataDecoder());
    }
    private void initDefaultProcessors() {
        // Eagerly-trigger the SPI pre-load.
        // Fetching a processor here simply forces RequestProcessorProvider (covered above) to load
        RequestProcessorProvider.getProcessor(0);
    }
}
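Both initDefaultProcessors() and the TokenServiceProvider.getService() call rely on a JVM guarantee: a class's static initializer runs the first time one of its static methods is invoked. The sketch below demonstrates this with hypothetical names (EagerInitSketch, Provider) and a string in place of the loaded service; it only records the order of events and is not Sentinel code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demonstration of eager class initialization (not Sentinel code).
final class EagerInitSketch {
    // Records the order in which things happen.
    static final List<String> EVENTS = new ArrayList<>();

    // Stand-in for a provider whose static initializer does the actual loading.
    static final class Provider {
        private static final String SERVICE;

        static {
            EVENTS.add("Provider initialized");
            SERVICE = "default-service";
        }

        static String getService() {
            return SERVICE;
        }
    }

    // Startup hook: the return value is ignored, the call only forces Provider to initialize.
    static void init() {
        EVENTS.add("init start");
        Provider.getService();
        EVENTS.add("init end");
    }

    public static void main(String[] args) {
        init();
        System.out.println(EVENTS);
    }
}
```

Triggering the load during startup moves the cost of SPI resolution, and any resolution failure, away from the first real request.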
2.6 TokenServiceProvider
This class is similar to RequestProcessorProvider; evidently the Sentinel authors like to use a Provider class as a factory.
Source code
public final class TokenServiceProvider {
    private static TokenService service = null;
    static {
        resolveTokenServiceSpi();
    }
    public static TokenService getService() {
        return service;
    }
    private static void resolveTokenServiceSpi() {
        // Load the TokenService through SPI; if none is found, fall back to DefaultTokenService, which is what is used here
        service = SpiLoader.loadFirstInstanceOrDefault(TokenService.class, DefaultTokenService.class);
        if (service != null) {
            RecordLog.info("[TokenServiceProvider] Global token service resolved: "
                + service.getClass().getCanonicalName());
        } else {
            RecordLog.warn("[TokenServiceProvider] Unable to resolve TokenService: no SPI found");
        }
    }
}
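The "first SPI implementation, otherwise a default" pattern can be approximated with the JDK's own java.util.ServiceLoader. This is a minimal sketch under stated assumptions: Greeter, DefaultGreeter and loadFirstOrDefault are hypothetical names, and the method is a simplification that does not reproduce the internals of Sentinel's SpiLoader.loadFirstInstanceOrDefault.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Hypothetical sketch of "load via SPI, fall back to a default" (not Sentinel's SpiLoader).
final class SpiFallbackSketch {
    // Stand-in for the TokenService interface.
    public interface Greeter {
        String greet(String name);
    }

    // Stand-in for DefaultTokenService.
    public static final class DefaultGreeter implements Greeter {
        @Override
        public String greet(String name) {
            return "hello, " + name;
        }
    }

    // Returns the first provider listed under META-INF/services, or a new default instance if there is none.
    static <T> T loadFirstOrDefault(Class<T> service, Class<? extends T> defaultImpl) {
        Iterator<T> providers = ServiceLoader.load(service).iterator();
        if (providers.hasNext()) {
            return providers.next();
        }
        try {
            return defaultImpl.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + defaultImpl.getName(), e);
        }
    }

    public static void main(String[] args) {
        // No provider file exists for Greeter, so the default implementation is used.
        Greeter greeter = loadFirstOrDefault(Greeter.class, DefaultGreeter.class);
        System.out.println(greeter.greet("sentinel"));
    }
}
```

Because a default is always available, a caller such as FlowRequestProcessor can use the returned service without a null check, provided the default class can be instantiated.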
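3. Other content
- ClusterTokenServer: the TokenServer-related classes will be explained later, together with the client. They include NettyTransportServer, SentinelDefaultTokenServer and DefaultEmbeddedTokenServer, as well as TokenServerHandler in the handler package.
- ClusterServerStatLogUtil in the log package is mainly used to write the cluster flow-control logs.
- The classes in the command.handler package mainly handle queries and changes of dynamic configuration coming from the management console, such as flow rules, monitoring metrics and cluster configuration.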