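Spring Cloud Eureka Source Code Analysis: Service Access Latency (Awareness Latency)

I. Preface

This article explains why Eureka clients learn about changes on the server only after a delay, walks through the source code behind each delay point, and shows how to let callers observe instances going online and offline smoothly.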

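II. Service Coming Online

  1. The service registers with the server (immediately on startup): 0s
  2. Server-side sync interval between the read-only and read-write caches: responseCacheUpdateIntervalMs=30s
  3. Client interval for fetching the latest registry: registryFetchIntervalSeconds=30s
  4. Client-side Ribbon serverList cache refresh interval: ServerListRefreshInterval=30s

In the worst case, clients become aware of a newly started service after 90s (30s + 30s + 30s).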

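III. Service Going Offline

Normal shutdown

  1. The service sends a cancel request to the server (immediately before it stops): 0s
  2. Server-side sync interval between the read-only and read-write caches: responseCacheUpdateIntervalMs=30s
  3. Client interval for fetching the latest registry: registryFetchIntervalSeconds=30s
  4. Client-side Ribbon serverList cache refresh interval: ServerListRefreshInterval=30s

In the worst case, clients become aware of a normal shutdown after 90s. The delay points are exactly the same as when a service comes online.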

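Abnormal shutdown

An abnormal shutdown means the service goes away without sending a cancel request, for example after kill -9 or a machine crash.

  1. Run interval of the server's eviction task for expired leases: evictionIntervalTimerInMs=60s
  2. The eviction task removes instances that have not renewed their lease within 90s: leaseExpirationDurationInSeconds=90s
  3. Server-side sync interval between the read-only and read-write caches: responseCacheUpdateIntervalMs=30s
  4. Client interval for fetching the latest registry: registryFetchIntervalSeconds=30s
  5. Client-side Ribbon serverList cache refresh interval: ServerListRefreshInterval=30s

In the worst case, clients become aware of an abnormal shutdown after 240s (60s + 90s + 30s + 30s + 30s).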
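
The worst-case figures above are simply the sums of the periodic intervals along each path. The following minimal sketch makes that arithmetic explicit; AwarenessLatency and its method names are hypothetical and are not part of Eureka.

```java
/** Worst-case awareness latency modeled as the sum of the periodic intervals on the path. */
final class AwarenessLatency {

    /** Planned start or stop: server cache sync + client registry fetch + Ribbon refresh. */
    static int plannedChangeSeconds(int cacheSyncSec, int fetchSec, int ribbonSec) {
        return cacheSyncSec + fetchSec + ribbonSec;
    }

    /** Crash: the eviction timer period and the lease expiration come first. */
    static int crashSeconds(int evictionSec, int leaseExpirySec,
                            int cacheSyncSec, int fetchSec, int ribbonSec) {
        return evictionSec + leaseExpirySec
                + plannedChangeSeconds(cacheSyncSec, fetchSec, ribbonSec);
    }

    public static void main(String[] args) {
        // Default intervals listed above.
        System.out.println("planned change: " + plannedChangeSeconds(30, 30, 30) + "s");
        System.out.println("crash: " + crashSeconds(60, 90, 30, 30, 30) + "s");
    }
}
```

Because every term is an upper bound on a timer period, the typical delay is lower; the sums only describe the worst case.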

The sections below analyze, step by step, where each of these delays lives in the source code and how it is implemented.

IV. Source Code Analysis

Server-side sync between the read-only and read-write caches

ResponseCacheImpl

  // responseCacheUpdateIntervalMs defaults to 30s
  if (shouldUseReadOnlyResponseCache) {
      timer.schedule(getCacheUpdateTask(),
              new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                      + responseCacheUpdateIntervalMs),
              responseCacheUpdateIntervalMs);
  }

  private TimerTask getCacheUpdateTask() {
      return new TimerTask() {
          @Override
          public void run() {
              for (Key key : readOnlyCacheMap.keySet()) {
                  try {
                      CurrentRequestVersion.set(key.getVersion());
                      Value cacheValue = readWriteCacheMap.get(key);
                      Value currentCacheValue = readOnlyCacheMap.get(key);
                      if (cacheValue != currentCacheValue) {
                          readOnlyCacheMap.put(key, cacheValue);
                      }
                  } catch (Throwable th) {
                      logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                  }
              }
          }
      };
  }

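The task returned by getCacheUpdateTask runs every 30s. It iterates over every key in readOnlyCacheMap, reads the latest value from readWriteCacheMap, and writes it back into readOnlyCacheMap if it differs from the cached value.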
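
The first-run time passed to timer.schedule rounds the current time down to a multiple of the interval and then adds one interval, so the first sync lands on the next interval boundary. A small sketch of that arithmetic follows; AlignedFirstRun is a hypothetical name used only for illustration.

```java
/** Reproduces the first-run calculation used when the cache update task is scheduled. */
final class AlignedFirstRun {

    /** Next multiple of intervalMs that is strictly after nowMs. */
    static long firstRunMillis(long nowMs, long intervalMs) {
        return (nowMs / intervalMs) * intervalMs + intervalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 30_000L;
        // 95s after the epoch, the next 30s boundary is at 120s.
        System.out.println(firstRunMillis(95_000L, intervalMs));
        // Exactly on a boundary, the task still waits one full interval.
        System.out.println(firstRunMillis(90_000L, intervalMs));
    }
}
```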

For a new key that readOnlyCacheMap has not cached yet, getValue fills readOnlyCacheMap on first access.

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
 }
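
To see why this design delays visibility, here is a simplified, self-contained model of the two maps. It is only a sketch: TwoLevelCache is hypothetical, it uses plain ConcurrentHashMap instances and String values, and it leaves out the loading and invalidation behavior of the real readWriteCacheMap.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of a read-only cache that is periodically synced from a read-write cache. */
final class TwoLevelCache {
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();

    /** A registry change lands in the read-write cache only. */
    void write(String key, String value) {
        readWrite.put(key, value);
    }

    /** Reads prefer the read-only cache and fill it on a miss, like getValue. */
    String get(String key) {
        String cached = readOnly.get(key);
        if (cached != null) {
            return cached;
        }
        String fresh = readWrite.get(key);
        if (fresh != null) {
            readOnly.put(key, fresh);
        }
        return fresh;
    }

    /** The periodic task: copy changed values for keys the read-only cache already holds. */
    void sync() {
        for (String key : readOnly.keySet()) {
            String latest = readWrite.get(key);
            if (latest != null && !latest.equals(readOnly.get(key))) {
                readOnly.put(key, latest);
            }
        }
    }

    public static void main(String[] args) {
        TwoLevelCache cache = new TwoLevelCache();
        cache.write("apps", "v1");
        System.out.println(cache.get("apps")); // first read fills the read-only cache
        cache.write("apps", "v2");
        System.out.println(cache.get("apps")); // still the old value until sync runs
        cache.sync();
        System.out.println(cache.get("apps"));
    }
}
```

Between a write and the next sync, readers keep seeing the previous value, which is the source of the delay of up to responseCacheUpdateIntervalMs.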

Client task for fetching the latest registry

DiscoveryClient

/**
 * Initializes all scheduled tasks.
 */
// registryFetchIntervalSeconds defaults to 30s
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    ...
}

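During client startup, the DiscoveryClient constructor initializes several scheduled tasks. One of them periodically fetches the registry from the server, with a period of 30s. The task it runs is new CacheRefreshThread(), and the fetched registry is stored in the local cache localRegionApps.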
AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>()
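
The expBackOffBound argument in the snippet above limits how far the refresh delay can grow when a fetch times out. The sketch below models that behavior under the assumption that the supervising task doubles its delay after a timeout, caps it at the base interval multiplied by expBackOffBound, and resets it after a success; this logic is not shown in the excerpt, and RefreshBackoff is a hypothetical name.

```java
/** Simplified model of a refresh delay that backs off exponentially after timeouts. */
final class RefreshBackoff {

    /**
     * @param currentDelayMs  delay used for the run that just finished
     * @param baseIntervalMs  configured interval, e.g. registryFetchIntervalSeconds in ms
     * @param expBackOffBound maximum multiple of the base interval
     * @param timedOut        whether the last run timed out
     */
    static long nextDelayMs(long currentDelayMs, long baseIntervalMs,
                            int expBackOffBound, boolean timedOut) {
        if (!timedOut) {
            return baseIntervalMs; // success resets the delay
        }
        long maxDelayMs = baseIntervalMs * expBackOffBound;
        return Math.min(maxDelayMs, currentDelayMs * 2);
    }

    public static void main(String[] args) {
        long delay = 30_000L;
        for (int i = 0; i < 5; i++) {
            delay = nextDelayMs(delay, 30_000L, 10, true);
            System.out.println("after timeout " + (i + 1) + ": " + delay + "ms");
        }
    }
}
```

If this assumption holds, a server that responds slowly can stretch the registry fetch delay well beyond the configured 30s, which adds to the awareness latency discussed in this article.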

Client-side Ribbon serverList cache refresh

PollingServerListUpdater

@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };
        // initialDelayMs defaults to 1s
        // refreshIntervalMs defaults to 30s
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

During Ribbon startup, the DynamicServerListLoadBalancer constructor calls serverListUpdater.start(updateAction), which starts a periodic serverList refresh that runs every 30s.
The task it executes is doUpdate():

  protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
      @Override
      public void doUpdate() {
          updateListOfServers();
      }
  };

  public void updateListOfServers() {
      List<T> servers = new ArrayList<T>();
      if (serverListImpl != null) {
          servers = serverListImpl.getUpdatedListOfServers();
          
          if (filter != null) {
              servers = filter.getFilteredListOfServers(servers);
          }
      }
      updateAllServerList(servers);
   }

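When Ribbon obtains the latest server list here, it reads the values that Eureka has already cached (localRegionApps and remoteRegionVsApps) and does not issue a remote registry fetch. The updated serverList is then cached in the parent class BaseLoadBalancer: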

   @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections
            .synchronizedList(new ArrayList<Server>());

When writing a custom Ribbon rule, we can extend AbstractLoadBalancerRule and call getLoadBalancer() to obtain the current serverList directly.

Server-side eviction task for expired leases

AbstractInstanceRegistry

class EvictionTask extends TimerTask {

    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }
}

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    // EvictionIntervalTimerInMs defaults to 60s
    evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs());
}

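While initializing the Eureka context, EurekaBootStrap internally calls AbstractInstanceRegistry.postInit, which starts the EvictionTask; the task runs every 60s. Each run computes a compensation time (compensationTimeMs) to absorb errors caused by GC pauses, clock skew, and similar factors.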
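
The compensation time can be thought of as the amount by which the task ran late. The sketch below assumes it is computed as the time elapsed since the previous run minus the configured eviction interval, floored at zero; getCompensationTimeMs itself is not shown in the excerpt, and EvictionCompensation is a hypothetical name.

```java
/** Simplified model of the compensation time added to lease expiry checks. */
final class EvictionCompensation {

    /**
     * @param lastRunMs          timestamp of the previous run, or 0 if this is the first run
     * @param nowMs              timestamp of the current run
     * @param evictionIntervalMs configured eviction interval
     * @return how late the current run is, never negative
     */
    static long compensationMs(long lastRunMs, long nowMs, long evictionIntervalMs) {
        if (lastRunMs == 0L) {
            return 0L; // nothing to compare against on the first run
        }
        long elapsedMs = nowMs - lastRunMs;
        return Math.max(0L, elapsedMs - evictionIntervalMs);
    }

    public static void main(String[] args) {
        // A run that starts 65s after the previous one is 5s late.
        System.out.println(compensationMs(100_000L, 165_000L, 60_000L));
    }
}
```

Adding this value to the expiry threshold avoids evicting instances whose renewals were merely delayed by the same pause that delayed the eviction task.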

The eviction task removes instances that have not renewed their lease within 90s

AbstractInstanceRegistry

public void evict(long additionalLeaseMs) {
    // ... 
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                // Decide whether the lease has expired
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // ... A random subset of the expired instances is then evicted, capped at 15% of all instances.
}
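
The 15% cap mentioned in the comment can be sketched as follows. This assumes the limit is derived from a renewal percent threshold of 0.85 (the value that yields 15%); the elided part of evict is not shown here, and EvictionLimit is a hypothetical name.

```java
/** Simplified model of the cap on how many expired leases one eviction run may remove. */
final class EvictionLimit {

    static int toEvict(int registrySize, int expiredCount, double renewalPercentThreshold) {
        // Number of instances that must remain registered after this run.
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        int evictionLimit = registrySize - registrySizeThreshold;
        return Math.min(expiredCount, evictionLimit);
    }

    public static void main(String[] args) {
        // 100 instances, 40 expired: only 15 are evicted in this run.
        System.out.println(toEvict(100, 40, 0.85));
    }
}
```

When many instances expire at once, the remainder are left for later runs, so a large-scale crash can take several eviction intervals to clear.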

// Decide whether the lease has expired; additionalLeaseMs is the compensation time
public boolean isExpired(long additionalLeaseMs) {
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}

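The expiry check shows that the gap between two renewals must stay below (duration + additionalLeaseMs). Ignoring the compensation term, the gap between renewals must stay below duration.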
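
A worked example of the expiry condition, using an explicit clock value instead of System.currentTimeMillis() so that the boundary cases are easy to follow. LeaseSketch is a hypothetical, stripped-down stand-in for Lease.

```java
/** Stripped-down lease that mirrors the expiry condition shown above. */
final class LeaseSketch {
    private final long durationMs;
    private long lastUpdateTimestamp;
    private long evictionTimestamp;

    LeaseSketch(long registeredAtMs, int durationInSecs) {
        this.lastUpdateTimestamp = registeredAtMs;
        this.durationMs = durationInSecs * 1000L;
    }

    /** A renewal moves the last-update timestamp forward. */
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs;
    }

    /** A cancel marks the lease as evicted immediately. */
    void cancel(long nowMs) {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = nowMs;
        }
    }

    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return evictionTimestamp > 0
                || nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(0L, 90);
        System.out.println(lease.isExpired(90_000L, 0L));     // exactly at the limit
        System.out.println(lease.isExpired(90_001L, 0L));     // just past the limit
        System.out.println(lease.isExpired(94_000L, 5_000L)); // covered by compensation
    }
}
```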

Next, let's see where duration comes from.

public Lease(T r, int durationInSecs) {
    holder = r;
    registrationTimestamp = System.currentTimeMillis();
    lastUpdateTimestamp = registrationTimestamp;
    duration = (durationInSecs * 1000);
}

First, duration is assigned in the Lease constructor. The lease objects are read from the registry cache, and the registry cache is populated in the instance registration method.

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    //...
    Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
    if (existingLease != null) {
        lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
    }
    gMap.put(registrant.getId(), lease);
    //...
}

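The registration code shows that the expiry duration is defined in InstanceInfo; if it is missing, the default Lease.DEFAULT_DURATION_IN_SECS (90s) is used.

InstanceInfo is sent by the client to describe the instance being registered, so let's continue with how the client builds it.

Client-side registration is mainly implemented in DiscoveryClient.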

InstanceInfo myInfo = applicationInfoManager.getInfo();
// ...
/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

EurekaClientAutoConfiguration

@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
    InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
    return new ApplicationInfoManager(config, instanceInfo);
}

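The instanceInfo registration argument is stored in ApplicationInfoManager when the bean is created. While creating ApplicationInfoManager, the configuration uses InstanceInfoFactory to build an InstanceInfo, and duration is set in the factory's create method.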

public class InstanceInfoFactory {

    public InstanceInfo create(EurekaInstanceConfig config) {
        LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

        // Builder the instance information to be registered with eureka server
        InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();
        // ... set the remaining builder fields
        
        InstanceInfo instanceInfo = builder.build();
        instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
        return instanceInfo;
    }
    // ...
}

As shown, config.getLeaseExpirationDurationInSeconds() supplies the value of duration. EurekaInstanceConfigBean defines it with a default of 90s:

private int leaseExpirationDurationInSeconds = 90;

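We can now conclude that the client uses 90s by default as the lease expiration time for eviction.

V. Configuration Tuning

The analysis above covers every delay point in each scenario and how it is implemented. On that basis, we can tune the Eureka parameters for real-world use.

There are two main goals:

  1. Reduce the client's awareness time
  2. Make sure clients can still successfully call an instance that has already been deregistered from Eureka, so that a stale cache does not turn its shutdown into failed requests

1. Reducing the client's awareness time

Eureka server configuration

Shorten the server's read-only/read-write cache sync interval to 10s. The sync only copies between two in-memory maps, so the interval can be cut substantially (30s -> 10s)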

eureka.server.response-cache-update-interval-ms=10000

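Eureka client configuration

Shorten the client's registry fetch interval to 10s. Because each periodic fetch pulls only the registry delta, this interval can also be shortened (30s -> 10s)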

eureka.client.registry-fetch-interval-seconds=10

Shorten the Ribbon serverList refresh interval to 5s. Ribbon's refresh is only an in-memory sync, so it can be cut substantially (30s -> 5s)

ribbon.ServerListRefreshInterval=5000

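With these settings, the worst-case time for clients to become aware of a new instance is 25s (10s + 10s + 5s).

2. Keeping the old instance available after it is deregistered from the Eureka server (smooth restart)

To keep calls efficient and highly available, Eureka adds several levels of cache to its internal model (Ribbon included). As a result, if a client does not remove an old instance's address promptly after the instance goes offline, requests can still be routed to the offline instance and fail.

Based on the analysis above, a smooth restart requires the following steps:

  1. Before the old instance shuts down, the Eureka client must learn that the instance is about to stop and promptly tell the Eureka server that it is going offline
  2. Callers must become aware of the old instance's status change as quickly as possible
  3. The period between the old instance sending its offline notification and actually shutting down must be stretched, so that requests reaching it before callers refresh their caches can still be served

Let's continue at the source level to see how each of these three steps can be implemented.

2.1 Letting the Eureka client know the instance is about to stop

Spring registers a shutdown hook during startup; before the instance stops, the hook publishes a ContextClosedEvent.

For details on how Spring handles shutdown, see Spring Source Code Analysis: Graceful Service Shutdown.

Eureka listens for ContextClosedEvent to notify the server that the instance is going offline.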

EurekaAutoServiceRegistration

public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof WebServerInitializedEvent) {
            onApplicationEvent((WebServerInitializedEvent) event);
        }
        else if (event instanceof ContextClosedEvent) {
            onApplicationEvent((ContextClosedEvent) event);
        }
    }
    public void onApplicationEvent(ContextClosedEvent event) {
        if (event.getApplicationContext() == context) {
            stop();
        }
    }

    @Override
    public void stop() {
        this.serviceRegistry.deregister(this.registration);
        this.running.set(false);
    }
}

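EurekaAutoServiceRegistration implements SmartApplicationListener to listen for ContextClosedEvent, and the handler eventually calls deregister.

In addition, as analyzed in Spring Source Code Analysis: Graceful Service Shutdown, Spring's shutdown hook not only publishes ContextClosedEvent but also calls stop on every lifecycle bean. The stop method here is therefore called a second time after the event has been handled, and it again ends up in deregister.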

@Override
public void deregister(EurekaRegistration reg) {
    if (reg.getApplicationInfoManager().getInfo() != null) {
        if (log.isInfoEnabled()) {
            log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status DOWN");
        }
        reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
    }
}

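deregister sets the instance status held by ApplicationInfoManager to DOWN. Eureka listens for status change events internally and sends the change to the Eureka server, so the server can promptly update the instance status to DOWN.

2.2 Letting callers see the new instance status quickly

As discussed above, the caller has two layers of cache: the localRegionApps cache that the Eureka client fills from the server's registry, and Ribbon's serverList cache. That is why we changed their refresh intervals to 10s and 5s respectively.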

eureka.client.registry-fetch-interval-seconds=10
ribbon.ServerListRefreshInterval=5000

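With these settings the caller-side caches add at most 15s (10s + 5s) of delay, and the server's read-only cache sync can add up to another 10s. Requests to the old instance must not fail during this window, so the old instance's shutdown has to be stretched out.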
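
How long the old instance should keep serving can be derived from the same intervals. The sketch below is one way to compute that wait; ShutdownWait is a hypothetical helper, the fallback values are the 30s defaults listed earlier in this article, and rounding the Ribbon interval up to whole seconds is a design choice of this sketch.

```java
/** Computes how long an instance should keep serving after it has been deregistered. */
final class ShutdownWait {
    private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
    private static final int DEFAULT_RIBBON_REFRESH_MS = 30_000;

    /**
     * @param fetchIntervalSec   eureka.client.registry-fetch-interval-seconds, may be null
     * @param ribbonRefreshMs    ribbon.ServerListRefreshInterval in milliseconds, may be null
     * @param serverCacheSyncSec server-side cache sync interval in seconds
     */
    static int waitSeconds(Integer fetchIntervalSec, Integer ribbonRefreshMs, int serverCacheSyncSec) {
        int fetchSec = (fetchIntervalSec == null || fetchIntervalSec <= 0)
                ? DEFAULT_FETCH_INTERVAL_SECONDS : fetchIntervalSec;
        int ribbonMs = (ribbonRefreshMs == null || ribbonRefreshMs <= 0)
                ? DEFAULT_RIBBON_REFRESH_MS : ribbonRefreshMs;
        // Round up so that a 1500ms interval counts as 2s rather than 1s.
        int ribbonSec = (ribbonMs + 999) / 1000;
        return fetchSec + ribbonSec + serverCacheSyncSec;
    }

    public static void main(String[] args) {
        System.out.println(waitSeconds(10, 5000, 10) + "s with the tuned settings");
        System.out.println(waitSeconds(null, null, 30) + "s with the defaults");
    }
}
```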

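2.3 Stretching out the old instance's shutdown

Stretching the shutdown is easy: a sleep will do. There is one precondition, though: the sleep only helps if the instance status on the server is already DOWN and the servlet container has not been stopped yet.

To get the timing of the sleep right, we need to dig further into Eureka's implementation.

Choosing when to sleep

As noted earlier, EurekaAutoServiceRegistration implements both SmartLifecycle and SmartApplicationListener, so it has two entry points for observing shutdown: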

@Override
public void stop(Runnable callback) {
    stop();
    callback.run();
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (event instanceof WebServerInitializedEvent) {
        onApplicationEvent((WebServerInitializedEvent) event);
    }
    else if (event instanceof ContextClosedEvent) {
        onApplicationEvent((ContextClosedEvent) event);
    }
}
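
In what order are these two entry points invoked?

As described in Spring Source Code Analysis: Graceful Service Shutdown, AbstractApplicationContext#doClose() first publishes the ContextClosedEvent and then calls stop on every lifecycle bean through lifecycleProcessor.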

AbstractApplicationContext

protected void doClose() {
    // ...
    try {
        // Publish shutdown event.
        // Publish the ContextClosedEvent
        publishEvent(new ContextClosedEvent(this));
    }
    catch (Throwable ex) {
        logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
    }

    // Stop all Lifecycle beans, to avoid delays during individual destruction.
    // Call the stop method of every Lifecycle bean
    if (this.lifecycleProcessor != null) {
        try {
            this.lifecycleProcessor.onClose();
        }
        catch (Throwable ex) {
            logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
        }
    }
    // ...
}

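So listeners are handled before lifecycle beans.

Now that we know when Eureka stops, we still need to find out when the servlet container stops.

When is the servlet container (Tomcat) stopped?

As the class hierarchy shows, besides EurekaAutoServiceRegistration there are two more WebServer beans, WebServerGracefulShutdownLifecycle and WebServerStartStopLifecycle, that also implement SmartLifecycle.

WebServerStartStopLifecycle starts and stops the web server
WebServerGracefulShutdownLifecycle only handles the graceful shutdown of the web server (not performed by default)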

class WebServerStartStopLifecycle implements SmartLifecycle {

    private final ServletWebServerApplicationContext applicationContext;

    private final WebServer webServer;

    private volatile boolean running;

    WebServerStartStopLifecycle(ServletWebServerApplicationContext applicationContext, WebServer webServer) {
        this.applicationContext = applicationContext;
        this.webServer = webServer;
    }

    @Override
    public void start() {
        this.webServer.start();
        this.running = true;
        this.applicationContext
                .publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext));
    }

    @Override
    public void stop() {
        this.webServer.stop();
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public int getPhase() {
        return Integer.MAX_VALUE - 1;
    }
}

class WebServerGracefulShutdownLifecycle implements SmartLifecycle {

    private final WebServer webServer;

    private volatile boolean running;

    WebServerGracefulShutdownLifecycle(WebServer webServer) {
        this.webServer = webServer;
    }

    @Override
    public void start() {
        this.running = true;
    }

    @Override
    public void stop() {
        throw new UnsupportedOperationException("Stop must not be invoked directly");
    }

    @Override
    public void stop(Runnable callback) {
        this.running = false;
        this.webServer.shutDownGracefully((result) -> callback.run());
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }
}

In what order do the SmartLifecycle implementations run?

Their order is determined by the value of getPhase. During startup Spring runs them in ascending phase order; during shutdown it runs them in descending order (note the reversal). The implementation lives in DefaultLifecycleProcessor.

SmartLifecycle                        phase
EurekaAutoServiceRegistration         0
WebServerStartStopLifecycle           Integer.MAX_VALUE - 1
WebServerGracefulShutdownLifecycle    Integer.MAX_VALUE

Start order: EurekaAutoServiceRegistration -> WebServerStartStopLifecycle -> WebServerGracefulShutdownLifecycle
Stop order: WebServerGracefulShutdownLifecycle -> WebServerStartStopLifecycle -> EurekaAutoServiceRegistration
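
The ordering rule can be illustrated with a small model that does not depend on Spring. PhaseOrder is a hypothetical helper; the real logic in DefaultLifecycleProcessor also handles bean dependencies and timeouts, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Orders lifecycle beans by phase: ascending on start, descending on stop. */
final class PhaseOrder {

    static List<String> startOrder(Map<String, Integer> phases) {
        return sorted(phases, Map.Entry.<String, Integer>comparingByValue());
    }

    static List<String> stopOrder(Map<String, Integer> phases) {
        return sorted(phases, Map.Entry.<String, Integer>comparingByValue().reversed());
    }

    private static List<String> sorted(Map<String, Integer> phases,
                                       Comparator<Map.Entry<String, Integer>> order) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(phases.entrySet());
        entries.sort(order);
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : entries) {
            names.add(entry.getKey());
        }
        return names;
    }

    /** The three beans and phases from the table above. */
    static Map<String, Integer> example() {
        Map<String, Integer> phases = new LinkedHashMap<>();
        phases.put("WebServerStartStopLifecycle", Integer.MAX_VALUE - 1);
        phases.put("EurekaAutoServiceRegistration", 0);
        phases.put("WebServerGracefulShutdownLifecycle", Integer.MAX_VALUE);
        return phases;
    }

    public static void main(String[] args) {
        System.out.println("start: " + startOrder(example()));
        System.out.println("stop:  " + stopOrder(example()));
    }
}
```

The key consequence for this article is that, among lifecycle beans, the web server stops before EurekaAutoServiceRegistration.stop() runs, so any delay has to be placed earlier in the sequence.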

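Putting the analysis together, the whole shutdown sequence is shown in the following figure:

[Figure: shutdown execution order]

We can therefore perform the sleep at insertion point 1 (a ContextClosedEvent listener that runs after EurekaAutoServiceRegistration) or at insertion point 2 (a SmartLifecycle that stops before the web server).

However, WebServerGracefulShutdownLifecycle already has the highest phase. If graceful shutdown is not enabled (the default), we can implement a SmartLifecycle at insertion point 2 and give it the highest phase; otherwise, for safety and extensibility, insertion point 1 is the better place.

Insertion point 1: SmartApplicationListener implementation
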
@Slf4j
public class UnawareBootListener implements SmartApplicationListener {

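    // Sync interval between the server's read-write and read-only caches, in seconds
    public static final Integer EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
    // Assumed fallbacks (the Eureka and Ribbon defaults) for when the config beans return no usable value
    public static final Integer DEFAULT_FETCH_INTERVAL_SECONDS = 30;
    // Despite its name, this fallback is in milliseconds, like ribbon.ServerListRefreshInterval
    public static final Integer DEFAULT_SERVER_LIST_REFRESH_INTERVAL_SECONDS = 30000;
    // eureka.client configuration
    private final EurekaClientOptimizeConfigBean eurekaConfig;
    // Ribbon configuration
    private final RibbonOptimizeConfigBean ribbonConfig;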

    public UnawareBootListener(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
        this.eurekaConfig = eurekaConfig;
        this.ribbonConfig = ribbonConfig;
    }

    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return ContextClosedEvent.class.isAssignableFrom(eventType);
    }

    // EurekaAutoServiceRegistration uses order 0; use 1 here so this listener runs right after it
    @Override
    public int getOrder() {
        return 1;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
        if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
            registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
        }
        Integer serverListRefreshInterval = ribbonConfig.getServerListRefreshInterval();
        if (serverListRefreshInterval == null || serverListRefreshInterval <= 0) {
            serverListRefreshInterval = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_SECONDS;
        }
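        // Wait for the caller's registry fetch, the Ribbon refresh, and the server cache sync
        int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshInterval / 1000)
                + EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
        try {
            Thread.sleep(shutDownWaitTime * 1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the rest of the shutdown can observe it
            Thread.currentThread().interrupt();
            log.warn("UnawareBootListener wait to shutdown interrupted");
        }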
        log.info("UnawareBootListener wait to shutdown seconds: {}s finish", shutDownWaitTime);
    }
}

Insertion point 2: SmartLifecycle implementation

@Slf4j
public class UnawareBoot implements SmartLifecycle {

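    // Sync interval between the server's read-write and read-only caches, in seconds
    public static final Integer EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
    // Assumed fallbacks (the Eureka and Ribbon defaults) for when the config beans return no usable value
    public static final Integer DEFAULT_FETCH_INTERVAL_SECONDS = 30;
    // Despite its name, this fallback is in milliseconds, like ribbon.ServerListRefreshInterval
    public static final Integer DEFAULT_SERVER_LIST_REFRESH_INTERVAL_SECONDS = 30000;
    // eureka.client configuration
    private final EurekaClientOptimizeConfigBean eurekaConfig;
    // Ribbon configuration
    private final RibbonOptimizeConfigBean ribbonConfig;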
    private volatile boolean running = false;

    public UnawareBoot(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
        this.eurekaConfig = eurekaConfig;
        this.ribbonConfig = ribbonConfig;
    }

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        running = false;
        Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
        if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
            registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
        }
        Integer serverListRefreshInterval = ribbonConfig.getServerListRefreshInterval();
        if (serverListRefreshInterval == null || serverListRefreshInterval <= 0) {
            serverListRefreshInterval = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_SECONDS;
        }
        int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshInterval / 1000) + EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
        try {
            Thread.sleep(shutDownWaitTime * 1000L);
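        } catch (InterruptedException e) {
            // Restore the interrupt flag so the rest of the shutdown can observe it
            Thread.currentThread().interrupt();
            log.warn("UnawareBoot wait to shutdown interrupted");
        }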
        log.info("UnawareBoot wait to shutdown seconds: {}s finish", shutDownWaitTime);
    }

    /**
     * Use the highest phase so that this bean is stopped, and blocks, first.
     */
    @Override
    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

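This looks complete, and the overall approach does solve the smooth restart problem, but it overlooks one thing: the Eureka client's periodic instance-info replication task. If that task runs after deregister and before the sleep has finished, the client recomputes its status and the state sent to the server changes from DOWN back to UP.

Fixing the flaw

Implementation of the replication task

InstanceInfoReplicator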

 public void run() {
    try {
        // Refresh the instance status; this is the call that turns the earlier DOWN back into UP
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

DiscoveryClient

void refreshInstanceInfo() {
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    applicationInfoManager.refreshLeaseInfoIfRequired();

    InstanceStatus status;
    try {
        status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
        status = InstanceStatus.DOWN;
    }

    if (null != status) {
        applicationInfoManager.setInstanceStatus(status);
    }
}

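The InstanceInfoReplicator task runs every replicationIntervalSeconds (30s by default) and sends the current instance info to the server. Before doing so it recomputes the latest status, which is done by DiscoveryClient.getHealthCheckHandler().getStatus().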

EurekaHealthCheckHandler

@Override
public InstanceStatus getStatus(InstanceStatus instanceStatus) {
    return getHealthStatus();
}

protected InstanceStatus getHealthStatus() {
    final Status status;
    // statusAggregator is injected during initialization by default
    if (statusAggregator != null) {
        status = getStatus(statusAggregator);
    }
    else {
        status = getStatus(getHealthIndicator());
    }
    return mapToInstanceStatus(status);
}

protected Status getStatus(StatusAggregator statusAggregator) {
    Status status;

    Set<Status> statusSet = new HashSet<>();
    if (healthIndicators != null) {
        statusSet.addAll(
                healthIndicators.values().stream().map(HealthIndicator::health)
                            .map(Health::getStatus).collect(Collectors.toSet()));
    }

    if (reactiveHealthIndicators != null) {
        statusSet.addAll(reactiveHealthIndicators.values().stream()
                .map(ReactiveHealthIndicator::health).map(Mono::block)
                .filter(Objects::nonNull).map(Health::getStatus)
                .collect(Collectors.toSet()));
    }
    // Orders the statuses in the set and returns the one with the lowest order
    status = statusAggregator.getAggregateStatus(statusSet);
    return status;
}

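This getStatus method computes the latest status by iterating over all healthIndicators. Each indicator derives a Status from some aspect of the instance (its own state, parameters, database connectivity, and so on), and together the results form a Set<Status>.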

SimpleStatusAggregator

@Override
public Status getAggregateStatus(Set<Status> statuses) {
    return statuses.stream().filter(this::contains).min(this.comparator).orElse(Status.UNKNOWN);
}

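SimpleStatusAggregator orders the set and returns the status with the lowest order. The default order, from lowest to highest, is
DOWN -> OUT_OF_SERVICE -> UP -> UNKNOWN

If everything is healthy, the aggregated status is UP. It is written back into InstanceInfo, the status change event sends the change to the server, and the instance status on the server becomes UP again.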
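
The following sketch models both points with plain strings: the aggregation picks the status with the lowest order, and the refresh ignores the status that was set manually, just as getStatus(InstanceStatus) above ignores its argument. StatusAggregation is a hypothetical class, and it assumes a health check handler such as EurekaHealthCheckHandler is active.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Models status aggregation and how a refresh overrides a manually set status. */
final class StatusAggregation {
    // Default order, lowest first.
    static final List<String> ORDER = Arrays.asList("DOWN", "OUT_OF_SERVICE", "UP", "UNKNOWN");

    /** Returns the known status with the lowest order, or UNKNOWN if there is none. */
    static String aggregate(Set<String> statuses) {
        return statuses.stream()
                .filter(ORDER::contains)
                .min(Comparator.<String>comparingInt(ORDER::indexOf))
                .orElse("UNKNOWN");
    }

    /** The refresh recomputes the status from the indicators and ignores the current one. */
    static String refresh(String currentStatus, Set<String> indicatorStatuses) {
        return aggregate(indicatorStatuses);
    }

    public static void main(String[] args) {
        Set<String> healthy = new HashSet<>(Arrays.asList("UP"));
        // The instance was set to DOWN by deregister, but the refresh reports UP again.
        System.out.println(refresh("DOWN", healthy));
    }
}
```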

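Having walked through this process, we can see that relying only on the status change event to keep the server-side instance DOWN is not rigorous. The Eureka client has to be shut down completely.

Shutting down the Eureka client

First, let's see how Eureka implements shutdown itself.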

EurekaClientAutoConfiguration

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config, EurekaInstanceConfig instance,
        @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
            
    ApplicationInfoManager appManager;
    if (AopUtils.isAopProxy(manager)) {
        appManager = ProxyUtils.getTargetObject(manager);
    }
    else {
        appManager = manager;
    }
    CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);
    cloudEurekaClient.registerHealthCheck(healthCheckHandler);
    return cloudEurekaClient;
}

The EurekaClient bean definition specifies the destroyMethod attribute, so this method is called when the bean is destroyed.

DiscoveryClient

@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            // Remove the status change listener from applicationInfoManager
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
        // Cancel all scheduled tasks
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) {
            // Set the status to DOWN and actively send the cancel request to the server; this no longer relies on the listener
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }
        // Shut down the transport client
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        // Shut down the monitors
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        Monitors.unregisterObject(this);

        logger.info("Completed shut down of DiscoveryClient");
    }
}

private void cancelScheduledTasks() {
    // Stop the instance-info replication task
    if (instanceInfoReplicator != null) {
        instanceInfoReplicator.stop();
    }
    // Stop the heartbeat (lease renewal) executor
    if (heartbeatExecutor != null) {
        heartbeatExecutor.shutdownNow();
    }
    // Stop the executor that periodically fetches the server registry
    if (cacheRefreshExecutor != null) {
        cacheRefreshExecutor.shutdownNow();
    }
    // Stop the ScheduledExecutorService
    if (scheduler != null) {
        scheduler.shutdownNow();
    }
    // Stop the periodic registry fetch task
    if (cacheRefreshTask != null) {
        cacheRefreshTask.cancel();
    }
    // Stop the heartbeat task
    if (heartbeatTask != null) {
        heartbeatTask.cancel();
    }
}

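shutdown mainly does the following:

  1. Removes the status change listener
  2. Stops all scheduled tasks
  3. Sets the instance status to DOWN
  4. Sends the cancel request
  5. Shuts down the transport client

With the whole shutdown process understood, making sure the server's registry no longer offers this instance only requires calling DiscoveryClient.shutdown() manually.
Strictly speaking, after shutdown the server has already removed the instance from its registry, so it no longer appears with status DOWN.

VI. Complete Smooth Restart Implementation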
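
Calling shutdown() manually means it will run a second time when Spring later invokes the bean's destroyMethod. The isShutdown.compareAndSet(false, true) guard in the code above makes the second call a no-op. The sketch below isolates that pattern; IdempotentShutdown is a hypothetical class, and its counter stands in for the real cancel request.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Isolates the compare-and-set guard that makes a shutdown method safe to call twice. */
final class IdempotentShutdown {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final AtomicInteger cancelRequests = new AtomicInteger();

    void shutdown() {
        // Only the first caller flips the flag and performs the work.
        if (isShutdown.compareAndSet(false, true)) {
            cancelRequests.incrementAndGet(); // stands in for the cancel request to the server
        }
    }

    int cancelRequestsSent() {
        return cancelRequests.get();
    }

    public static void main(String[] args) {
        IdempotentShutdown client = new IdempotentShutdown();
        client.shutdown(); // manual call from the listener
        client.shutdown(); // later call from the container
        System.out.println("cancel requests sent: " + client.cancelRequestsSent());
    }
}
```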

@Slf4j
public class UnawareBootListener implements SmartApplicationListener, ApplicationContextAware {

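    // Sync interval between the server's read-write and read-only caches, in seconds
    public static final Integer EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
    // Assumed fallbacks (the Eureka and Ribbon defaults) for when the config beans return no usable value
    public static final Integer DEFAULT_FETCH_INTERVAL_SECONDS = 30;
    // Despite its name, this fallback is in milliseconds, like ribbon.ServerListRefreshInterval
    public static final Integer DEFAULT_SERVER_LIST_REFRESH_INTERVAL_SECONDS = 30000;
    // eureka.client configuration
    private final EurekaClientOptimizeConfigBean eurekaConfig;
    // Ribbon configuration
    private final RibbonOptimizeConfigBean ribbonConfig;
    // Set through ApplicationContextAware; used to look up the DiscoveryClient bean
    private ApplicationContext applicationContext;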

    public UnawareBootListener(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
        this.eurekaConfig = eurekaConfig;
        this.ribbonConfig = ribbonConfig;
    }
    
    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return ContextClosedEvent.class.isAssignableFrom(eventType);
    }

    // The explicit order is no longer needed: the client is shut down proactively, so listener ordering does not matter

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        DiscoveryClient discoveryClient = applicationContext.getBean(DiscoveryClient.class);
        // Proactively shut down the Eureka client
        discoveryClient.shutdown();
        Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
        if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
            registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
        }
        Integer serverListRefreshInterval = ribbonConfig.getServerListRefreshInterval();
        if (serverListRefreshInterval == null || serverListRefreshInterval <= 0) {
            serverListRefreshInterval = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_SECONDS;
        }
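        // Wait for the caller's registry fetch, the Ribbon refresh, and the server cache sync
        int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshInterval / 1000)
                + EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
        try {
            Thread.sleep(shutDownWaitTime * 1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the rest of the shutdown can observe it
            Thread.currentThread().interrupt();
            log.warn("UnawareBootListener wait to shutdown interrupted");
        }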
        log.info("UnawareBootListener wait to shutdown seconds: {}s finish", shutDownWaitTime);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

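VII. Summary

This article analyzed, at the source level, how Eureka becomes aware of instances going online and offline and how it refreshes its caches, and presented a practical approach to smooth restarts.

A closing note:

The final code is only a few dozen lines, but writing it required understanding roughly a hundred times as much code: how things start, how they refresh, how they stop, and how the surrounding components depend on each other. It took about a month of elapsed time and more than ten hours of work to arrive at these few dozen lines.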
