Eureka Client为了简化开发人员的开发工作,将很多与Eureka Server交互的工作隐藏起来,自主完成。在应用的不同运行阶段在后台完成工作如图所示。
1、服务发现客户端
为了理解Eureka Client的执行原理,首先需要对服务发现客户端com.netflix.discover.DiscoveryClient职能以及相关类进行讲解,它负责了与Eureka Server交互的关键逻辑。
1.1、DiscoveryClient职责
DiscoveryClient是Eureka Client的核心类,包括与Eureka Server交互的关键逻辑,具备了以下职能:
注册服务实例到Eureka Server中;
发送心跳更新与Eureka Server的租约;
在服务关闭时从Eureka Server中取消租约,服务下线;
查询在Eureka Server中注册的服务实例列表。
1.2、DiscoveryClient类结构
DiscoveryClient继承了LookupService接口,LookupService作用是发现活跃的服务实例,主要方法如下:
package com.netflix.discovery.shared;
import java.util.List;
import com.netflix.appinfo.InstanceInfo;
public interface LookupService<T> {
/**
*根据服务实例注册的appName来获取封装有相同appName的服务实例信息容器
*/
Application getApplication(String appName);
/**
*返回当前注册表中所有的服务实例信息
*/
Applications getApplications();
/**
*根据服务实例的id获取服务实例信息
*/
List<InstanceInfo> getInstancesById(String id);
InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}
Application持有服务实例信息列表,它可以理解成同一个服务的集群信息,这些服务实例都挂在同一个服务名appName下。InstanceInfo代表一个服务实例信息。Application部分代码如下:
public class Application {
private static Random shuffleRandom = new Random();
@Override
public String toString() {
return "Application [name=" + name + ", isDirty=" + isDirty
+ ", instances=" + instances + ", shuffledInstances="
+ shuffledInstances + ", instancesMap=" + instancesMap + "]";
}
private String name;
@XStreamOmitField
private volatile boolean isDirty = false;
@XStreamImplicit
private final Set<InstanceInfo> instances;
......
}
Applications是注册表中所有服务实例信息的集合,里面的操作大多也是同步操作。
EurekaCient在LookupService的基础上扩充了更多的接口,提供了更丰富的获取服务实例的方式,主要有:
提供了多种方式获取InstanceInfo,例如根据区域、Eureka Server地址等获取。
提供了本地客户端(所处的区域、可用区等)的数据,这部分与AWS密切相关。
提供了为客户端注册和获取健康检查处理器的能力。
除去查询相关的接口,我们主要关注EurekaClient中以下两个接口,代码如下所示:
public interface EurekaClient extends LookupService {
//为Eureka Client注册健康检查处理器
public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
// 监听Client服务实例信息的更新
public void registerEventListener(EurekaEventListener eventListener);
......
}
Eureka Server一般通过心跳(heartbeats)来识别一个实例的状态。Eureka Client中存在一个定时任务定时通过HealthCheckHandler检测当前Client的状态,如果Client的状态发生改变,将会触发新的注册事件,更新Eureka Server的注册表中该服务实例的相关信息。
HealthCheckHandler代码如下所示:
package com.netflix.appinfo;
/**
* This provides a more granular healthcheck contract than the existing {@link HealthCheckCallback}
*
* @author Nitesh Kant
*/
public interface HealthCheckHandler {
InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);
}
Eureka中的事件模式属于观察者模式,事件监听器将监听Client的服务实例信息变化,触发对应的处理事件
1.3、DiscoveryClient构造函数
在DiscoveryClient构造函数中,Eureka Client会执行从Eureka Server中拉取注册表信息、服务注册、初始化发送心跳、缓存刷新(重新拉取注册表信息)和按需注册定时任务等操作,可以说DiscoveryClient的构造函数贯穿了Eureka Client启动阶段的各项工作。
在构造方法中,忽略掉构造方法中大部分的赋值操作,我们将会逐步了解配置类中的属性会对DiscoveryClient的行为造成什么影响。部分代码如下所示:
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
config#shouldFetchRegistry(对应配置为eureka.client.fetch-register)为true表示Eureka Client将从Eureka Server中拉取注册表信息。config#shouldRegisterWithEureka(对应配置为eureka.client.register-with-eureka)为true表示Eureka Client将注册到Eureka Server中。如果上述的两个配置均为false,那么Discovery的初始化将直接结束,表示该客户端既不进行服务注册也不进行服务发现。
接着定义一个基于线程池的定时器线程池ScheduledExecutorService,线程池大小为2,一个线程用于发送心跳,另一个线程用于缓存刷新,同时定义了发送心跳和缓存刷新线程池,代码如下所示:
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
之后,初始化Eureka Client与Eureka Server进行HTTP交互的Jersey客户端
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
EurekaTransport是DiscoveryClient中的一个内部类,其内封装了DiscoveryClient与Eureka Server进行HTTP调用的Jersey客户端。
再接着从Eureka Server中拉取注册表信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
如果EurekaClientConfig#shouldFetchRegistry为true时,fetchRegistry方法将会被调用。在Eureka Client向Eureka Server注册前,需要先从Eureka Server拉取注册表中的信息,这是服务发现的前提。通过将Eureka Server中的注册表信息缓存到本地,就可以就近获取其他服务的相关信息,减少与Eureka Server的网络通信。
拉取完Eureka Server中的注册表信息后,将对服务实例进行注册,代码如下所示:
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
//发起服务注册
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
在服务注册之前会进行注册预处理,Eureka没有对此提供默认实现。构造函数的最后将初始化并启动发送心跳、缓存刷新和按需注册等定时任务。
最后总结一下,在DiscoveryClient的构造函数中,主要依次做了以下的事情:
1)相关配置的赋值,类似ApplicationInfoManager、EurekaClientConfig等。
2)备份注册中心的初始化,默认没有实现。
3)拉取Eureka Server注册表中的信息。
4)注册前的预处理,默认没有实现。
5)向Eureka Server注册自身。
6)初始化心跳定时任务、缓存刷新和按需注册等定时任务。
2、拉取注册表信息
在DiscoveryClient的构造函数中,调用了DiscoveryClient#fetchRegistry方法从Eureka Server中拉取注册表信息,方法执行如下所示:
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 如果增量式拉取被禁止,或者applications == null,则进行全量拉取
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
//全量拉取注册信息
getAndStoreFullRegistry();
} else {
//增量拉取注册信息
getAndUpdateDelta(applications);
}
//计算应用集合一致性hash码
applications.setAppsHashCode(applications.getReconcileHashCode());
//打印注册表上实例的总数量
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
一般来讲,在Eureka客户端,除了第一次拉取注册表信息,之后的信息拉取都会尝试只进行增量拉取(第一次拉取注册表信息为全量拉取),下面将分别介绍拉取注册表信息的两种实现,全量拉取注册表信息DiscoveryClient#getAndStoreFullRegistry和增量式拉取注册表信息DiscoveryClient#getAndUpdateDelta。
2.1、全量拉取注册表信息
一般只有在第一次拉取的时候,才会进行注册表信息的全量拉取,主要在DiscoveryClient#getAndStoreFullRegistry方法中进行。
private void getAndStoreFullRegistry() throws Throwable {
// 获取拉取的注册表的版本,防止拉取版本落后(由其他的线程引起)
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
}
// 检查fetchRegistryGeneration的更新版本是否发生改变,无改变的话说明本次拉取是最新的
else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
//更新缓存
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
全量拉取将从Eureka Server中拉取注册表中所有的服务实例信息(封装在Applications中),并经过处理后替换掉本地注册表缓存Applications。
getAndStoreFullRegistry方法可能被多个线程同时调用,导致新拉取的注册表被旧的注册表覆盖(有可能出现先拉取注册表信息的线程在覆盖apps时被阻塞,被后拉取注册表信息的线程抢先设置了apps,被阻塞的线程恢复后再次设置了apps,导致apps数据版本落后),产生脏数据,对此,Eureka通过类型为AtomicLong的currentUpdateGeneration对apps的更新版本进行跟踪。如果更新版本不一致,说明本次拉取注册表信息已过时,不需要缓存到本地。拉取到注册表信息之后会对获取到的apps进行筛选,只保留状态为UP的服务实例信息。
2.2、增量式拉取注册表信息
增量式的拉取方式,一般发生在第一次拉取注册表信息之后,拉取的信息定义为从某一段时间之后发生的所有变更信息,通常来讲是3分钟之内注册表的信息变化。在获取到更新的delta后,会根据delta中的增量更新对本地的数据进行更新。与getAndStoreFullRegistry方法一样,也通过fetchRegistryGeneration对更新的版本进行控制。增量式拉取是为了维护Eureka Client本地的注册表信息与Eureka Server注册表信息的一致性,防止数据过久而失效,采用增量式拉取的方式减少了拉取注册表信息的通信量。Client中有一个注册表缓存刷新定时器专门负责维护两者之间信息的同步性。但是当增量式拉取出现意外时,定时器将执行全量拉取以更新本地缓存的注册表信息。具体代码如下所示:
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
//获取增量拉取失败,则进行全量拉取
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
//更新本地缓存
updateDelta(delta);
//计算合并后的Applications的appsHashCode(应用集合一致性哈希码)
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// 和Eureka Server传递的delta上的appsHashCode进行比较
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
//方法中将会执行拉取全量注册表信息操作。
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
3、服务注册
在拉取完Eureka Server中的注册表信息并将其缓存在本地后,Eureka Client将向Eureka Server注册自身服务实例元数据,主要逻辑位于Discovery#register方法中。register方法代码如下所示:
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
Eureka Client会将自身服务实例元数据(封装在InstanceInfo中)发送到Eureka Server中请求服务注册,当Eureka Server返回204状态码时,说明服务注册成功。
4、初始化定时任务
很明显,服务注册应该是一个持续的过程,Eureka Client通过定时发送心跳的方式与Eureka Server进行通信,维持自己在Server注册表上的租约。同时Eureka Server注册表中的服务实例信息是动态变化的,为了保持Eureka Client与Eureka Server的注册表信息的一致性,Eureka Client需要定时向Eureka Server拉取注册表信息并更新本地缓存。为了监控Eureka Client应用信息和状态的变化,Eureka Client设置了一个按需注册定时器,定时检查应用信息或者状态的变化,并在发生变化时向Eureka Server重新注册,避免注册表中的本服务实例信息不可用。
在DiscoveryClient#initScheduledTasks方法中初始化了三个定时器任务,一个用于向Eureka Server拉取注册表信息刷新本地缓存;一个用于向Eureka Server发送心跳;一个用于进行按需注册的操作。代码如下所示:
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
//注册表缓存刷新定时器
// 获取配置文件中刷新间隔,默认为30s,
//可以通过eureka.client.registry-fetch-interval-seconds进行设置
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
//发送心跳定时器,默认30秒发送一次心跳
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
4.1、缓存刷新定时任务与发送心跳定时任务
在DiscoveryClient#initScheduledTasks方法中,通过ScheduledExecutorService#schedule的方式提交缓存刷新任务和发送心跳任务,任务执行的方式为延时执行并且不循环,这两个任务的定时循环逻辑由TimedSupervisorTask提供实现。TimedSupervisorTask继承了TimerTask,提供执行定时任务的功能。它在run方法中定义执行定时任务的逻辑。具体代码如下所示:
public void run() {
Future<?> future = null;
try {
//执行定时任务
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
//等待任务执行结果
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
//执行完成,设置下次执行任务时间间隔
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
//执行任务超时
timeoutCounter.increment();
//设置下次执行任务时间间隔
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
//执行任务被拒绝
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
//统计其他的异常
throwableCounter.increment();
} finally {
//取消未结束的任务
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
TimedSupervisorTask通过这种不断循环提交任务的方式,完成定时执行任务的要求。
在DiscoveryClient#initScheduledTasks方法中,提交缓存刷新定时任务的线程任务为CacheRefreshThread,提交发送心跳定时任务的线程为HeartbeatThread。CacheRefreshThread继承了Runnable接口,代码如下所示:
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
//省略其他代码
//判断远程Region是否改变(即Eureka Server地址是否发生变化),决定进行全量拉取还是增量式拉取
boolean success = fetchRegistry(remoteRegionsModified);
//打印更新注册表缓存后的变化
//省略其他代码
}
HeartbeatThread同样继承了Runnable接口,该任务的作用是向Eureka Server发送心跳请求,维持Eureka Client在注册表中的租约。代码如下所示:
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
// 调用HTTP发送心跳到Eureka Server中维持租约
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
// 重新注册
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
//续约成功
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
Eureka Server会根据续租提交的appName与instanceInfoId来更新注册表中的服务实例的租约。当注册表中不存在该服务实例时,将返回404状态码,发送心跳请求的Eureka Client在接收到404状态后将会重新发起注册;如果续约成功,将会返回200状态码。
4.2、按需指定注册任务
按需注册定时任务的作用是当Eureka Client中的InstanceInfo或者status发生变化时,重新向Eureka Server发起注册请求,更新注册表中的服务实例信息,保证Eureka Server注册表中服务实例信息有效和可用。按需注册定时任务的代码如下:
//定时检查服务的实例信息
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 监控应用的status变化,发生变化即可发起重新注册
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
// 注册应用状态改变监控器
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 启动定时按需注册定时任务
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
5、服务下线
一般情况下,应用服务在关闭的时候,Eureka Client会主动向Eureka Server注销自身在注册表中的信息。DiscoveryClient中对象销毁前执行的清理方法如下所示:
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
Monitors.unregisterObject(this);
logger.info("Completed shut down of DiscoveryClient");
}
}
在销毁DiscoveryClient之前,会进行一系列清理工作,包括注销ApplicationInfoManager中的StatusChangeListener、取消定时任务、服务下线和关闭Jersey客户端等。我们主要关注unregister服务下线方法,其实现代码如下所示:
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}