eureka 源码阅读

Eureka 是什么?要解决什么问题?

Eureka 是 REST 风格的服务,主要是用于定位服务,目的是解决中间层服务的负载均衡故障转移

Eureka 整体结构

eureka_architecture.png

角色划分

  • Eureka Server: Eureka 服务端

  • Eureka Client:Eureka 客户端

    • Application Service: 服务生产者

    • Application Client:服务消费者

功能描述

服务注册 Register:Client 端主动向 Server 端注册。提供自身的信息,比如 IP 地址、端口,主页…等

心跳续约 Renew:Client 默认会每隔30秒发送一次心跳到 Server 来完成续约。这样来告诉Server说我还活着

主动下线 Cancel:Client 端在停机时(正常停止)主动向 Server 发送下线,告诉 Server 把自己剔除

获取服务列表 Get Registry:Client 从 Server 获取注册表,并将其缓存在本地。缓存信息默认每 30s 更新一次。

注册表同步 Replicate:多个 Server 之间通过 P2P 复制的方式完成服务注册表的同步。

服务剔除 Eviction:在默认的情况下,当 Client 连续 90 秒没有向 Server 发送服务续约,Server 会将该 Client 从服务注册列表删除。

Eureka 核心概念

实例 Instance: 对应类:InstanceInfo,InstanceInfo 代表一个实例的信息,趋近 POJO。

应用 Application: 对应类:Application,同名 InstanceInfo 集合

注册表 Applications: 对应类:Applications,Application 集合

Eureka 组件

定时任务组件 TimedSupervisorTask

TimedSupervisorTask 是一个固定间隔的周期性任务(继承了Runable接口),一旦遇到超时,下一次执行的间隔时间就会翻倍,如果继续超时,继续翻倍,直到达到设置的上限(达到上限之后,以上限作为固定时间间隔),当任务正常执行之后,时间间隔又会还原成初始值

public class TimedSupervisorTask extends TimerTask {

    private final String name;
    private final ScheduledExecutorService scheduler; // 延迟执行 this 线程池
    private final ThreadPoolExecutor executor; // task执行线程池
    private final long timeoutMillis; // 默认超时时间 也是默认延迟时间
    private final Runnable task; // 具体task
    private final AtomicLong delay; // 延迟时间
    private final long maxDelay; // 最大延迟时间

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            delay.set(timeoutMillis);
        } catch (TimeoutException e) {
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) { // 略
        } catch (Throwable e) { // 略
        } finally {
            if (future != null) { future.cancel(true); }
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}
image-20211006151251101.png

例如起始间隔是2秒,上限是15秒,效果如下

image-20211006153716010.png

集群解析器 ClusterResolver

// 接口定义, 可以获取 EurekaEndpoint 列表,即服务接口地址
public interface ClusterResolver<T extends EurekaEndpoint> {
    String getRegion();
    List<T> getClusterEndpoints();
}

EurekaEndpoint 可以理解成 Server 接口地址的描述


image-20211006161408857.png

基于配置的集群解析器 ConfigClusterResolver

ConfigClusterResolver 基于配置service-url,实现获取 EurekaEndpoint 列表

@Override
public List<AwsEndpoint> getClusterEndpoints() {
    if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
        return getClusterEndpointsFromDns();
    } else {
        return getClusterEndpointsFromConfig();
    }
}
private List<AwsEndpoint> getClusterEndpointsFromConfig() {
    String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
    String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);

    Map<String, List<String>> serviceUrls = EndpointUtils
            .getServiceUrlsMapFromConfig(clientConfig, myZone, clientConfig.shouldPreferSameZoneEureka());

    List<AwsEndpoint> endpoints = new ArrayList<>();
    for (String zone : serviceUrls.keySet()) {
        for (String url : serviceUrls.get(zone)) {
            try {
                endpoints.add(new AwsEndpoint(url, getRegion(), zone));
            } catch (Exception ignore) {
                logger.warn("Invalid eureka server URI: {}; removing from the server pool", url);
            }
        }
    }
}

Zone亲和的集群解析器 ZoneAffinityClusterResolver

ZoneAffinityClusterResolver 持有 ConfigClusterResolver 对象,是静态代理,对 ConfigClusterResolver 返回的 EurekaEndpoint 做了随机排序。

public List<AwsEndpoint> getClusterEndpoints() {

    // 获取当前zone和其它zone的Endpoint
    List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);
    List<AwsEndpoint> myZoneEndpoints = parts[0];
    List<AwsEndpoint> remainingEndpoints = parts[1];
    List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints);
    
    // 如果非zone亲和,倒序排列
    if (!zoneAffinity) {
        Collections.reverse(randomizedList);
    }
    return randomizedList;
}

// 将Endpoint随机排列并合并,合并策略是将当前zone的Endpoint放在其它zone的Endpoint前面
private List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints, List<AwsEndpoint> remainingEndpoints) {
    if (myZoneEndpoints.isEmpty()) {
        return randomizer.randomize(remainingEndpoints);
    }
    if (remainingEndpoints.isEmpty()) {
        return randomizer.randomize(myZoneEndpoints);
    }
    List<AwsEndpoint> mergedList = randomizer.randomize(myZoneEndpoints);
    mergedList.addAll(randomizer.randomize(remainingEndpoints));
    return mergedList;
}

异步解析器 AsyncResolver

异步使用定时任务组件 TimedSupervisorTask 实现

具体解析过程由委托对象 ZoneAffinityClusterResolver 完成

public class AsyncResolver<T extends EurekaEndpoint> implements ClosableResolver<T> {
    private static final Logger logger = LoggerFactory.getLogger(AsyncResolver.class);

    private final AtomicBoolean warmedUp = new AtomicBoolean(false);
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final String name;    // a name for metric purposes
    private final ClusterResolver<T> delegate;  // 上文提到 ZoneAffinityClusterResolver
    private final ScheduledExecutorService executorService;
    private final ThreadPoolExecutor threadPoolExecutor;
    private final TimedSupervisorTask backgroundTask; // 上文提到 TimedSupervisorTask
    private final AtomicReference<List<T>> resultsRef; // 缓存
    private final int refreshIntervalMs;
    private final int warmUpTimeoutMs;
    private volatile long lastLoadTimestamp = -1;

    // 构造方法,初始化各种线程池
    AsyncResolver(String name,
                  ClusterResolver<T> delegate,
                  List<T> initialValue,
                  int executorThreadPoolSize,
                  int refreshIntervalMs,
                  int warmUpTimeoutMs) {
        this.name = name;
        this.delegate = delegate;
        this.refreshIntervalMs = refreshIntervalMs;
        this.warmUpTimeoutMs = warmUpTimeoutMs;

        this.executorService = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("AsyncResolver-" + name + "-%d")
                        .setDaemon(true)
                        .build());

        this.threadPoolExecutor = new ThreadPoolExecutor(
                1, executorThreadPoolSize, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),  // use direct handoff
                new ThreadFactoryBuilder()
                        .setNameFormat("AsyncResolver-" + name + "-executor-%d")
                        .setDaemon(true)
                        .build()
        );

        this.backgroundTask = new TimedSupervisorTask(
                this.getClass().getSimpleName(),
                executorService,
                threadPoolExecutor,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS,
                5,
                updateTask
        );

        this.resultsRef = new AtomicReference<>(initialValue);
        Monitors.registerObject(name, this);
    }

    @Override
    public List<T> getClusterEndpoints() {
        long delay = refreshIntervalMs;
        // 如果未预热,开始预热,并标记已预热
        if (warmedUp.compareAndSet(false, true)) {
            if (!doWarmUp()) {
                delay = 0;
            }
        }
        
        // 如果未开始过定时任务,开始定时任务并做标记已开始
        if (scheduled.compareAndSet(false, true)) {
            scheduleTask(delay);
        }
        return resultsRef.get();
    }
    
    void scheduleTask(long delay) {
        executorService.schedule(backgroundTask, delay, TimeUnit.MILLISECONDS);
    }

    // 具体任务,获取 EurekaEndpoint 
    private final Runnable updateTask = new Runnable() {
        @Override
        public void run() {
            try {
                // ZoneAffinityClusterResolver 获取 EurekaEndpoint 列表
                List<T> newList = delegate.getClusterEndpoints();
                if (newList != null) {
                    resultsRef.getAndSet(newList);
                    lastLoadTimestamp = System.currentTimeMillis();
                } else {
                    logger.warn("Delegate returned null list of cluster endpoints");
                }
                logger.debug("Resolved to {}", newList);
            } catch (Exception e) {
                logger.warn("Failed to retrieve cluster endpoints from the delegate", e);
            }
        }
    };
}
image-20211105151013103.png

优秀的代理使用

通信

底层通信接口 EurekaHttpClient


image-20211006163632598.png

image-20211006163928620.png

EurekaHttpClient 系统的实现图 如上,左边是 Eureka 自身使用 Jersey 实现的Rest风格, 右边是 Spring Cloud 使用 RestTemplate 实现 Rest 风格

http 访问实际是通过 apache http client 实现,此处忽略。

右边这一块通过命名可以看出使用的是一个装饰器模式。

SessionedEurekaHttpClient:实现重连接,隔一段时间会重新断开连接,并重新建立,实现客户端负载均衡

RetryableEurekaHttpClient: 重试一定次数,并有对 server 的黑名单机制

RedirectingEurekaHttpClient:实现重定向

RestTemplateTransportClient: 基于RestTemplate的 Rest 风格的访问

SessionedEurekaHttpClient 核心代码

@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    long now = System.currentTimeMillis();
    long delay = now - lastReconnectTimeStamp;
    // 当 (now - lastReconnectTimeStamp)大于 currentSessionDurationMs 时,
    // 端开连接并将eurekaHttpClientRef置null
    // currentSessionDurationMs 取值范围:sessionDurationMs+-[0, sessionDurationMs/2]
    if (delay >= currentSessionDurationMs) {
        logger.debug("Ending a session and starting anew");
        lastReconnectTimeStamp = now;
        currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
        TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
    }

    EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
    if (eurekaHttpClient == null) {
        eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
    }
    return requestExecutor.execute(eurekaHttpClient);
}

RetryableEurekaHttpClient 核心代码

private List<EurekaEndpoint> getHostCandidates() {
    // 黑名单: quarantineSet 候选名单:candidateHosts 
    // 候选名单顺序是随机的 clusterResolver 为 ZoneAffinityClusterResolver
    // 黑名单对候选名单求交集,确保黑名单元素都在候选名单中
    List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
    quarantineSet.retainAll(candidateHosts);

    // 算出阈值, 默认阈值比例 0.66
    // 如果计算结果过大,则为candidateHosts.size
    int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
    if (threshold > candidateHosts.size()) {
        threshold = candidateHosts.size();
    }
    
    // 如黑名单多余阈值,则将黑名单清空,即如Server损坏太多,清空黑名单,这就是黑名单恢复机制
    // 如没有达到阈值,正常过滤
    if (quarantineSet.isEmpty()) {
        // no-op
    } else if (quarantineSet.size() >= threshold) {
        logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
        quarantineSet.clear();
    } else {
        List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
        for (EurekaEndpoint endpoint : candidateHosts) {
            if (!quarantineSet.contains(endpoint)) {
                remainingHosts.add(endpoint);
            }
        }
        candidateHosts = remainingHosts;
    }

    return candidateHosts;
}

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    // 按顺序连接 Server
    int endpointIdx = 0;
    // 重试次数限制 numberOfRetries 默认3
    for (int retry = 0; retry < numberOfRetries; retry++) {
        // 如果缓存的  currentHttpClient 不存在, 则通过候选名单建立新连接
        EurekaHttpClient currentHttpClient = delegate.get();
        EurekaEndpoint currentEndpoint = null;
        if (currentHttpClient == null) {
            if (candidateHosts == null) {
                candidateHosts = getHostCandidates();
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("cluster server list is empty");
                }
            }
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }
            // 首次 或者 上一次请求失败 会重新复制endpointIdx,连接候选列表中的下个实例
            currentEndpoint = candidateHosts.get(endpointIdx++);
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }

        try {
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            // 访问成功, 返回结束,否则会继续下次循环,并将当前server放入黑名单
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                delegate.set(currentHttpClient);
                return response;
            }
        } catch (Exception e) {
            logger.warn("Request execution failed with message: {}", e.getMessage());
        }
        delegate.compareAndSet(currentHttpClient, null);
        if (currentEndpoint != null) {
            quarantineSet.add(currentEndpoint);
        }
    }
    // 如果重试次数达到限制仍然失败,抛出异常
    throw new TransportException("Retry limit reached; giving up on completing the request");
}

DiscoveryClient 静态内部类 EurekaTransport 集合了所有访问 Server 的路径。

初始化代码在 DiscoveryClient # scheduleServerEndpointTask, 初始化结果是

结果是

private static final class EurekaTransport {
    private ClosableResolver bootstrapResolver;  // 即异步解析器 AsyncResolver 
    // 工厂类RestTemplateTransportClientFactory
    private TransportClientFactory transportClientFactory; 

    // 注册使用的EurekaHttpClient 与对应工厂, 实际为 SessionedEurekaHttpClient
    private EurekaHttpClient registrationClient;
    private EurekaHttpClientFactory registrationClientFactory;

    // 同上
    private EurekaHttpClient queryClient;
    private EurekaHttpClientFactory queryClientFactory;
}

最终由EurekaHttpClients#canonicalClientFactory 实现,可以看到此处返回的是 SessionedEurekaHttpClient SessionedEurekaHttpClient 持有 RetryableEurekaHttpClientFactory,

RetryableEurekaHttpClientFactory 持有 RedirectingEurekaHttpClientFactory,

RedirectingEurekaHttpClientFactory持有 TransportClientFactory(RestTemplateTransportClientFactory)

static EurekaHttpClientFactory canonicalClientFactory(final String name,
                                                      final EurekaTransportConfig transportConfig,
                                                      final ClusterResolver<EurekaEndpoint> clusterResolver,
                                                      final TransportClientFactory transportClientFactory) {

    return new EurekaHttpClientFactory() {
        @Override
        public EurekaHttpClient newClient() {
            return new SessionedEurekaHttpClient(
                    name,
                    RetryableEurekaHttpClient.createFactory(
                            name,
                            transportConfig,
                            clusterResolver,                            RedirectingEurekaHttpClient.createFactory(transportClientFactory),
                            ServerStatusEvaluators.legacyEvaluator()),
                    transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000
            );
        }

        @Override
        public void shutdown() {
            wrapClosable(clusterResolver).shutdown();
        }
    };
}

Eureka Instance

InstanceInfo 描述实例本身, 如instanceId, 应用名称,host、租约信息等。

public class InstanceInfo {
    private volatile String instanceId; // 唯一
    private volatile String appName;
    private volatile String appGroupName;
    private volatile String ipAddr;
    private volatile String sid = SID_DEFAULT;
    private volatile int port = DEFAULT_PORT;
    private volatile int securePort = DEFAULT_SECURE_PORT;
    private volatile String homePageUrl;
    private volatile String statusPageUrl;
    private volatile String healthCheckUrl;
    private volatile String secureHealthCheckUrl;
    private volatile String vipAddress;
    private volatile String secureVipAddress;
    private String statusPageRelativeUrl;
    private String statusPageExplicitUrl;
    private String healthCheckRelativeUrl;
    private String healthCheckSecureExplicitUrl;
    private String vipAddressUnresolved;
    private String secureVipAddressUnresolved;
    private String healthCheckExplicitUrl;
    private volatile int countryId = DEFAULT_COUNTRY_ID; // Defaults to US
    private volatile boolean isSecurePortEnabled = false;
    private volatile boolean isUnsecurePortEnabled = true;
    private volatile DataCenterInfo dataCenterInfo;
    private volatile String hostName;
    // starting 初始化、down健康检查失败、up正常服务、out_of_service 正常服务但是不接受请求 unkown 未知状态
    private volatile InstanceStatus status = InstanceStatus.UP;
    // 覆盖状态
    private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
    // 标注实例是否脏(client对比server) true表示需要向服务端注册
    private volatile boolean isInstanceInfoDirty = false;
    // 租约信息
    private volatile LeaseInfo leaseInfo;
    private volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;
    // 元数据
    private volatile Map<String, String> metadata;
    private volatile Long lastUpdatedTimestamp;
    private volatile Long lastDirtyTimestamp;
    private volatile ActionType actionType;
    private volatile String asgName;
    private String version = VERSION_UNKNOWN;
}
public class InstanceInfoFactory {
   public InstanceInfo create(EurekaInstanceConfig config) {
   。。。
   }
}

Eureka Client 实现

核心逻辑DiscoveryClient

// 初始化所有定时任务
private void initScheduledTasks() {
    // 拉取注册表
    if (clientConfig.shouldFetchRegistry()) {
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // TimedSupervisorTask 定时任务, task 是 CacheRefreshThread
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        );
        // scheduler 触发延时 cacheRefreshTask
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    // 需要注册到 Server
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // TimedSupervisorTask 定时任务, task 是 HeartbeatThread
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        );
        // scheduler 触发延时 cacheRefreshTask
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // 服务复制器
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
                
        // 状态变更监听器
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                instanceInfoReplicator.onDemandUpdate();
            }
        };
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }
        // 服务复制开始
        instanceInfoReplicator.start(
        clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

缓存刷新 CacheRefreshThread

定时拉取增量数据更新本地缓存

心跳任务 HeartbeatThread

定时发送续约

服务复制器 InstanceInfoReplicator

当dirty时,重新注册

状态变更监听器 ApplicationInfoManager.StatusChangeListener

执行一次 InstanceInfoReplicator#run

Eureka Server 实现

Eureka 可视化仪表盘 EurekaController

Spring Cloud 提供了可视化的页面,可以通过 EurekaDashboardProperties 配置。

image-20211007155132174.png

资源层

使用Jersey实现的Restful web 服务,类似 @RestController


image-20211013212732106.png

ApplicationsResource

ApplicationResource

InstanceResource

InstancesResource

PeerReplicationResource

响应缓存实现-二级缓存 ResponseCacheImpl

逻辑图

一级缓存 只读缓存 ConcurrentMap:ConcurrentMap<Key, Value> readOnlyCacheMap

二级缓存 读写缓存 guaua:LoadingCache<Key, Value> readWriteCacheMap


image-20211013212144819.png

先看构造代码

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;
    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    
    this.readWriteCacheMap =CacheBuilder.newBuilder()
    .initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
    // key 删除监听,将 Key 从 regionSpecificKeys 移除
    .removalListener(new RemovalListener<Key, Value>() {
        @Override
        public void onRemoval(RemovalNotification<Key, Value> notification) {
            Key removedKey = notification.getKey();
            if (removedKey.hasRegions()) {
                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
            }
        }
    })
    // CacheLoader 缓存加载
    // Key 放入 regionSpecificKeys
    // 从registry 获取 Value 放入缓存
    .build(new CacheLoader<Key, Value>() {
        @Override
        public Value load(Key key) throws Exception {
            if (key.hasRegions()) {
                Key cloneWithNoRegions = key.cloneWithoutRegions();
                regionSpecificKeys.put(cloneWithNoRegions, key);
            }
            // 从注册表获取Value缓存
            Value value = generatePayload(key);
            return value;
        }
    });
    
    // 如果使用只读缓存更新定时任务
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }
    
    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
    }
}

再看获取缓存方法

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        // 如果使用了只读缓存 readOnlyCacheMap,则优先从只读缓存读取,如果不存在,在从读写缓存读取,并更新只读缓存
        // 如果不使用只读缓存, 则直接从 读写缓存中读取
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}

注册表实现

image-20211014103612323.png

LookupService 服务发现

LeaseManager 租约行为

AbstractInstanceRegistry 接口核心逻辑实现

PeerAwareInstanceRegistryImpl 同步复制

InstanceRegistry 发布 Spring 事件

public interface LookupService<T> {
    Application getApplication(String appName);
    Applications getApplications();
    List<InstanceInfo> getInstancesById(String id);
    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}
public interface LeaseManager<T> {
    void register(T r, int leaseDuration, boolean isReplication);
    boolean cancel(String appName, String id, boolean isReplication);
    boolean renew(String appName, String id, boolean isReplication);
    void evict();
}

注册表存储registry 本质上是一个双层的 ConcurrentHashMap,存储在内存中的。AbstractInstanceRegistry#registry

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
  • 第一层的 key 是appName,value 是第二层 ConcurrentHashMap;
  • 第二层 ConcurrentHashMap 的 key 是服务的 InstanceId,value 是 Lease 对象,Lease 对象包含了实例详情和服务治理相关的属性。

服务注册 register

image-20211105164240817.png
InstanceRegistry
public void register(InstanceInfo info, boolean isReplication) {
    // 发送事件 EurekaInstanceRegisteredEvent
    handleRegistration(info, leaseDuration, isReplication);
    super.register(info, leaseDuration, isReplication);
}

PeerAwareInstanceRegistryImpl
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    // 复制同步到其它节点
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

AbstractInstanceRegistry
// registrant 注册对象、leaseDuration 租约时间、isReplication 是否来自eureka-server复制
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // 获取读锁,即读取操作不受阻塞,写操作会阻塞。
    read.lock();
    try {
        //gMap是一个CurrentHashMap, 注册表获取应用所有租约信息
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        // 如果 gMap 为null, 则新建gNewMap放入注册表registry,并赋值给 gMap
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // 获取对应实例的租约信息, 
        // 如果存在且已存在的lastdirtytime > 待注册的实例lastdirtytime, 使用已存在的实例
        // 如果不存在,认为是新注册实例,
        // expectedNumberOfClientsSendingRenews+1并更新numberOfRenewsPerMinThreshold
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                registrant = existingLease.getHolder();
            }
        } else {
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
        }
        // 用注册对象,构建Lease,并放入gMap
        Lease<InstanceInfo> lease = new Lease<>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        // This is where the initial state transfer of overridden status happens
        // 如果覆盖状态不是 UNKNOWN,则需要处理覆盖状态
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            // 覆盖状态overriddenInstanceStatusMap不包含注册实例,则添加
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        // 如果 注册实例存在覆盖状态,则注册实例使用覆盖状态替换自身的覆盖状态
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // 根据覆盖状态规则,获取状态,并无脏更新状态
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        // 如果注册实例状态为UP, 更新lease serviceUpTimestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        // 将lease 封装成最近变更对象 RecentlyChangedItem,放入最近更新队列
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // 使 二级缓存中的读写缓存失效
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
        read.unlock();
    }
}

服务续约 renew

image-20211017210930371.png
InstanceRegistry
public boolean renew(final String appName, final String serverId, boolean isReplication) {
   // 获取字典排序的注册表
   // 遍历注册表 找到对应实例,并发布事件 EurekaInstanceRenewedEvent
   List<Application> applications = getSortedApplications();
   for (Application input : applications) {
      if (input.getName().equals(appName)) {
         InstanceInfo instance = null;
         for (InstanceInfo info : input.getInstances()) {
            if (info.getId().equals(serverId)) {
               instance = info;
               break;
            }
         }
         publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
         break;
      }
   }
   return super.renew(appName, serverId, isReplication);
}

PeerAwareInstanceRegistryImpl
public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        // 续约动作成功,则复制同步到其它eureka-server节点
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

AbstractInstanceRegistry
public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    // 如果注册表中 renew 对象的Lease 为空,则返回false
    // 如果 Lease 非空, 则继续 renew 动作
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        return false;
    } else {
        // 获取 renew 对应的实例
        // 
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // 获取覆盖状态,如果覆盖状态为UNKNOWN, 返回false
            // 如果覆盖状态非UNKNOWN,则更新实例状态为覆盖状态
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }
        renewsLastMin.increment();
        // 续约动作 lastUpdateTimestamp = System.currentTimeMillis() + duration
        leaseToRenew.renew();
        return true;
    }
}

服务下线 cancel

image-20211017212221678.png
InstanceRegistry
public boolean cancel(String appName, String serverId, boolean isReplication) {
   // 发送事件 EurekaInstanceCanceledEvent
   handleCancelation(appName, serverId, isReplication);
   return super.cancel(appName, serverId, isReplication);
}

PeerAwareInstanceRegistryImpl
public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    if (super.cancel(appName, id, isReplication)) {
        // 成功撤销服务,则复制同步其它eureka-server
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

AbstractInstanceRegistry
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // 获取读锁,即读取操作不受阻塞,写操作会阻塞。
    read.lock();
    try {
        CANCEL.increment(isReplication);
        // 从注册表找到 cancel 的Lease,并从注册表删除
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        // 从overriddenInstanceStatusMap删除cancel的id
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {

        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            return false;
        } else {
            // 下线动作 evictionTimestamp = System.currentTimeMillis()
            // 将下线的lease封装成RecentlyChangedItem,放入最近变更队列
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // 使 二级缓存中的读写缓存失效
            invalidateCache(appName, vip, svip);
        }
    } finally {
        read.unlock();
    }
    // 下线之后,
    // expectedNumberOfClientsSendingRenews-1并更新numberOfRenewsPerMinThreshold
    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            updateRenewsPerMinThreshold();
        }
    }
    return true;
}

增量获取

最近变更队列 ConcurrentLinkedQueue, 由定时任务更新,定时任务遍历增量队列找到保留时间达到限制的元素删除。

private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<>();
// 构造代码
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs())

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                // 如果最近更新时间 < 当前时间 - 增量保留时间配置,删除
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }
    };
}

获取增量, 遍历增量队列,将增量队列中实例添加到返回值中

public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    if (null == remoteRegions) {
        remoteRegions = allKnownRemoteRegions; // null means all remote regions.
    }

    boolean includeRemoteRegion = remoteRegions.length != 0;

    if (includeRemoteRegion) {
        GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
    } else {
        GET_ALL_CACHE_MISS_DELTA.increment();
    }

    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    
    // 获取写锁,禁止其它线程读写
    write.lock();
    try {
        // 遍历增量队列,获取实例信息放入返回结果
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        while (iter.hasNext()) {
            Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            Application app = applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
        }
        // 获取远程数据
        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                    if (null != remoteAppsDelta) {
                        for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                            if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                Application appInstanceTillNow =
                                        apps.getRegisteredApplications(application.getName());
                                if (appInstanceTillNow == null) {
                                    appInstanceTillNow = new Application(application.getName());
                                    apps.addApplication(appInstanceTillNow);
                                }
                                for (InstanceInfo instanceInfo : application.getInstances()) {
                                    appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
                                }
                            }
                        }
                    }
                }
            }
        }

        Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}

服务剔除 evict

image-20211017212359449.png
image-20211017212359449.png
class EvictionTask extends TimerTask {
    // 上次执行事件时间
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            evict(compensationTimeMs);
        } catch (Throwable e) {
        }
    }

    // 计算补偿时间(额外补偿的租约时间) (System.nanoTime() - lastExecutionNanos - evictionIntervalTimerInMs)
    // 如果补偿时间小于0,设置为0
    // 如: 上次执行时间使10s, 当前时间(这次执行时间)是80s, 服务剔除周期是60s, 则补偿时间是(80-10-60) = 10s
    long getCompensationTimeMs() {
        long currNanos = getCurrentTimeNano();
        long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0l) {
            return 0l;
        }

        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
        long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
        return compensationTime <= 0l ? 0l : compensationTime;
    }
    long getCurrentTimeNano() {
        return System.nanoTime();
    }

}

public void evict(long additionalLeaseMs) {
    // !(shouldEnableSelfPreservation 配置不开启自我保护或最近renew次数大于自我保护的阈值)
    if (!isLeaseExpirationEnabled()) {
        return;
    }

    // 遍历注册表所有Lease,将满足剔除条件的Lease放入expiredLeases中
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                // (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs))
                // 实际过期时间应该是 两倍的 duration,这是一个已知的官方bug,
                // 但是因为只影响不雅下线的服务,所以没有修复
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // 计算最大剔除个数:注册表个数 - 注册表个数 * 自我保护阈值比例
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    // 本次执行剔除个数:最大剔除个数和符合剔除条件Lease个数较小值
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        // 随机剔除符合剔除条件Lease列表中的toEvict个
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            internalCancel(appName, id, false);
        }
    }
}

同步复制

image-20211108194254508.png

上文提到 PeerAwareInstanceRegistryImpl 在服务注册、服务续约和服务下线时,都会通过 replicateToPeers() 方法将数据同步到 eureka-server 的其它节点,看下代码。

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // 来自其它server的同步复制
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // 如果来自其它server的同步复制或者对等server节点列表为空 不需要同步复制
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // 遍历对等server节点列表,非当前节点,同步复制数据
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
// 通过PeerEurekaNode根据不同的Action 发出不同的同步程序
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
    } finally {
        CurrentRequestVersion.remove();
    }
}

由上代码可知,


image-20211108143138054.png

对等节点管理器 PeerEurekaNodes

eureka 原生 PeerEurekaNodes

public class PeerEurekaNodes {

    protected final PeerAwareInstanceRegistry registry; // 注册表实现
    protected final EurekaServerConfig serverConfig; // server配置
    protected final EurekaClientConfig clientConfig; // client配置
    protected final ServerCodecs serverCodecs;
    private final ApplicationInfoManager applicationInfoManager; // 应用管理器
    
    // 管理维护的对等节点
    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();

    // 线程池,用于定时更新对等节点列表
    private ScheduledExecutorService taskExecutor;

    public void start() {
        // 构造线程池
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        
        try {
            // 更新管理对等节点列表
            // resolvePeerUrls 从配置中获取非当前节点的server配置url列表
            // updatePeerEurekaNodes 删除、创建节点,并更新缓存列表
            updatePeerEurekaNodes(resolvePeerUrls());
            
            // 构造更新对等节点列表线程Runable,并开始定时任务
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}

Spring Cloud 定制 RefreshablePeerEurekaNodes,允许刷新对等节点,增加实现ApplicationListener接口,监听EnvironmentChangeEvent事件,更新对等节点列表

// 增加了ApplicationListener
static class RefreshablePeerEurekaNodes extends PeerEurekaNodes
      implements ApplicationListener<EnvironmentChangeEvent> {
    
   @Override
   public void onApplicationEvent(final EnvironmentChangeEvent event) {
      if (shouldUpdate(event.getKeys())) {
         updatePeerEurekaNodes(resolvePeerUrls());
      }
   }

   /*
    * 判断是否需要更新
    */
   protected boolean shouldUpdate(final Set<String> changedKeys) {
      assert changedKeys != null;

      // dns 不需要更新
      if (this.clientConfig.shouldUseDnsForFetchingServiceUrls()) {
         return false;
      }

      // 如果配置变更的key包含“eureka.client.region”,则需要更新
      if (changedKeys.contains("eureka.client.region")) {
         return true;
      }
      // 如果配置变更的Key包含“eureka.client.service-url.”
      // 或“eureka.client.availability-zones.”前缀,在需要更新
      for (final String key : changedKeys) {
         if (key.startsWith("eureka.client.service-url.")
               || key.startsWith("eureka.client.availability-zones.")) {
            return true;
         }
      }
      return false;
   }

}

综上:对等节点管理器 PeerEurekaNodes 管理维护者对等节点列表,有两种更新方式

  • 定时任务 (PeerEurekaNodes)
  • EnvironmentChangeEvent 事件监听 (RefreshablePeerEurekaNodes)

对等节点 PeerEurekaNode

看PeerEurekaNode成员对象可知,主要是对等节点信息,还有就是批量调度器(batchingDispatcher)和非批量调度器(nonBatchingDispatcher),后文仅介绍批量调度器(batchingDispatcher)。

public class PeerEurekaNode {

    public static final String BATCH_URL_PATH = "peerreplication/batch/";
    private final String serviceUrl; // 当前节点地址
    private final EurekaServerConfig config; // server配置
    private final long maxProcessingDelayMs; // 最大延迟时间
    private final PeerAwareInstanceRegistry registry; // 注册表实现,见上文
    private final String targetHost; // 目标host
    private final HttpReplicationClient replicationClient; // http访问客户端

    /**
    * batchingDispatcher 与 nonBatchingDispatcher为批量和非批量的调度器
    **/
    private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
    private final TaskDispatcher<String, ReplicationTask> nonBatchingDispatcher;
    
    
    // 构造函数
    PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                 HttpReplicationClient replicationClient, EurekaServerConfig config,
                                 int batchSize, long maxBatchingDelayMs,
                                 long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
    this.registry = registry;
    this.targetHost = targetHost;
    this.replicationClient = replicationClient;

    this.serviceUrl = serviceUrl;
    this.config = config;
    this.maxProcessingDelayMs = config.getMaxTimeForReplication();

    String batcherName = getBatcherName();
    ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
    this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
            batcherName,
            config.getMaxElementsInPeerReplicationPool(),
            batchSize,
            config.getMaxThreadsForPeerReplication(),
            maxBatchingDelayMs,
            serverUnavailableSleepTimeMs,
            retrySleepTimeMs,
            taskProcessor
    );
    this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
            targetHost,
            config.getMaxElementsInStatusReplicationPool(),
            config.getMaxThreadsForStatusReplication(),
            maxBatchingDelayMs,
            serverUnavailableSleepTimeMs,
            retrySleepTimeMs,
            taskProcessor
    );
}
    
}

TaskDispatcher 任务调度器创建代码,极其简单,创建了两个对象,AcceptorExecutor和TaskExecutors。其中process方法,就是执行AcceptorExecutor的process方法。

public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id, int maxBufferSize, int workloadSize, int workerCount, long maxBatchingDelay, long congestionRetryDelayMs, long networkFailureRetryMs, TaskProcessor<T> taskProcessor) {

    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor(id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs);
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    
    return new TaskDispatcher<ID, T>() {
        public void process(ID id, T task, long expiryTime) {
            acceptorExecutor.process(id, task, expiryTime);
        }
        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}
接收处理器 AcceptorExecutor

数据结构

// 新任务队列,一段进一段出
BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
// 重新处理任务队列,双端进出
BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
// 
// 等待处理的任务,定时任务
Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>();
Deque<ID> processingOrder = new LinkedList<>();

// 即将处理的任务,单个
Semaphore singleItemWorkRequests = new Semaphore(0);
BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();

// 即将要处理的任务,批量
Semaphore batchWorkRequests = new Semaphore(0);
BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();

提供的方法

// 提供给注册表实现的方法,接受任务,放入队列
void process(ID id, T task, long expiryTime) {
    acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
    acceptedTasks++;
}

// 提供给work线程TaskExecutors的方法,重新添加失败的任务,批量
void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
    reprocessQueue.addAll(holders);
    replayedTasks += holders.size();
    trafficShaper.registerFailure(processingResult);
}
// 提供给work线程TaskExecutors的方法,重新添加失败的任务,单个
void reprocess(TaskHolder<ID, T> taskHolder, ProcessingResult processingResult) {
    reprocessQueue.add(taskHolder);
    replayedTasks++;
    trafficShaper.registerFailure(processingResult);
}
// 提供给work线程TaskExecutors的方法,获取任务,单个
BlockingQueue<TaskHolder<ID, T>> requestWorkItem() {
    singleItemWorkRequests.release(); // 释放锁
    return singleItemWorkQueue;
}
// 提供给work线程TaskExecutors的方法,获取任务,批量
BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
    batchWorkRequests.release(); // 释放锁
    return batchWorkQueue;
}

定时任务

class AcceptorRunner implements Runnable {
    @Override
    public void run() {
        long scheduleTime = 0;
        while (!isShutdown.get()) {
            try {
                // 将reprocessQueue队列中元素挪至pendingTasks和processingOrder中,
                // 并记录超时未处理的
                // 将acceptorQueue队列中元素挪着pendingTasks和processingOrder中
                // 在放置pendingTasks元素过程中,完成相同任务覆盖
                // pendingTasks 放满即停止
                // 如果放置之后acceptorQueue、reprocessQueue、pendingTasks都为空,
                // 阻塞等待获取10毫秒
                // 每次都取满pendingTasks或acceptorQueue、reprocessQueue被取完
                drainInputQueues();

                
                int totalItems = processingOrder.size();

                long now = System.currentTimeMillis();
                if (scheduleTime < now) {
                    scheduleTime = now + trafficShaper.transmissionDelay();
                }
                if (scheduleTime <= now) {
                    // 将pendingTasks中任务取出批量放入batchWorkQueue中
                    assignBatchWork();
                    // 将pendingTasks中任务取出一个放入singleItemWorkQueue中
                    assignSingleItemWork();
                }

                // 没有任务从pendingTasks移走,说明work线程阻塞或延迟,等待10毫秒
                if (totalItems == processingOrder.size()) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException ex) {
                // Ignore
            } catch (Throwable e) {
                logger.warn("Discovery AcceptorThread error", e);
            }
        }
    }
任务处理器 TaskExecutors

在构造函数中开启work线程

TaskExecutors(WorkerRunnableFactory<ID, T> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) {
    this.isShutdown = isShutdown;
    this.workerThreads = new ArrayList<>();

    ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
    for (int i = 0; i < workerCount; i++) {
        WorkerRunnable<ID, T> runnable = workerRunnableFactory.create(i);
        Thread workerThread = new Thread(threadGroup, runnable, runnable.getWorkerName());
        workerThreads.add(workerThread);
        workerThread.setDaemon(true);
        workerThread.start();
    }
}

work线程实现,已批量方式为例,

static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {
    @Override
    public void run() {
        try {
            // 只要没有关闭 就一直重试
            while (!isShutdown.get()) {
                // 通过 AcceptorExecutor#requestWorkItems 获取任务,如果为空,poll 1秒
                List<TaskHolder<ID, T>> holders = getWork();
                List<T> tasks = getTasksOf(holders);
                
                // 由ReplicationTaskProcessor执行任务,
                // 如果是TransientError错误,如超时或连接失败,则放入重试队列
                ProcessingResult result = processor.process(tasks);
                switch (result) {
                    case Success:
                        break;
                    case Congestion:
                    case TransientError:
                        taskDispatcher.reprocess(holders, result);
                        break;
                    case PermanentError:
                        logger.warn();
                }
            }
        } catch (InterruptedException e) {
            // Ignore
        } catch (Throwable e) {
            logger.warn("Discovery WorkerThread error", e);
        }
    }

}

Eureka 配置

Eureka Instance 配置

public interface EurekaInstanceConfig {

    // eureka 中注册实例的id, 必须是惟一的,
    String getInstanceId();

    // eureka 中注册实例的应用名称 
    String getAppname();
    // eureka 中注册实例的应用组, 多个应用可分组,很少用,一般为null
    String getAppGroupName();

    // 这个属性用来决定应用服务是否一注册上就可以开始接收请求, 默认是false
    // 应用启动时,可以根据eureka.instance.instance-enabled-onit配置设定(默认为false),
    // 来配置初始注册到eureka server的时候,其status是UP,还是STARTING。
    // 默认初始化的时候是STARTING,之后自动注册的时候,变更为UP,表示可以开始接收请求。
    boolean isInstanceEnabledOnit();

    // 非安全端口
    int getNonSecurePort();
    // 安全端口
    int getSecurePort();

    // 非安全端口是否启用 
    boolean isNonSecurePortEnabled();
    // 安全端口是否启用 
    boolean getSecurePortEnabled();

    // 心跳续约间隔 默认30s
    int getLeaseRenewalIntervalInSeconds();

    // 租约到期时间 默认90秒
    int getLeaseExpirationDurationInSeconds();

    // 虚拟域名,其它实例可以通过虚拟域名找到这个实例,类似全限定域名访问 
    String getVirtualHostName();
    // 安全虚拟域名 
    String getSecureVirtualHostName();

    // 域名
    String getHostName(boolean refresh);

    // 元数据, K-V 结构,会被发送到 Server 端
    Map<String, String> getMetadataMap();

    // ip 
    String getIpAddress();

    // 获取状态路径, /actuator/info
    String getStatusPageUrlPath();

    // 获取状态url, / ,配合 getStatusPageUrlPath 使用
    String getStatusPageUrl();

    // 获取home page路径, / 
    String getHomePageUrlPath();

    // 获取home page路径, / , 配合 getHomePageUrlPath 使用
    String getHomePageUrl();

    // 健康检查url path  /actuator/health
    // 通常用来做依据实例健康做决策
    String getHealthCheckUrlPath();

    // 健康检查url, / , 配合 getHealthCheckUrlPath 使用 
    String getHealthCheckUrl();

    // 健康检查url, / , 配合 getHealthCheckUrlPath 使用 
    String getSecureHealthCheckUrl();

    // 配置命名空间,一般是 "eureka"
    String getNamespace();

}

对应 Springboot 环境 EurekaInstanceConfig 类的实现是 EurekaInstanceConfigBean 类

@ConfigurationProperties("eureka.instance")
public class EurekaInstanceConfigBean implements CloudEurekaInstanceConfig, EnvironmentAware {
        
}

EurekaInstanceConfig 被 InstanceInfoFactory 用来创建 InstanceInfo 对象。

public class InstanceInfoFactory {

    public InstanceInfo create(EurekaInstanceConfig config) {
        LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

        InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();

        String namespace = config.getNamespace();
        if (!namespace.endsWith(".")) {
            namespace = namespace + ".";
        }
        builder.setNamespace(namespace).setAppName(config.getAppname())
                .setInstanceId(config.getInstanceId())
                .setAppGroupName(config.getAppGroupName())
                .setDataCenterInfo(config.getDataCenterInfo())
                .setIPAddr(config.getIpAddress())
                .setHostName(config.getHostName(false))
                .setPort(config.getNonSecurePort())
                .enablePort(InstanceInfo.PortType.UNSECURE,
                        config.isNonSecurePortEnabled())
                .setSecurePort(config.getSecurePort())
                .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
                .setVIPAddress(config.getVirtualHostName())
                .setSecureVIPAddress(config.getSecureVirtualHostName())
                .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
                .setStatusPageUrl(config.getStatusPageUrlPath(),
                        config.getStatusPageUrl())
                .setHealthCheckUrls(config.getHealthCheckUrlPath(),
                        config.getHealthCheckUrl(), config.getSecureHealthCheckUrl())
                .setASGName(config.getASGName());

        // Start off with the STARTING state to avoid traffic
        // 如果 isInstanceEnabledOnit 配置为 false, 则启动状态为 STARTING
        // 否则启动状态为 UP
        if (!config.isInstanceEnabledOnit()) {
            InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING;
            if (log.isInfoEnabled()) {
                log.info("Setting initial instance status as: " + initialStatus);
            }
            builder.setStatus(initialStatus);
        }
        else {
            if (log.isInfoEnabled()) {
                log.info("Setting initial instance status as: "
                        + InstanceInfo.InstanceStatus.UP
                        + ". This may be too early for the instance to advertise itself as available. "
                        + "You would instead want to control this via a healthcheck handler.");
            }
        }

        // 添加任意的元数据
        for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
            String key = mapEntry.getKey();
            String value = mapEntry.getValue();
            // only add the metadata if the value is present
            if (value != null && !value.isEmpty()) {
                builder.add(key, value);
            }
        }

        InstanceInfo instanceInfo = builder.build();
        instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
        return instanceInfo;
    }

}

Eureka Client 配置

public interface EurekaClientConfig {

   // 获取注册列表时间间隔 默认30s
    int getRegistryFetchIntervalSeconds();

    // 更新实例信息的变化到 Server 的间隔时间,默认值30s
    int getInstanceInfoReplicationIntervalSeconds();
    // 行上面任务的初始化Delay延迟  默认值40s
    int getInitialInstanceInfoReplicationIntervalSeconds();

    // 轮询eureka服务器信息更改的频率(动态获取serviceUrl列表)
    // Server 可能会增加或减少,这个配置控制 Client 可以多块知晓变更
    // 默认是5分钟
    int getEurekaServiceUrlPollIntervalSeconds();

    // Server 的代理
    String getProxyHost();
    String getProxyPort();
    String getProxyUserName();
    String getProxyPassword();

    // 是否要对 Server 拉取的内容容进行 GZip 压缩 默认值是 true
    // true:会加过滤器 GZIPContentEncodingFilter,在请求上加请求头:`Accept-Encoding:gzip`
    boolean shouldGZipContent();

    // Client 访问 Server, 控制发送请求时的 readTimeout 值,默认是8s 
    int getEurekaServerReadTimeoutSeconds();
    // Client 访问 Server, 控制发送请求时的 connectionTimeout 值,默认是8s  
    int getEurekaServerConnectTimeoutSeconds();

    // 获取备注册中心的实现类(若你的EurekaClient链接的Server挂了,就使用它去连其它的)
    // 若你想做兜底,那么可以使用它(比如使用Nacos做备用)
    // 说明:内部并未提供任何实现,若有需要请自己提供实现类,且配置好就可生效
    String getBackupRegistryImpl();

    // Client 是可以并发发出N多个请求请求 Server 的,
    // 这里就给出了限制,避免单个 Client 实例把 Server 就给搞垮了
    // 控制 maxTotalConnections,也就是发送请求的连接池的最大容量,默认值200
    // 如果是Apache的HC,那就是控制它的连接池大小
    int getEurekaServerTotalConnections();

    //  单台host的允许的连接总数  默认值50
    int getEurekaServerTotalConnectionsPerHost();

    // ========下面这些配置均只有在eureka服务器ip地址列表是在DNS中才会用到,默认为null======= //
    // 表示eureka注册中心的路径,如果配置为eureka,则为http://x.x.x.x:x/eureka/
    // 在eureka的配置文件中加入此配置表示eureka作为客户端向注册中心注册,从而构成eureka集群
    String getEurekaServerURLContext();
    // 获取eureka服务器的端口
    String getEurekaServerPort();
    // 获取要查询的DNS名称来获得eureka服务器
    String getEurekaServerDNSName();
    // 是否使用 DNS 方式获取 Eureka-Server URL 地址  默认是false
    // 在获取到Server集群的节点后,会解析出url们。这里决定如何解析(要不要用DNS)
    boolean shouldUseDnsForFetchingServiceUrls();

    // 是否注册自己这个实例到Server上  默认是true
    // 有些情况,你不希望腻子注册到 Server, 可以选择不注册,如服务消费者
    // 如果不注册自己,很多定时任务是不需要的,因此Server端建议关闭此项
    // 当然你若想看看自己是否还“活着”,让其注册上去也无妨     
    boolean shouldRegisterWithEureka();

    // 在 Client shutdown 时,是否从 Server 上主动下线, 默认true
    // 它是在EurekaClient#shutdown方法里被调用的
    default boolean shouldUnregisterOnShutdown() {
        return true;
    }

    // 实例是否使用同一zone里的eureka服务器,默认为true
    // 理想状态下,eureka客户端与服务端是在同一zone下
    boolean shouldPreferSameZoneEureka();

    // 简单的说:是否允许Server端给你返回302重定向其它机器去处理(比如自己负载太高了不想处理)
    // 它的实现方式是发送请求时添加请求头:`X-Discovery-AllowRedirect:true`
    // 默认值是false
    boolean allowRedirects();

    // 当Client本地的实例们和Server返回的实例们出现差时(比如状态变更、元数据变更等),是否记录log日志
    // 默认值是false,不记录。毕竟这种日志意义并不大~~~没必要消耗性能
    boolean shouldLogDeltaDiff();

    // 是否禁用增量获取 true:每次全量获取,false:每次增量获取
    // 默认值是false
    boolean shouldDisableDelta();

    // Eureka-Server 的 URL 集合
    // 默认为http://XXXX:X/eureka/,但是如果采用DNS方式获取服务地址,则不需要配置此设置。
    List<String> getEurekaServerServiceUrls(String myZone);

    // 获取实例时是否过滤,仅保留UP状态的实例  默认值是true
    boolean shouldFilterOnlyUpInstances();

    // 控制连接线程池的。最大空闲时间,默认值30s
    int getEurekaConnectionIdleTimeoutSeconds();

    // 是否从Eureka服务端获取注册信息  默认true
    boolean shouldFetchRegistry();

    // 只去只获得一个 vipAddress 对应的应用实例们的注册信息。默认值null
    // 若指定来该值:那么只会从这个单一的vipAddress里获取
    @Nullable
    String getRegistryRefreshSingleVipAddress();

    // 心跳执行程序(续约线程)线程池的大小   默认为5
    int getHeartbeatExecutorThreadPoolSize();

    // 心跳超时重试延迟时间的最大乘数值。默认值是10
    // 也就是超时了的话  5*10  50s后再去试一把
    int getHeartbeatExecutorExponentialBackOffBound();

    // 缓存刷新线程池的初始化线程数  默认值5
    int getCacheRefreshExecutorThreadPoolSize();

    // 缓存刷新重试延迟时间的最大乘数值。默认值是10
    // 也就是超时了的话  5*10  50s后再去试一把    
    int getCacheRefreshExecutorExponentialBackOffBound();

    // eureka服务器序列化/反序列化的信息中获取“$”符号的的替换字符串。默认为“_-”
    String getDollarReplacement();

    // eureka服务器序列化/反序列化的信息中获取“_”符号的的替换字符串。默认为“__”
    String getEscapeCharReplacement();

    // true:通过ApplicationInfoManager本地实例状态时,立即触发更新到远程server
    // false:不立即触发,依赖于心跳,默认值true
    boolean shouldOnDemandUpdateStatusChange();

    // 客户端是否应在初始化期间强制注册  默认值false
    // 毕竟初始化期间不一定全部搞定了,所以为false较好
    default boolean shouldEnforceRegistrationAtInit() {
        return false;
    }


    String getEncoderName();
    String getDecoderName();

    // 加上请求头:`X-Eureka-Accept = xxx`
    // 可选值是:full/compact
    String getClientDataAccept();

    // experimental:试验的
    // 当尝试新功能迁移过程时,为了避免配置API污染,试验的这部分配置即可通过此接口传递
    // 可见:JerseyEurekaHttpClientFactory#buildExperimental用于构建试验的配置
    // 用于灰度一些实验性的配置比较好用
    String getExperimental(String name);

    // 为了兼容性,返回传输层配置类
    // EurekaTransportConfig会放在详解传输的时候解析
    // 也是在DiscoveryClient里会使用到
    EurekaTransportConfig getTransportConfig();
}

Springboot 环境,EurekaClientConfig 的实现类是 EurekaClientConfigBean。

@ConfigurationProperties(EurekaClientConfigBean.PREFIX)
public class EurekaClientConfigBean implements EurekaClientConfig, Ordered {
}

public static final String PREFIX = "eureka.client";

Eureka Server 配置

public interface EurekaServerConfig {

    // 是否开启自我保护机制 默认开启true
    boolean shouldEnableSelfPreservation();
    // 开启自我保护模式比例,超过该比例后开启自我保护模式。默认0.85
    double getRenewalPercentThreshold();
    // 自我保护模式比例更新定时任务执行频率。
    int getRenewalThresholdUpdateIntervalMs();
    // 期望客户端发送心跳的间隔。
    // 默认值为30秒。
    // 如果客户端以不同的频率发送心跳,比如说,每15秒发送一次,那么应该相应地调整此参数,
    // 否则,自我保护将无法按预期工作。
    int getExpectedClientRenewalIntervalSeconds();

    // server 节点间相互发现周期
    int getPeerEurekaNodesUpdateIntervalMs();

    // 复制数据使用压缩,通过 Accept-Encoding 实现。
    boolean shouldEnableReplicatedRequestCompression();
    // 复制同步重试次数
    int getNumberOfReplicationRetries();
    
    // server 节点状态刷新周期
    int getPeerEurekaStatusRefreshTimeIntervalMs();

    // 设置如果Eureka Server启动时无法从临近Eureka Server节点获取注册信息,
    // 它多久不对外提供注册服务
    int getWaitTimeInMsWhenSyncEmpty();

    // server 节点间访问连接超时时间
    int getPeerNodeConnectTimeoutMs();

    // server 节点间读超时时间
    int getPeerNodeReadTimeoutMs();

    // 节点间 最大连接数
    int getPeerNodeTotalConnections();

    // 节点间对某一节点最大连接数
    int getPeerNodeTotalConnectionsPerHost();

    // 连接空闲时间
    int getPeerNodeConnectionIdleTimeoutSeconds();

    // 增量队列缓存保留时间
    long getRetentionTimeInMSInDeltaQueue();

    // 增量队列检查缓存过期时间周期
    long getDeltaRetentionTimerIntervalInMs();

    // 剔除心跳周期
    long getEvictionIntervalTimerInMs();

    // 只读缓存过期时间
    long getResponseCacheAutoExpirationInSeconds();

    // 二级缓存过期时间
    long getResponseCacheUpdateIntervalMs();

    // 使用一级缓存
    boolean shouldUseReadOnlyResponseCache();

    // 是否关闭使用增量信息
    boolean shouldDisableDelta();

    // 同步线程池 线程空闲时间
    long getMaxIdleThreadInMinutesAgeForStatusReplication();

    // 同步线程池 最小线程数
    int getMinThreadsForStatusReplication();

    // 同步线程池 最大线程数
    int getMaxThreadsForStatusReplication();

    // 同步线程池 同步队列最大值
    int getMaxElementsInStatusReplicationPool();

    // 当时间戳不同时 是否同步
    boolean shouldSyncWhenTimestampDiffers();

    // server启动时,从其他节点拉取注册表 重试次数
    int getRegistrySyncRetries();

    // server启动时,从其他节点拉取注册表 重试等待时间
    long getRegistrySyncRetryWaitMs();

    // 同步线程 批量大小
    int getMaxElementsInPeerReplicationPool();
    // 最小up状态的server节点数目
    int getHealthStatusMinNumberOfAvailablePeers();

    // 批量同步最大延迟时间,超时不处理
    int getMaxTimeForReplication();
    // 是否使用批量同步
    boolean shouldBatchReplication();
    // 当前serverUrl
    String getMyUrl();
    // 是否启动限流
    boolean isRateLimiterEnabled();
    // 总限流值
    int getRateLimiterBurstSize();
    // 拉取请求限流值
    int getRateLimiterRegistryFetchAverageRate();
    // 全量拉取限流值
    int getRateLimiterFullFetchAverageRate();
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容