Eureka Server作为一个开箱即用的服务中心,主要有以下功能:
- 服务注册;
- 接收服务心跳(续租);
- 服务剔除;
- 服务下线;
- 集群同步;
- 获取注册表中服务实例信息。
需要注意的是,Eureka Server本身也是一个Eureka Client,在不禁止其客户端行为时,他会向其他Eureka Server执行注册、发送心跳等操作。
源码和配置
其中重要的类为com.netflix.eureka.registry.AbstractInstanceRegistry
类。该类主管该Server下的Client的活动信息。另外一个接口com.netflix.eureka.registry.PeerAwareInstanceRegistry
,只有一个实现类,主管和peer之间的同步等操作。
数据结构
注册表是Eureka中的主要部分,在此简单分析一下他的数据结构。
服务注册
服务注册的默认租约时间leaseDuration为90s。主要实现位于com.netflix.eureka.registry.AbstractInstanceRegistry#register
方法中,其实现如下(以下所有的代码删除了logger):
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
//首先获取读锁,防止其他线程修改数据,但是可以读取数据
read.lock();
//registry 全局变量,ConcurrentHashMap类型,总注册信息
//通过APPName来获取服务集群的租约集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
//此处使用putIfAbsent而不是put,防止多线程获取readLock之后,覆盖别的线程添加的属性
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
//获取Client实例的租约信息
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
//租约存在,且租约中的Client实例不为空
if (existingLease != null && (existingLease.getHolder() != null)) {
//旧租约的修改时间
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
//新租约的修改时间
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
//因为是多线程操作,所以新租约的时间不一定大于旧租约的时间
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
registrant = existingLease.getHolder();
}
} else {
// 租约不存在
synchronized (lock) {
//自我保护机制[1]
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1 for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
//继承上次的服务启动时间
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
//把不是UNKUOWN的,且新的实例放到overriddenInstanceStatusMap中
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// 根据覆盖状态规则设置服务的状态
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
//如果实例为UP状态,则设置服务启动时间(仅第一次有效)
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
//recentlyChangedQueue用于Client增量式获取注册表信息
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
//设置responseCache缓存过期,用于Client全量获取注册表信息
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
在注册方法中有诸多同步或安全的操作,下面简析一二:
开始read.lock(),使用read的原因,因为readLock是共享锁,所以有多线程注册多个服务时,并行执行。对应的writeLock排它锁只用在获取增量数据时,因为获取增量数据时要保证数据此时不可更改,用writeLock禁止其他线程的read操作。readLock都是对内操作,writeLock是对外要保持统一口径。
代码中涉及的全局变量Map都是ConcurrentMap。保证线程安全,且全局共享。
在向registry中存值的时候,并没有强制性覆盖,用
putIfAbsent
代替put
方法,两者都会返回key对应的旧值,但前者并不会覆盖。然后用之前的值来进行操作(因为是根据实例名获取的,所以后面只对其租约信息进行修改)。自我保护机制处使用了synchronized+volatile:
- 对于多个修改变量,采用synchronized空对象;
- 因为变量要在别的方法中进行计算安全机制,所以需要volatile关键字,保证该字段随时可见。
(此处的字段不用CAS,因为涉及乘法,CAS对乘法不太友好。)
- 对recentRegisteredQueue的修改,锁定该对象,防止所有线程在别的方法中进行操作该值。
- 缓存类变量基本都是由guava的CacheBuilder来创建的,保证时效和容量等问题。
Tip:[1] 自我保护机制:随着Client注册数量的增加,期待每分钟维持心跳的数量也增加,eureka Server检查到每分钟续约的次数少于期待数量的阈值(默认0.85)时,就会认为出现异常情况,从而进入自我保护机制。默认2次/min/服务(基于心跳时间)。Eureka会保护这些实例,认为他们是健康的,不把他们移出注册列表。Eureka Server会输出红色的文字:
EMERGENCY!EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY'RE NOT.RENEWALS ARE LESSER THAN THRESHOLD AND HENCE THE INSTANCES ARE NOT BEGING EXPIRED JUST TO BE SAFE.
接受心跳服务
Client默认每隔30s向Server发起一次续租,也叫心跳服务。核心代码为com.netflix.eureka.registry.AbstractInstanceRegistry#renew
。
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
//如果没有发现租约,返回false
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
//获取租约中的实例信息
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
//获取服务的最终状态
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
//如果实例的状态和最终的覆盖状态不一致,设置新的状态
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
//最后一分钟的续租数(该续租数每60秒调度自动更新为0)
renewsLastMin.increment();
//修改最后续租时间
leaseToRenew.renew();
return true;
}
}
这部分有很多东西都是在register方法里出现过的,所以不再赘述。
服务剔除
核心代码是com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)
。该入口参数additionalLeaseMs 一般为0。此值是补偿时间,即如果出现GC或者时钟偏移等情况而补偿的时间。用来判断是否过期。
@Override
public void evict() {
evict(0l);
}
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
//是否开启租约保护[1]
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// 遍历所有节点,找到租约过期的节点
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
//可剔除的节点数 = 本地节点总数 - 阈值节点数
int evictionLimit = registrySize - registrySizeThreshold;
//保护机制 - 保证剔除最少量的节点
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
//随机获取一个节点,并打乱原有顺序(交换对应节点)
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
//服务下线
internalCancel(appName, id, false);
}
}
}
在打乱节点顺序部分,采用随机,是避免同一时间同一服务集群全部过期然后被剔除的现象,从而引发程序崩溃。
Tips:
[1] 是否开启租约保护:该值有两种情况:
- 与
eureka.server.enable-self-preservation
配置有关,默认为true,即不开启租约保护。为false时开启租约保护。 - 上述值为true时,期望每分钟的续租阈值数大于0且最后一分钟的续租数大于该期望值,即每分钟的续租数不能小于阈值数,则为true,否则开启保护机制,不准服务剔除。默认的阈值为0.85,通过
eureka.server.renewal.percent.threshold
来设置(必为double类型)。
服务下线
核心代码是com.netflix.eureka.registry.AbstractInstanceRegistry#internalCancel
。
@Override
public boolean cancel(String appName, String id, boolean isReplication) {
return internalCancel(appName, id, isReplication);
}
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
//获取读锁
read.lock();
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
//recentCanceledQueue和之前的recentRegisteredQueue都是统计队列
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
//如果租约为空,直接返回false
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
return false;
} else {
//设置下线时间(也叫驱逐时间)
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
//添加到最近租约变更队列
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
//虚拟网络协议主机地址
vip = instanceInfo.getVIPAddress();
//安全的虚拟网络协议主机地址
svip = instanceInfo.getSecureVipAddress();
}
//清理缓存 - 使responseCache中指定属性移除
invalidateCache(appName, vip, svip);
return true;
}
} finally {
read.unlock();
}
}
获取注册表
该功能涉及多个Eureka Server。
从Client部分缓存注册表也了解到注册表有全量和增量两种方式。
全量获取注册表
该部分的核心代码是com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationsFromMultipleRegions
。
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
//查看是否有远程区域
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
Applications apps = new Applications();
apps.setVersion(1L);
//从本地registry上获取实例信息
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
app = new Application(lease.getHolder().getAppName());
}
//decorateInstanceInfo方法就是讲lease中holder转换为Instance
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
apps.addApplication(app);
}
}
//从远程注册表上获取实例信息
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
for (Application application : remoteApps.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}",
application.getName(), remoteRegion);
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a "
+ "whitelist and this app is not in the whitelist.",
application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
入参remoteRegions为指定区域的数组集合。来源于配置eureka.server.remote-region-urls-with-name
,默认为空。
从多地区增量式获取注册表
增量获取的要义还是从最近修改的队列中,获取到最近修改的instanceInfo,从全局map中获取到这些实例的完整信息,然后将这些instanceInfo打包过去。
该部分的核心代码com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions
。
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
if (null == remoteRegions) {
remoteRegions = allKnownRemoteRegions; // null means all remote regions.
}
boolean includeRemoteRegion = remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
} else {
GET_ALL_CACHE_MISS_DELTA.increment();
}
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
try {
//获取写锁,排它锁
write.lock();
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
while (iter.hasNext()) {
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
InstanceInfo instanceInfo = lease.getHolder();
Application app = applicationInstancesMap.get(instanceInfo.getAppName());
if (app == null) {
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
apps.addApplication(app);
}
app.addInstance(decorateInstanceInfo(lease));
}
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
if (null != remoteAppsDelta) {
for (Application application : remoteAppsDelta.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
Application appInstanceTillNow =
apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
}
}
}
}
}
}
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
集群同步
该功能涉及多个Eureka Server。
Eureka Server初始化本地注册表
这部分主要通过代码com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp
来完成。主要执行:
- 它会先从peer(同伴)节点上拉取注册信息。
- 并将其中的服务实例注册到本地注册表中。
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
//注册表重试等待时间
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
//此处走com.netflix.eureka.registry.AbstractInstanceRegistry#register方法
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
在初始化本地注册表时,EurekaServer不会接受来自Client的通信请求(如注册、续租、获取注册表等)。在同步结束后,通过com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
方法来允许该Server接受流量。关于这两部分的调用逻辑,在com.netflix.eureka.EurekaBootStrap#initEurekaServerContext
代码中。而该方法调用时是EurekaBootStrap实现javax.servlet.ServletContextListener#contextInitialized
的内部。
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// 初始化自我保护机制统计参数
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
//判断是否是AWS环境,一般我们的环境是MyOwn环境
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
//修改服务实例状态为上线状态
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
//驱逐任务以及测试每分钟心率(租约率)的任务
super.postInit();
}
Eureka Server 之间注册信息表信息的同步复制
这部分代码核心在com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
中,其包括下线,注册和续约等等。
下线
@Override
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
//调用 AbstractInstanceRegistry 的cancel
if (super.cancel(appName, id, isReplication)) {
//复制(同步)下线状态到同伴
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
//锁定空对象
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
return true;
}
return false;
}
注册方法
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
//默认租约时间90s
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//如果租约为空,则用默认时间;若租约不为空,注册租约该有的剩余时间
//调用 AbstractInstanceRegistry 的renew
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
续租方法
public boolean renew(final String appName, final String id, final boolean isReplication) {
//调用 AbstractInstanceRegistry 的renew
if (super.renew(appName, id, isReplication)) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
同步到同伴节点
下面是复制到同伴的入口方法,代码是com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers
。
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
//Stopwatch是测试执行某些代码的时间
Stopwatch tracer = action.getTimer().start();
try {
//是否是复制的
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// 如果peer集群为空,或者本来就是复制操作,则不再执行,防止造成污染复制
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
//同步Instance到同伴节点
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
peerEurekaNodes是同伴Eureka Server节点。每个Eureka Server会向同伴同步数据。
同步到同伴的代码为com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers
。根据同步的Action操作,来分类同步。
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
同步操作只有以下几种:
public enum Action {
Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
......
}
上面的请求在com.netflix.eureka.transport.JerseyReplicationClient
中。如HeartBeat的请求在com.netflix.eureka.transport.JerseyReplicationClient#sendHeartBeat
中。可以查看路径等。这部分路径就和client中处理对应功能是一样的。