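I. Introduction
This article explains why Eureka clients and the Eureka server see changes with a delay, walks through the source code behind each delay point, and shows how to make sure callers see instances go online and offline smoothly.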
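II. Service Going Online
- The service registers with the server (immediately after startup): 0s
- Sync period between the server's read-only and read-write caches: responseCacheUpdateIntervalMs=30s
- Period at which the client fetches the latest registry: registryFetchIntervalSeconds=30s
- Period at which the client-side Ribbon cache refreshes its serverList: ServerListRefreshInterval=30s
In the worst case, it takes up to 90s for a client to notice that a service has come online.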
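Each periodic stage can just miss an update and add one full period, so the worst case is the sum of the stages. Below is a minimal sketch of that arithmetic. The class name and stage labels are hypothetical; the code only encodes the numbers above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: worst-case propagation delay as the sum of stage delays. */
final class PropagationBudget {
    private PropagationBudget() {}

    /** Sums the per-stage worst cases in seconds; each periodic stage may add one full period. */
    static int worstCaseSeconds(Map<String, Integer> stages) {
        int total = 0;
        for (int seconds : stages.values()) {
            total += seconds;
        }
        return total;
    }

    /** The stages listed above for a service going online, with default settings. */
    static Map<String, Integer> defaultOnlineStages() {
        Map<String, Integer> stages = new LinkedHashMap<>();
        stages.put("register (immediate)", 0);
        stages.put("responseCacheUpdateIntervalMs", 30);
        stages.put("registryFetchIntervalSeconds", 30);
        stages.put("ServerListRefreshInterval", 30);
        return stages;
    }

    public static void main(String[] args) {
        Map<String, Integer> stages = defaultOnlineStages();
        stages.forEach((name, s) -> System.out.println(name + ": " + s + "s"));
        System.out.println("worst case: " + worstCaseSeconds(stages) + "s");
    }
}
```

The same sum applies to a graceful shutdown, because its delay points are identical.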
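III. Service Going Offline
Graceful shutdown
- The service sends a cancel request to the server (immediately before it stops): 0s
- Sync period between the server's read-only and read-write caches: responseCacheUpdateIntervalMs=30s
- Period at which the client fetches the latest registry: registryFetchIntervalSeconds=30s
- Period at which the client-side Ribbon cache refreshes its serverList: ServerListRefreshInterval=30s
In the worst case, it takes up to 90s for clients to notice a graceful shutdown. The delay points are exactly the same as for going online.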
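Abnormal shutdown
An abnormal shutdown is one in which the service goes away without sending a cancel request, for example after kill -9 or a host crash.
- Execution period of the server's eviction task: evictionIntervalTimerInMs=60s
- The eviction task evicts instances that have not renewed their lease within 90s: leaseExpirationDurationInSeconds=90s
- Sync period between the server's read-only and read-write caches: responseCacheUpdateIntervalMs=30s
- Period at which the client fetches the latest registry: registryFetchIntervalSeconds=30s
- Period at which the client-side Ribbon cache refreshes its serverList: ServerListRefreshInterval=30s
In the worst case, it takes up to 240s (60 + 90 + 30 + 30 + 30) for clients to notice an abnormal shutdown.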
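A small simulation can check the eviction part of this estimate. It assumes, as the list above does, that an instance counts as expired once 90s have passed since its last renewal, and that the eviction timer fires every 60s. It ignores the compensation time and Eureka's real lease bookkeeping. The names are hypothetical.

```java
/** Hypothetical simulation of when a crashed instance gets evicted. */
final class EvictionTiming {
    private EvictionTiming() {}

    /**
     * Returns the first eviction-timer tick (ms) at which the lease counts as expired.
     * Ticks fire at firstTickMs, firstTickMs + intervalMs, and so on.
     */
    static long evictedAtMs(long lastRenewMs, long leaseDurationMs, long firstTickMs, long intervalMs) {
        long tick = firstTickMs;
        // "expired" means now > lastRenew + duration (Lease.isExpired without compensation)
        while (tick <= lastRenewMs + leaseDurationMs) {
            tick += intervalMs;
        }
        return tick;
    }

    public static void main(String[] args) {
        // The instance crashes right after renewing at t=0; the timer ticks at 60s, 120s, 180s, ...
        long evicted = evictedAtMs(0, 90_000, 60_000, 60_000);
        System.out.println("evicted at " + evicted / 1000 + "s");
    }
}
```

If the timer's phase shifts so that a tick lands exactly on the 90s boundary (for example, firstTickMs = 30_000), that tick does not yet count as expired. Eviction then slips to 150s, which is the 60 + 90 used in the 240s total.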
The following sections look at where each of these steps lives in the source code and how it is implemented.
IV. Source Code Analysis
Server-side read-only/read-write cache sync
ResponseCacheImpl
// responseCacheUpdateIntervalMs defaults to 30s
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
for (Key key : readOnlyCacheMap.keySet()) {
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
}
}
}
};
}
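The task returned by getCacheUpdateTask runs every 30s. Its first run is aligned to the next multiple of responseCacheUpdateIntervalMs. On each run it iterates over every key in readOnlyCacheMap, reads the latest value from readWriteCacheMap, and writes that value into readOnlyCacheMap if it differs from the cached one.
If readOnlyCacheMap has not cached a key yet, getValue fills it in lazily the first time the key is read: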
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
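The interaction between the two maps can be reduced to a small sketch. It assumes plain ConcurrentHashMaps instead of Eureka's real cache types and calls a sync() method by hand instead of using the timer. All names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical two-level cache mirroring the readWrite/readOnly split in ResponseCacheImpl. */
final class TwoLevelCache {
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();

    /** Registry changes land in the read-write layer first. */
    void write(String key, String value) {
        readWrite.put(key, value);
    }

    /** Like getValue(key, true): serve from read-only, filling it lazily on a miss. */
    String get(String key) {
        String cached = readOnly.get(key);
        if (cached != null) {
            return cached;
        }
        String fresh = readWrite.get(key);
        if (fresh != null) {
            readOnly.put(key, fresh);
        }
        return fresh;
    }

    /** Like the periodic update task: refresh only keys already present in read-only. */
    void sync() {
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.get(key);
            if (fresh != null && !fresh.equals(readOnly.get(key))) {
                readOnly.put(key, fresh);
            }
        }
    }
}
```

Between write("apps", "v2") and the next sync(), readers still see the old value. That window is the responseCacheUpdateIntervalMs delay.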
Client-side registry fetch task
DiscoveryClient
/**
* Initializes all scheduled tasks.
*/
// registryFetchIntervalSeconds defaults to 30s
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
...
}
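During client startup, the DiscoveryClient constructor sets up several scheduled tasks. One of them fetches the registry from the server every 30s. The task body is new CacheRefreshThread(), and the fetched registry is stored in the local cache localRegionApps:
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();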
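TimedSupervisorTask wraps CacheRefreshThread, and it explains why a slow server can stretch the fetch period. As we understand it, when a run times out, the next delay doubles, capped at the base interval × expBackOffBound. A successful run resets the delay to the base interval. Below is a minimal sketch of just that delay calculation. The class name is hypothetical.

```java
/** Hypothetical sketch of the back-off delay used by a timed supervisor task. */
final class BackoffDelay {
    private final long baseDelay;
    private final long maxDelay;
    private long current;

    BackoffDelay(long baseDelay, int expBackOffBound) {
        this.baseDelay = baseDelay;
        this.maxDelay = baseDelay * expBackOffBound;
        this.current = baseDelay;
    }

    /** Returns the delay before the next run, given whether the last run timed out. */
    long next(boolean timedOut) {
        if (timedOut) {
            current = Math.min(maxDelay, current * 2); // double, but never beyond the cap
        } else {
            current = baseDelay; // success resets to the configured interval
        }
        return current;
    }
}
```

For example, with a 30s interval and a bound of 10, repeated timeouts push the delay up to 300s before it stops growing.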
Client-side Ribbon serverList refresh
PollingServerListUpdater
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
// initialDelayMs defaults to 1s
// refreshIntervalMs defaults to 30s
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
During Ribbon startup, the DynamicServerListLoadBalancer constructor calls serverListUpdater.start(updateAction). This starts a task that refreshes the serverList every 30s.
The task it runs is doUpdate():
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}
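To get the latest server list here, Ribbon uses the values Eureka has already cached (localRegionApps or remoteRegionVsApps). It does not send a remote request to fetch the registry. The updated serverList is then cached in the parent class BaseLoadBalancer: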
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
When writing a custom Ribbon rule, extend AbstractLoadBalancerRule and call getLoadBalancer() to get the current serverList.
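Here is a rough sketch of this second cache layer. A refresh step copies a snapshot from the discovery client's local cache into the load balancer's own volatile list, and a rule only ever reads that list. Server, the UP flag and the round-robin choice are simplified, hypothetical stand-ins for Ribbon's Server, upServerList and a rule built on AbstractLoadBalancerRule. This is not Ribbon's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a load balancer fed from a local discovery cache. */
final class SnapshotBalancer {
    static final class Server {
        final String address;
        final boolean up;
        Server(String address, boolean up) { this.address = address; this.up = up; }
    }

    // Plays the role of Eureka's localRegionApps, refreshed by the registry fetch task.
    final AtomicReference<List<Server>> discoveryCache = new AtomicReference<>(Collections.emptyList());
    // Plays the role of BaseLoadBalancer.upServerList, refreshed by the serverList updater.
    private volatile List<Server> upServerList = Collections.emptyList();
    private final AtomicInteger next = new AtomicInteger();

    /** Like updateListOfServers(): read the local cache (no remote call) and keep UP servers. */
    void updateListOfServers() {
        List<Server> up = new ArrayList<>();
        for (Server s : discoveryCache.get()) {
            if (s.up) {
                up.add(s);
            }
        }
        upServerList = Collections.unmodifiableList(up);
    }

    /** A round-robin rule that only sees the last snapshot; returns null if no server is UP. */
    Server choose() {
        List<Server> servers = upServerList;
        if (servers.isEmpty()) {
            return null;
        }
        return servers.get(Math.floorMod(next.getAndIncrement(), servers.size()));
    }
}
```

choose() keeps returning servers from the old snapshot until updateListOfServers() runs again. That gap is the ServerListRefreshInterval delay.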
Server-side eviction task for expired instances
AbstractInstanceRegistry
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
}
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
// EvictionIntervalTimerInMs defaults to 60s
evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs());
}
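While EurekaBootStrap initializes the Eureka context, it calls AbstractInstanceRegistry.postInit. That call starts EvictionTask, which runs every 60s. Each run computes a compensation time (compensationTimeMs) to correct for drift caused by GC pauses, clock adjustments and similar factors.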
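Based on our reading of getCompensationTimeMs, the compensation time can be sketched as follows. It is the amount by which the task ran later than the configured interval, never negative, and 0 on the first run. The current time is passed in explicitly so the logic can be tested. The class name is hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the eviction task's compensation-time calculation. */
final class CompensationClock {
    private final AtomicLong lastExecutionNanos = new AtomicLong(0L);
    private final long intervalMs;

    CompensationClock(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns how many ms later than the configured interval this run is (never negative). */
    long compensationTimeMs(long nowNanos) {
        long lastNanos = lastExecutionNanos.getAndSet(nowNanos);
        if (lastNanos == 0L) {
            return 0L; // first run: nothing to compare against
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nowNanos - lastNanos);
        return Math.max(0L, elapsedMs - intervalMs);
    }
}
```

The result is passed to evict() as additionalLeaseMs. If a run is delayed 5s by a GC pause, every lease gets 5s of extra grace, so instances are not evicted merely because the server itself stalled.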
The eviction task evicts instances that have not renewed within 90s
AbstractInstanceRegistry
public void evict(long additionalLeaseMs) {
// ...
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// check whether this lease has expired
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// ... then a random subset of the expired instances is evicted, never more than 15% of all registered instances
}
// expiry check; additionalLeaseMs is the compensation time
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
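The expiry check shows that the gap between two renewals must be less than (duration + additionalLeaseMs). Ignoring compensation, it must be less than duration.
Strictly speaking, Eureka's Lease.renew() sets lastUpdateTimestamp to the current time plus duration. An instance that stops renewing is therefore only treated as expired about 2 × duration after its last renewal. A comment in the Eureka source describes this as a known minor bug that is left unfixed for compatibility.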
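Below is a simplified sketch that combines the expiry check with the 15% cap mentioned in the evict() comment. The cap formula reflects our understanding of AbstractInstanceRegistry.evict: registry size minus registry size × renewalPercentThreshold, with the threshold defaulting to 0.85. The class is hypothetical and omits the random selection.

```java
/** Hypothetical sketch of lease expiry and the per-run eviction cap. */
final class LeaseExpiry {
    private LeaseExpiry() {}

    /** Same shape as Lease.isExpired: expired once now passes lastUpdate + duration + compensation. */
    static boolean isExpired(long nowMs, long lastUpdateMs, long durationMs, long additionalLeaseMs) {
        return nowMs > lastUpdateMs + durationMs + additionalLeaseMs;
    }

    /** Evicts at most (registrySize - registrySize * threshold) instances per run. */
    static int toEvict(int expiredCount, int registrySize, double renewalPercentThreshold) {
        int threshold = (int) (registrySize * renewalPercentThreshold);
        int evictionLimit = registrySize - threshold;
        return Math.min(expiredCount, evictionLimit);
    }
}
```

With 100 registered instances and 40 expired ones, a single run evicts only 15. The remaining expired instances wait for later runs.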
Next, let's look at where duration comes from.
public Lease(T r, int durationInSecs) {
holder = r;
registrationTimestamp = System.currentTimeMillis();
lastUpdateTimestamp = registrationTimestamp;
duration = (durationInSecs * 1000);
}
duration is assigned in the Lease constructor. The lease objects are read from the registry cache, and that cache is populated when an instance registers:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
//...
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
//...
}
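The registration code shows that the expiration period is defined in InstanceInfo. If it is not set, the default Lease.DEFAULT_DURATION_IN_SECS of 90s is used.
InstanceInfo carries the instance details that the client sends with its registration, so the next step is to look at how the client builds InstanceInfo.
Client-side registration is implemented mainly in DiscoveryClient.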
InstanceInfo myInfo = applicationInfoManager.getInfo();
// ...
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
EurekaClientAutoConfiguration
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
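The registration parameter instanceInfo is held by ApplicationInfoManager, which is created during bean injection. When ApplicationInfoManager is created, the InstanceInfoFactory factory builds an InstanceInfo instance, and duration is set inside that factory: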
public class InstanceInfoFactory {
public InstanceInfo create(EurekaInstanceConfig config) {
LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
.setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
// Builder the instance information to be registered with eureka server
InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();
// ... set the remaining builder properties
InstanceInfo instanceInfo = builder.build();
instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
return instanceInfo;
}
// ...
}
Here config.getLeaseExpirationDurationInSeconds() supplies the duration value. EurekaInstanceConfigBean defines it with a default of 90s:
private int leaseExpirationDurationInSeconds = 90;
So by default the client uses 90s as the expiration time after which its instance is evicted.
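The value travels from the instance configuration into the LeaseInfo sent at registration, and finally into the server-side Lease. That path can be condensed into a sketch. The nested types are hypothetical stand-ins for EurekaInstanceConfigBean, LeaseInfo and the server's register(). Only the fallback to 90s and the conversion to milliseconds mirror the code above.

```java
/** Hypothetical end-to-end sketch of how the lease duration reaches the server. */
final class LeaseDurationFlow {
    static final int DEFAULT_DURATION_IN_SECS = 90; // mirrors Lease.DEFAULT_DURATION_IN_SECS

    /** Stand-in for the LeaseInfo attached to InstanceInfo. */
    static final class LeaseInfo {
        final int durationInSecs;
        LeaseInfo(int durationInSecs) { this.durationInSecs = durationInSecs; }
    }

    private LeaseDurationFlow() {}

    /** Client side: like InstanceInfoFactory, copy the configured expiration into LeaseInfo. */
    static LeaseInfo buildLeaseInfo(int leaseExpirationDurationInSeconds) {
        return new LeaseInfo(leaseExpirationDurationInSeconds);
    }

    /** Server side: like register(), fall back to the default when missing or non-positive. */
    static long serverLeaseDurationMs(LeaseInfo leaseInfo) {
        int secs = DEFAULT_DURATION_IN_SECS;
        if (leaseInfo != null && leaseInfo.durationInSecs > 0) {
            secs = leaseInfo.durationInSecs;
        }
        return secs * 1000L; // the Lease constructor stores the duration in milliseconds
    }
}
```

In a Spring Cloud application, the client-side value normally comes from eureka.instance.lease-expiration-duration-in-seconds.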
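V. Configuration Tuning
With every delay point and its implementation now clear, we can tune Eureka's configuration for real deployments.
There are two goals:
- Reduce the time it takes clients to notice changes
- Ensure that requests to an instance that has already deregistered from Eureka still succeed, instead of failing because caches have not been refreshed yet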
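1. Reduce the time clients take to notice changes
eureka server configuration
Shorten the sync period between the server's read-only and read-write caches to 10s. The sync only copies values between two in-memory maps, so the period can be cut substantially (30s -> 10s):
eureka.server.response-cache-update-interval-ms=10000
eureka client configuration
Shorten the client's registry fetch period to 10s. Each regular fetch only pulls incremental (delta) updates, so a moderately shorter period is reasonable (30s -> 10s):
eureka.client.registry-fetch-interval-seconds=10
Shorten the Ribbon serverList refresh period to 5s. Ribbon's refresh only copies data between in-memory caches, so this period can be cut substantially (30s -> 5s):
ribbon.ServerListRefreshInterval=5000
With these settings, the worst-case time for clients to notice a new instance drops to 25s (10 + 10 + 5).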
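The three settings use different units (milliseconds, seconds, milliseconds), which makes them easy to get wrong. The sketch below computes the worst-case time for a new instance to become visible. It assumes the values are read from a java.util.Properties using the keys shown above and falls back to the 30s defaults. The helper is hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper: worst-case visibility delay from the three tuning properties. */
final class TunedDelay {
    private TunedDelay() {}

    static int worstCaseSeconds(Properties p) {
        // eureka.server.response-cache-update-interval-ms is in milliseconds
        long serverCacheMs = Long.parseLong(p.getProperty("eureka.server.response-cache-update-interval-ms", "30000"));
        // eureka.client.registry-fetch-interval-seconds is in seconds
        long fetchSeconds = Long.parseLong(p.getProperty("eureka.client.registry-fetch-interval-seconds", "30"));
        // ribbon.ServerListRefreshInterval is in milliseconds
        long ribbonMs = Long.parseLong(p.getProperty("ribbon.ServerListRefreshInterval", "30000"));
        return (int) (serverCacheMs / 1000 + fetchSeconds + ribbonMs / 1000);
    }
}
```

In a real deployment, the server property lives in the Eureka server's configuration and the other two live in each caller's configuration. They are combined here only to show the arithmetic.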
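2. Keep old instances serving after they deregister from the Eureka server (smooth restart)
To keep calls fast and highly available, Eureka adds several layers of caching, including Ribbon's. As a result, if callers do not drop an old instance's address promptly after it goes offline, requests can still be routed to that instance and fail.
Based on the analysis above, a smooth restart requires three steps:
- Before the old instance shuts down, its Eureka client must detect the impending shutdown and promptly tell the Eureka server that it is going offline
- Callers must notice the old instance's state change as quickly as possible
- The interval between sending the offline notification and the old instance actually shutting down must be stretched, so that the instance can still handle requests that reach it before callers refresh their caches
The following subsections look at how to implement these three steps, starting from the source code.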
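2.1 Letting the Eureka client detect the impending shutdown
Spring registers a shutdown hook at startup. Before the application shuts down, it publishes a ContextClosedEvent.
For details on how Spring handles shutdown, see the article Spring Source Code Analysis: Graceful Shutdown.
Eureka listens for ContextClosedEvent and uses it to tell the server that the instance is going offline.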
EurekaAutoServiceRegistration
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof WebServerInitializedEvent) {
onApplicationEvent((WebServerInitializedEvent) event);
}
else if (event instanceof ContextClosedEvent) {
onApplicationEvent((ContextClosedEvent) event);
}
}
public void onApplicationEvent(ContextClosedEvent event) {
if (event.getApplicationContext() == context) {
stop();
}
}
@Override
public void stop() {
this.serviceRegistry.deregister(this.registration);
this.running.set(false);
}
}
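EurekaAutoServiceRegistration implements SmartApplicationListener to listen for ContextClosedEvent, and handling that event ultimately calls deregister.
As analyzed in Spring Source Code Analysis: Graceful Shutdown, Spring's shutdown hook does more than publish ContextClosedEvent. It also calls the stop method of every Lifecycle bean. The stop method here is therefore called a second time after the event has been handled, and it also ends up calling deregister.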
@Override
public void deregister(EurekaRegistration reg) {
if (reg.getApplicationInfoManager().getInfo() != null) {
if (log.isInfoEnabled()) {
log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status DOWN");
}
reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
}
}
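deregister sets the status of the InstanceInfo held by ApplicationInfoManager to DOWN. Internally, Eureka listens for status-change events and forwards the change to the Eureka server, so the server promptly updates the instance status to DOWN.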
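The mechanism can be sketched as a status holder that notifies listeners only when the status actually changes. As far as we can tell, ApplicationInfoManager.setInstanceStatus follows this pattern, and DiscoveryClient's own listener then pushes the change to the server. The types here are hypothetical simplifications.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

/** Hypothetical status holder that notifies listeners on real changes only. */
final class StatusHolder {
    enum Status { STARTING, UP, DOWN, OUT_OF_SERVICE, UNKNOWN }

    private volatile Status status = Status.STARTING;
    private final List<BiConsumer<Status, Status>> listeners = new CopyOnWriteArrayList<>();

    void addListener(BiConsumer<Status, Status> listener) {
        listeners.add(listener);
    }

    void removeListener(BiConsumer<Status, Status> listener) {
        listeners.remove(listener);
    }

    /** Sets the status; listeners receive (previous, next) only if the value changed. */
    synchronized void setStatus(Status next) {
        Status prev = status;
        if (prev == next) {
            return;
        }
        status = next;
        for (BiConsumer<Status, Status> l : listeners) {
            l.accept(prev, next);
        }
    }

    Status status() {
        return status;
    }
}
```

A listener registered here would forward UP->DOWN to the server. Removing the listener, as shutdown does later in this article, stops that forwarding.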
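2.2 Letting callers notice the instance's new state on the server quickly
As discussed above, a calling service has two layers of cache: the Eureka client's localRegionApps cache of the server registry, and Ribbon's serverList cache. We changed their refresh periods to 10s and 5s respectively:
eureka.client.registry-fetch-interval-seconds=10
ribbon.ServerListRefreshInterval=5000
The caller-side caches therefore pick up the change within at most 15s, or 25s once the server's 10s read-only cache sync is included. Requests to the old instance must not fail during this window, so the old instance's shutdown has to be delayed accordingly.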
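2.3 Delaying the old instance's shutdown
A sleep makes it easy to delay shutdown, but there is a precondition. The sleep is only useful if the instance's status on the server is already DOWN and the servlet container has not been stopped yet.
Timing the sleep correctly requires a closer look at Eureka's implementation.
When should the thread sleep?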
As mentioned earlier, Eureka's EurekaAutoServiceRegistration implements both SmartLifecycle and SmartApplicationListener, so it has two entry points for detecting shutdown:
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof WebServerInitializedEvent) {
onApplicationEvent((WebServerInitializedEvent) event);
}
else if (event instanceof ContextClosedEvent) {
onApplicationEvent((ContextClosedEvent) event);
}
}
In what order are these two entry points called?
As covered in Spring Source Code Analysis: Graceful Shutdown, AbstractApplicationContext#doClose() first publishes ContextClosedEvent and then calls stop on every Lifecycle bean through lifecycleProcessor.
AbstractApplicationContext
protected void doClose() {
// ...
try {
// Publish shutdown event (ContextClosedEvent).
publishEvent(new ContextClosedEvent(this));
}
catch (Throwable ex) {
logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
}
// Stop all Lifecycle beans, to avoid delays during individual destruction.
if (this.lifecycleProcessor != null) {
try {
this.lifecycleProcessor.onClose();
}
catch (Throwable ex) {
logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
}
}
// ...
}
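So listeners are handled before Lifecycle beans.
With the timing of Eureka's shutdown understood, we also need to know when the servlet container stops.
When is the servlet container (Tomcat) stopped?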
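Besides EurekaAutoServiceRegistration, two web server beans also implement SmartLifecycle: WebServerGracefulShutdownLifecycle and WebServerStartStopLifecycle (the class-hierarchy diagram from the original article is not reproduced here).
- WebServerStartStopLifecycle starts and stops the webServer
- WebServerGracefulShutdownLifecycle only performs the webServer's graceful shutdown (not enabled by default)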
class WebServerStartStopLifecycle implements SmartLifecycle {
private final ServletWebServerApplicationContext applicationContext;
private final WebServer webServer;
private volatile boolean running;
WebServerStartStopLifecycle(ServletWebServerApplicationContext applicationContext, WebServer webServer) {
this.applicationContext = applicationContext;
this.webServer = webServer;
}
@Override
public void start() {
this.webServer.start();
this.running = true;
this.applicationContext
.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext));
}
@Override
public void stop() {
this.webServer.stop();
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public int getPhase() {
return Integer.MAX_VALUE - 1;
}
}
class WebServerGracefulShutdownLifecycle implements SmartLifecycle {
private final WebServer webServer;
private volatile boolean running;
WebServerGracefulShutdownLifecycle(WebServer webServer) {
this.webServer = webServer;
}
@Override
public void start() {
this.running = true;
}
@Override
public void stop() {
throw new UnsupportedOperationException("Stop must not be invoked directly");
}
@Override
public void stop(Runnable callback) {
this.running = false;
this.webServer.shutDownGracefully((result) -> callback.run());
}
@Override
public boolean isRunning() {
return this.running;
}
}
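In what order are SmartLifecycle implementations executed?
The value returned by getPhase determines the order. During startup, Spring runs them in ascending phase order. During shutdown it runs them in descending order (note the reversal). The implementation is in DefaultLifecycleProcessor.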
SmartLifecycle | phase |
---|---|
EurekaAutoServiceRegistration | 0 |
WebServerStartStopLifecycle | Integer.MAX_VALUE - 1 |
WebServerGracefulShutdownLifecycle | Integer.MAX_VALUE |
Start order: EurekaAutoServiceRegistration -> WebServerStartStopLifecycle -> WebServerGracefulShutdownLifecycle
Stop order: WebServerGracefulShutdownLifecycle -> WebServerStartStopLifecycle -> EurekaAutoServiceRegistration
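A tiny simulation reproduces this close-time ordering. Close-event listeners run first in ascending order, and then lifecycle beans stop in descending phase order. This is a simplified model of doClose and DefaultLifecycleProcessor, not Spring's code. In the model, beans that share a phase simply keep their registration order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model of the shutdown ordering described above. */
final class ShutdownOrder {
    static final class Step {
        final String name;
        final int rank; // listener order, or lifecycle phase
        Step(String name, int rank) { this.name = name; this.rank = rank; }
    }

    private ShutdownOrder() {}

    /** Listeners first (ascending order), then lifecycle beans (descending phase). */
    static List<String> closeSequence(List<Step> listeners, List<Step> lifecycles) {
        List<String> sequence = new ArrayList<>();
        List<Step> sortedListeners = new ArrayList<>(listeners);
        sortedListeners.sort(Comparator.comparingInt(s -> s.rank));
        for (Step s : sortedListeners) {
            sequence.add("event:" + s.name);
        }
        List<Step> sortedLifecycles = new ArrayList<>(lifecycles);
        sortedLifecycles.sort(Comparator.comparingInt((Step s) -> s.rank).reversed());
        for (Step s : sortedLifecycles) {
            sequence.add("stop:" + s.name);
        }
        return sequence;
    }
}
```

With the phases from the table, the web server stops before EurekaAutoServiceRegistration.stop(). By then, the close-event listeners, including Eureka's own deregistration, have already run.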
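Putting this together, the whole shutdown runs in the following order (the original sequence diagram is not reproduced here):
- doClose() publishes ContextClosedEvent. EurekaAutoServiceRegistration's listener (order 0) calls deregister and sets the instance to DOWN. Insertion point 1 is a listener ordered right after it.
- lifecycleProcessor.onClose(), phase Integer.MAX_VALUE: WebServerGracefulShutdownLifecycle shuts the web server down gracefully, if that is enabled. Insertion point 2 is a SmartLifecycle in this phase.
- Phase Integer.MAX_VALUE - 1: WebServerStartStopLifecycle stops the web server.
- Phase 0: EurekaAutoServiceRegistration.stop() calls deregister again.
The sleep can therefore go at insertion point 1 or insertion point 2.
However, WebServerGracefulShutdownLifecycle already occupies the highest phase. If graceful shutdown is left disabled (the default), you can implement SmartLifecycle with the highest phase at insertion point 2. Otherwise, insertion point 1 is the safer and more extensible choice.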
Insertion point 1: SmartApplicationListener implementation
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.SmartApplicationListener;
@Slf4j
public class UnawareBootListener implements SmartApplicationListener {
// sync period of the server's read-only/read-write caches, in seconds (response-cache-update-interval-ms=10000)
public static final int EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
// fallbacks used when the properties are not set (Eureka and Ribbon defaults)
public static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
public static final int DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS = 30000;
// eureka.client configuration (the author's own config bean)
private final EurekaClientOptimizeConfigBean eurekaConfig;
// ribbon configuration (the author's own config bean)
private final RibbonOptimizeConfigBean ribbonConfig;
public UnawareBootListener(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
this.eurekaConfig = eurekaConfig;
this.ribbonConfig = ribbonConfig;
}
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return ContextClosedEvent.class.isAssignableFrom(eventType);
}
// EurekaAutoServiceRegistration's order is 0; 1 makes this listener run right after it
@Override
public int getOrder() {
return 1;
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
}
// ribbon.ServerListRefreshInterval is in milliseconds
Integer serverListRefreshInterval = ribbonConfig.getServerListRefreshInterval();
if (serverListRefreshInterval == null || serverListRefreshInterval <= 0) {
serverListRefreshInterval = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS;
}
int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshInterval / 1000)
+ EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
try {
Thread.sleep(shutDownWaitTime * 1000L);
} catch (InterruptedException e) {
// restore the interrupt flag so the shutdown thread can still react to it
Thread.currentThread().interrupt();
log.warn("UnawareBootListener wait to shutdown interrupted");
}
log.info("UnawareBootListener wait to shutdown seconds: {}s finish", shutDownWaitTime);
}
}
Insertion point 2: SmartLifecycle implementation
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.SmartLifecycle;
@Slf4j
public class UnawareBoot implements SmartLifecycle {
// sync period of the server's read-only/read-write caches, in seconds (response-cache-update-interval-ms=10000)
public static final int EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
// fallbacks used when the properties are not set (Eureka and Ribbon defaults)
public static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
public static final int DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS = 30000;
// eureka.client configuration (the author's own config bean)
private final EurekaClientOptimizeConfigBean eurekaConfig;
// ribbon configuration (the author's own config bean)
private final RibbonOptimizeConfigBean ribbonConfig;
private volatile boolean running = false;
public UnawareBoot(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
this.eurekaConfig = eurekaConfig;
this.ribbonConfig = ribbonConfig;
}
@Override
public void start() {
running = true;
}
@Override
public void stop() {
running = false;
Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
}
// ribbon.ServerListRefreshInterval is in milliseconds
Integer serverListRefreshInterval = ribbonConfig.getServerListRefreshInterval();
if (serverListRefreshInterval == null || serverListRefreshInterval <= 0) {
serverListRefreshInterval = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS;
}
int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshInterval / 1000) + EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
try {
Thread.sleep(shutDownWaitTime * 1000L);
} catch (InterruptedException e) {
// restore the interrupt flag so the shutdown thread can still react to it
Thread.currentThread().interrupt();
log.warn("UnawareBoot wait to shutdown interrupted");
}
log.info("UnawareBoot wait to shutdown seconds: {}s finish", shutDownWaitTime);
}
/**
* Highest phase: stopped first during shutdown, so it blocks before the web server stops
*/
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public boolean isRunning() {
return running;
}
}
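This looks complete, and the approach does solve the smooth-restart problem, but it overlooks one thing: the Eureka client's periodic renewal/replication task (InstanceInfoReplicator). Suppose that task runs after deregister has set the status to DOWN but before the wait is over. It then recomputes the status and syncs it to the server, and the instance's status on the server flips from DOWN back to UP.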
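A small model makes the race concrete. It assumes a health check that keeps reporting UP, as it would while the application is still running. deregister sets DOWN, but the next replicator run recomputes the status from the health check and writes UP back. A shutdown flag that turns the replicator into a no-op closes the gap, which is the effect of cancelling the scheduled tasks. All names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical model of the DOWN -> UP flip caused by the periodic replicator. */
final class ReplicatorRace {
    private volatile String status = "UP";
    private final Supplier<String> healthCheck;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    ReplicatorRace(Supplier<String> healthCheck) {
        this.healthCheck = healthCheck;
    }

    /** What deregister() does here: only flips the local status to DOWN. */
    void deregister() {
        status = "DOWN";
    }

    /** One replicator run: recompute the status from the health check unless shut down. */
    void replicateOnce() {
        if (shutdown.get()) {
            return; // tasks cancelled: nothing overwrites DOWN any more
        }
        status = healthCheck.get();
    }

    /** What shutdown() achieves for this race: stop the task, then mark DOWN. */
    void shutdown() {
        shutdown.set(true);
        status = "DOWN";
    }

    String status() {
        return status;
    }
}
```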
Fixing the gap
The periodic renewal/replication task
InstanceInfoReplicator
public void run() {
try {
// refresh the instance status; this is the call that turns the earlier DOWN back into UP
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
DiscoveryClient
void refreshInstanceInfo() {
applicationInfoManager.refreshDataCenterInfoIfRequired();
applicationInfoManager.refreshLeaseInfoIfRequired();
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
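Every replicationIntervalSeconds (30s by default), the InstanceInfoReplicator task recomputes the latest instance status and syncs it to the server. The status is computed by DiscoveryClient.getHealthCheckHandler().getStatus(). In Spring Cloud, that handler is EurekaHealthCheckHandler when eureka.client.healthcheck.enabled=true.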
EurekaHealthCheckHandler
@Override
public InstanceStatus getStatus(InstanceStatus instanceStatus) {
return getHealthStatus();
}
protected InstanceStatus getHealthStatus() {
final Status status;
// statusAggregator is injected during initialization by default
if (statusAggregator != null) {
status = getStatus(statusAggregator);
}
else {
status = getStatus(getHealthIndicator());
}
return mapToInstanceStatus(status);
}
protected Status getStatus(StatusAggregator statusAggregator) {
Status status;
Set<Status> statusSet = new HashSet<>();
if (healthIndicators != null) {
statusSet.addAll(
healthIndicators.values().stream().map(HealthIndicator::health)
.map(Health::getStatus).collect(Collectors.toSet()));
}
if (reactiveHealthIndicators != null) {
statusSet.addAll(reactiveHealthIndicators.values().stream()
.map(ReactiveHealthIndicator::health).map(Mono::block)
.filter(Objects::nonNull).map(Health::getStatus)
.collect(Collectors.toSet()));
}
// sorts the statuses in the set and returns the one with the lowest order
status = statusAggregator.getAggregateStatus(statusSet);
return status;
}
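This getStatus computes the current status by iterating over all healthIndicators. Each indicator computes a Status from the instance's state, parameters, database connectivity and so on, and the results form a Set<Status>.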
SimpleStatusAggregator
@Override
public Status getAggregateStatus(Set<Status> statuses) {
return statuses.stream().filter(this::contains).min(this.comparator).orElse(Status.UNKNOWN);
}
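SimpleStatusAggregator sorts the set and returns the status with the lowest order. By default, the order from lowest to highest is:
DOWN -> OUT_OF_SERVICE -> UP -> UNKNOWN
If everything is healthy, the aggregated status is UP. It is set back on InstanceInfo, and the change event sends it to the server, which updates the instance status to UP.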
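The aggregation rule can be sketched with plain strings. Keep only statuses that appear in the known order list, pick the one with the lowest index, and fall back to UNKNOWN when nothing matches. This mirrors the filter/min/orElse chain in SimpleStatusAggregator shown above, but it is not Spring Boot's class, and it omits the mapping to Eureka's InstanceStatus.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

/** Hypothetical re-implementation of the "lowest order wins" aggregation. */
final class StatusAggregation {
    static final List<String> DEFAULT_ORDER = Arrays.asList("DOWN", "OUT_OF_SERVICE", "UP", "UNKNOWN");

    private StatusAggregation() {}

    static String aggregate(Set<String> statuses) {
        return statuses.stream()
                .filter(DEFAULT_ORDER::contains) // drop codes the order list does not know
                .min(Comparator.comparingInt((String s) -> DEFAULT_ORDER.indexOf(s)))
                .orElse("UNKNOWN"); // empty or unknown-only set -> UNKNOWN
    }
}
```

A single DOWN indicator drags the whole instance to DOWN. An all-healthy set yields UP, and that UP is what overwrites the DOWN set by deregister.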
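The renewal/replication process shows that relying only on the status-change event to sync DOWN to the server is not reliable. The Eureka client has to be shut down completely.
Shutting down the Eureka client
First, let's see how Eureka implements its own shutdown.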
EurekaClientAutoConfiguration
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
The EurekaClient bean definition sets the destroyMethod attribute, so this method is called when the bean is destroyed.
DiscoveryClient
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
// remove the status-change listener from applicationInfoManager
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// cancel all scheduled tasks
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) {
// set the status to DOWN and send the cancel (unregister) request to the server directly, no longer relying on the listener
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
// shut down the transport client
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
// shut down the monitors
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
Monitors.unregisterObject(this);
logger.info("Completed shut down of DiscoveryClient");
}
}
private void cancelScheduledTasks() {
// stop the instance info replication task
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
// shut down the heartbeat executor
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
// shut down the registry fetch executor
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
// shut down the ScheduledExecutorService
if (scheduler != null) {
scheduler.shutdownNow();
}
// cancel the periodic registry fetch task
if (cacheRefreshTask != null) {
cacheRefreshTask.cancel();
}
// cancel the heartbeat task
if (heartbeatTask != null) {
heartbeatTask.cancel();
}
}
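shutdown does several things. It removes the status-change listener, stops all scheduled tasks, sets the instance status to DOWN, sends the cancel request, and shuts down the transport client.
Given this shutdown sequence, we only need to call DiscoveryClient.shutdown() ourselves to guarantee that the server's registry no longer treats the instance as available.
Strictly speaking, after shutdown the server's registry has removed the instance altogether rather than showing it as DOWN.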
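If we call shutdown() ourselves, it runs twice: once from our listener, and again when the container invokes the bean's destroyMethod. The isShutdown.compareAndSet(false, true) guard shown above makes the second call a no-op. Here is the pattern in isolation (hypothetical class):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical client whose shutdown runs its cleanup at most once. */
final class OnceShutdown {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final AtomicInteger cleanupRuns = new AtomicInteger();

    void shutdown() {
        // Only the first caller wins the CAS; later calls (e.g. via destroyMethod) return immediately.
        if (isShutdown.compareAndSet(false, true)) {
            cleanupRuns.incrementAndGet(); // stands in for cancelling tasks and unregistering
        }
    }

    int cleanupRuns() {
        return cleanupRuns.get();
    }
}
```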
VI. Complete Smooth-Restart Implementation
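import com.netflix.discovery.EurekaClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.SmartApplicationListener;
@Slf4j
public class UnawareBootListener implements SmartApplicationListener, ApplicationContextAware {
// sync period of the server's read-only/read-write caches, in seconds (response-cache-update-interval-ms=10000)
public static final int EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
// fallbacks used when the properties are not set (Eureka and Ribbon defaults)
public static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
public static final int DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS = 30000;
// eureka.client configuration (the author's own config bean)
private final EurekaClientOptimizeConfigBean eurekaConfig;
// ribbon configuration (the author's own config bean)
private final RibbonOptimizeConfigBean ribbonConfig;
private ApplicationContext applicationContext;
public UnawareBootListener(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
this.eurekaConfig = eurekaConfig;
this.ribbonConfig = ribbonConfig;
}
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return ContextClosedEvent.class.isAssignableFrom(eventType);
}
// No getOrder() override needed any more: the Eureka client is shut down explicitly, so listener order no longer matters
@Override
public void onApplicationEvent(ApplicationEvent event) {
// child contexts' ContextClosedEvents also reach this listener; react only to our own context, as EurekaAutoServiceRegistration does
if (((ContextClosedEvent) event).getApplicationContext() != applicationContext) {
return;
}
// shut down the Eureka client explicitly; com.netflix.discovery.EurekaClient declares shutdown()
EurekaClient eurekaClient = applicationContext.getBean(EurekaClient.class);
eurekaClient.shutdown();
Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
}
// ribbon.ServerListRefreshInterval is in milliseconds
Integer serverListRefreshInterval = ribbonConfig.getServerListRefreshInterval();
if (serverListRefreshInterval == null || serverListRefreshInterval <= 0) {
serverListRefreshInterval = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS;
}
int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshInterval / 1000)
+ EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
try {
Thread.sleep(shutDownWaitTime * 1000L);
} catch (InterruptedException e) {
// restore the interrupt flag so the shutdown thread can still react to it
Thread.currentThread().interrupt();
log.warn("UnawareBootListener wait to shutdown interrupted");
}
log.info("UnawareBootListener wait to shutdown seconds: {}s finish", shutDownWaitTime);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}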
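VII. Summary
This article analyzed, at the source code level, how Eureka detects instances going online and offline and how it refreshes its caches, and presented a best-practice approach to smooth restarts.
Closing note:
The final code is only a few dozen lines, but writing it required understanding roughly a hundred times as much code: how each component starts, refreshes and stops, and how the components before and after it depend on one another. Getting to these few dozen lines took more than ten hours of work, spread over about a month.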