1. 服务注册
ApplicationResource的addInstance(InstanceInfo info,String isReplication)方法负责接受Eureka Client的服务注册。addInstance()方法调用PeerAwareInstanceRegistryImpl的reigister()方法,此方法先调用其父类即AbstractInstanceRegistry类的reigister()方法,然后调用本类的replicateToPeers()方法。AbstractInstanceRegistry类的reigister()方法是Eureka Server接受服务注册的核心方法。此方法的逻辑为:
- 将此InstanceInfo以Lease对象的形式放入缓存集合registry中。
- 使ResponseCacheImpl中缓存的此InstanceInfo对应的Application失效。
- 属性expectedNumberOfClientsSendingRenews的值加1。
- 重新计算numberOfRenewsPerMinThreshold的值。
replicateToPeers()方法则负责将注册上来的InstanceInfo同步至Eureka Server集群中的其他节点。如果isReplication不为true则表示是Eureka Server集群中的其他节点同步过来的而不是Eureka Client发起的服务注册,此时就不需要同步到Eureka Server集群中的其他节点了。
所以,对于Eureka Server而言,服务注册主要做了如下三件事情:
- 把InstanceInfo缓存至本地。
- 重新计算numberOfRenewsPerMinThreshold的值,这个会影响到Eureka Server的自我保护机制。
- 把InstanceInfo同步至Eureka Server集群中的其他节点。
值得注意的一点是,当Eureka Client注册的时候如果Eureka Server的registry已经缓存了此实例,则会通过比较Eureka Client新注册的实例和已经存在的实例的lastDirtyTimestamp大小来决定是否更新registry中的缓存,lastDirtyTimestamp为即实例的最后修改时间,如果已经存在的实例的lastDirtyTimestamp大Eureka Client新注册的实例的lastDirtyTimestamp则不会更新registry中此实例的缓存。
2. 服务发现
即获取服务注册列表,分为全量获取和增量获取:
全量获取
Eureka Client在启动时和增量获取注册列表失败时都会进行全量服务注册列表获取。ApplicationsResource的getContainers()方法负责接受Eureka Client的调用并返回所有的服务列表。此方法首先会通过ResponseCacheImpl从缓存中获取,如果缓存中没有则最终会调用AbstractInstanceRegistry的getApplicationsFromMultipleRegions()方法获取所有的服务注册列表并缓存至ResponseCacheImpl的缓存中。getApplicationsFromMultipleRegions()方法从AbstractInstanceRegistry的集合属性registry中获取所有的服务注册列表。增量获取
Eureka Client的CacheRefreshThread任务线程会每隔30s从Eureka Server获取变更的服务注册列表,这里说的变更是指新增、修改以及删除的InstanceInfo。Eureka Server端的类ApplicationsResource的getContainerDifferential()方法负责返回最近3分钟(默认)内变更的InstanceInfo,那么Eureka Server是如何收集最近3分钟内变更的InstanceInfo集合的呢?答案是类AbstractInstanceRegistry中的两个属性:集合recentlyChangedQueue和定时任务evictionTimer;
集合recentlyChangedQueue
集合recentlyChangedQueue的类型为ConcurrentLinkedQueue<RecentlyChangedItem>,当Eureka Server收到Eureka Client的注册、下线、变更时就会对此集合进行增、删、改操作,类RecentlyChangedItem保存了一个lastUpdateTime属性和一个leaseInfo属性,分别表示上一次修改的时间戳和InstanceInfo实例。-
定时任务evictionTimer
定时任务evictionTimer会每隔3分钟遍历一次集合recentlyChangedQueue,把那些lastUpdateTime到当前时间差超过3分钟的RecentlyChangedItem从集合recentlyChangedQueue中删除。所以集合recentlyChangedQueue即为最近3分钟内变更的InstanceInfo集合。AbstractInstanceRegistry中的getApplicationDeltasFromMultipleRegions()就是通过此集合返回增量服务列表的。
3. 服务续约
Eureka Client的HeartbeatThread线程会每隔30s向Eureka Server发送一次心跳。Eureka Server端类InstanceResource的renewLease()方法负责Eureka Client的续约请求。Eureka Server在接受到Eureka Client的续约请求后的逻辑:
检测InstanceInfo的状态
如果Eureka Client对应的InstanceInfo不存在或者InstanceInfo的为UNKONW时则直接返回404,Eureka Client收到404就会重新发起一次服务注册。续约计数器加1
如果Eureka Client对应的InstanceInfo正常,续约计数器加1,此计数器维护在类AbstractInstanceRegistry的一个类型为MeasuredRate的属性renewsLastMin中,每次续约都会调用MeasuredRate的increment()方法,此举和Eureka Server的自我保护机制有关。更新实例的lastUpdateTimestamp
通过调用LeaseInfo的renew()方法实现,这个属性和实例的过期剔除有关。-
续约同步至Eureka Server集群中的其他节点
Eureka Server向集群中除了自己以外的所有节点发起续约同步,这里有两种非常重要的异常情况的处理:对方返回404
当对方返回404时说明对方没有缓存此实例或者实例信息不是最新的(对方的lastDirtyTimestamp小于发起方),此时Eureka Server会向对方发起一次服务注册。对方返回冲突
当对方实例信息比发起方新,即对方lastDirtyTimestamp比发起方大,此时当以对方的实例信息为准,将对方返回的实例信息更新至本地并同步至集群其他节点,所以Eureka Server之间在同步实例信息的时候,如果出现数据不一致的情况则以lastDirtyTimestamp最大的为准。
所以Eureka Server之间的续约同步不仅仅是简单的续约的操作,还涉及到数据的一致性同步操作。相关源码如下:
public void heartbeat(final String appName, final String id, final InstanceInfo info, final InstanceStatus overriddenStatus, boolean primeConnection) throws Throwable { if (primeConnection) { replicationClient.sendHeartBeat(appName, id, info, overriddenStatus); return; } ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) { @Override public EurekaHttpResponse<InstanceInfo> execute() throws Throwable { return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus); } @Override public void handleFailure(int statusCode, Object responseEntity) throws Throwable { super.handleFailure(statusCode, responseEntity); if (statusCode == 404) { if (info != null) { // 对方返回404,向对方发起服务注册 register(info); } } else if (config.shouldSyncWhenTimestampDiffers()) { InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity; if (peerInstanceInfo != null) { // 如果冲突同步实例信息 syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo); } } } }; // 略 } private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) { try { if (infoFromPeer != null) { if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) { infoFromPeer.getOverriddenStatus()); registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus()); } // 以对方返回的实例信息为准更新本地并同步至集群其他节点 registry.register(infoFromPeer, true); } } catch (Throwable e) { logger.warn("Exception when trying to set information from peer :", e); } }
对比lastDirtyTimestamp
比对Eureka Client上送的lastDirtyTimestamp和Eureka Server缓存的lastDirtyTimestamp,如果前者大于后者则直接返回404,Eureka Client收到404返回重新发起一次服务注册。lastDirtyTimestamp其实就是实例的最后修改时间,当Eureka Client的lastDirtyTimestamp大于Eureka Server缓存的lastDirtyTimestamp时说明Eureka Server缓存的实例信息不是最新的,此时强制Eureka Client发起注册是Eureka保持数据一致性的一种重要机制。
4. 服务下线
Eureka Client下线的时候会调用Eureka Server的服务下线接口,类InstanceResource的cancelLease()方法负责下线对应的InstanceInfo。类PeerAwareInstanceRegistryImpl的cancel()方法首先通过调用夫类AbstractInstanceRegistry中的cancel()方法下线本地缓存的此InstanceInfo信息,然后调用本类的replicateToPeers()方法同步给Eureka Server集群中的其他节点。其主要流程如下:
- 从registry缓存集合中删除InstanceInfo。
- 放入最近变更实例队列recentlyChangedQueue中。
- 使ResponseCache缓存失效。
- 同步至Eureka Server集群中的其他节点。
- 客户端实例数量减1,即expectedNumberOfClientsSendingRenews值减1
- 重新计算过去1分钟内最低续约次数阈值,即numberOfRenewsPerMinThreshold的值。
相关源码如下:
@Override
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
// super.cancel为AbstractInstanceRegistry中的cancel()方法
if (super.cancel(appName, id, isReplication)) {
// 同步给Eureka Server集群中的其他节点
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to cancel it, reduce the number of clients to send renews
// 客户端实例数量减1
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
// 重新计算过去1分钟内最低续约次数阈值
updateRenewsPerMinThreshold();
}
}
return true;
}
return false;
}
@Override
public boolean cancel(String appName, String id, boolean isReplication) {
return internalCancel(appName, id, isReplication);
}
/**
* {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
* cancel request is replicated to the peers. This is however not desired for expires which would be counted
* in the remote peers as valid cancellations, so self preservation mode would not kick-in.
*/
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
// 删除registry中的缓存
leaseToCancel = gMap.remove(id);
}
synchronized (recentCanceledQueue) {
// 放入最近下线实例队列
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
// 放入最近变更队列
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// ResponseCache缓存失效
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
5. 服务剔除
Eureka Server的EvictionTask任务每隔1分钟执行一次,如果保护机制没有开启则遍历registry中缓存的服务注册列表并通过调用LeaseInfo对象的isExpired()方法筛选出过期没有续约的实例,LeaseInfo对象保存了实例最近一次续约的时间戳以及续约超时时间(duration),续约超时时间默认90s。这里有个小bug,服务续约的时候Eureka Server是通过调用LeaseInfo的renew()方法刷新其最近一次续约的时间戳的,如下:
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
其实这里不应该加上duration的,再看isExpired()方法如何判断过期的:
/**
* Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
*
* Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
* what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
* instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will
* not be fixed.
*
* @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
*/
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
这里在判断时候又加了一次duration,所以等于加了两次duration,那么过期时间也就从90s变成了180s,也就是说Eureka Client连续至少超过180s不续约才会过期。可以看到源码注释对这个小bug也做了说明,而且还说这个bug不会被修复。
另外一个需要注意的地方是,当筛选出的过期的实例的数量多于总实例数量的15%时最多剔除总实例数量的15%,如果都删除则破坏了Eureka Server自我保护机制。
剔除的具体逻辑如下:
- 从registry缓存集合中删除InstanceInfo。
- 放入最近变更实例队列recentlyChangedQueue中。
- 使ResponseCache缓存失效。
这里有两个需要注意的地方,第一就是实例的剔除并没有同步到Eureka Server集群中其他节点。第二剔除实例后并没有更新客户端的数量以及过去1分钟内最低续约次数阈值,即expectedNumberOfClientsSendingRenews和numberOfRenewsPerMinThreshold,这也是服务剔除和服务下线不同的地方。相关源码如下:
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
// 略
}
}
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
6. Eureka Server自我保护机制
1. 机制说明
Eureka Server的自我保护机制是说Eureka Server在某种情况下对其缓存的服务注册列表的一种保护行为。
某种情况是指在过去1分钟内所有客户端续约的次数总和低于其应当续约次数(单个客户端默认每分钟2次)总和的85%时。
一种保护行为是指在上述情况下Eureka Server的续约过期剔除任务不再从其服务注册列表中剔除续约过期的服务。
2. 机制实现
- 类AbstractInstanceRegistry的几个属性:
protected volatile int expectedNumberOfClientsSendingRenews;
protected volatile int numberOfRenewsPerMinThreshold;
private final MeasuredRate renewsLastMin;
-
expectedNumberOfClientsSendingRenews
客户端的数量,有如下几种情况此值会发生变化:- Eureka Server启动的时候会用从相邻节点获取的服务注册列表的数量初始化此值。
- 新的客户端注册的时候此值会加1。
- 有客户端下线的时候此值会减1。
- 默认每15分钟执行一次的任务,此任务会遍历所有实例统计有效的(registerable)实例的数量,当有效的实例的数量大于客户端总数量时赋值给expectedNumberOfClientsSendingRenews。
numberOfRenewsPerMinThreshold
一分钟内所有客户端最低续约总次数的阈值,此值的计算公式为:
numberOfRenewsPerMinThreshold = (int) (expectedNumberOfClientsSendingRenews
* (60.0 / config.getExpectedClientRenewalIntervalSeconds())
* config.getRenewalPercentThreshold())
默认参数下此值为: 客户端总数量 * 2 * 0.85,客户端每30s续约一次,所以每分钟是2次,0.85是一个默认的配置项。所以此值只会受客户端的数量也就是expectedNumberOfClientsSendingRenews的影响,因此expectedNumberOfClientsSendingRenews改变的时候都会重新计算此值。renewsLastMin
用于统计过去1分钟内所有客户端续约的总次数。类中有如下三个属性:AtomicLong currentBucket、AtomicLong lastBucket、Timer timer,currentBucket用来记录当前客户端续约次数,每当客户端续约一次,此值就会加1。timer每隔1分钟执行一次把currentBucket的值赋值给lastBucket并将currentBucket设置为0。方法getCount()直接返回lastBucket的值作为过去1分钟内客户端续约的总次数。
-
定时任务-EvictionTask
此任务每隔1分钟执行一次AbstractInstanceRegistry的evict()方法,此方法用于剔除续约过去的服务实例,方法首先会调用isLeaseExpirationEnabled()方法判断是否启用续约过去剔除,判断方法为过去一分钟内客户端续约总次数是否大于最小续约总次数阈值,如果是则自我保护机制关闭,执行过期剔除操作,如果否则说明自我保护机制开启,直接return,不做过期剔除操作。相关源码如下:public void evict(long additionalLeaseMs) { if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // 省略了剔除服务实例的代码 } // 此方法判断是否启用续约过期剔除 @Override public boolean isLeaseExpirationEnabled() { if (!isSelfPreservationModeEnabled()) { // The self preservation mode is disabled, hence allowing the instances to expire. return true; } return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold; } // 返回过去1分钟内客户端续约总次数 @Override public long getNumOfRenewsInLastMin() { return renewsLastMin.getCount(); }
7. 总结
- 每当Eureka Server接收到Eureka Client的服务注册时,首先会缓存此服务实例信息,其次将此服务实例信息同步至集群中其他节点。
- Eureka Server接收集群中其他节点同步的服务实例信息并缓存。
- Eureka Server接收到Eureka Client的服务注册以及Eureka Server节点之间相互同步服务实例信息,当和自身缓存有冲突时,通过比较实例信息最后修改时间的时间戳(lastDirtyTimestamp)大小来解决,以时间戳大的为准,此为数据一致性的处理方案。
- Eureka Server保存最近3分钟内发生变更的服务实例信息列表并在Eureka Client增量获取时返回。
- Eureka Server接收Eureka Client的服务续约调用,并将此续约同步至集群其他节点,当同步时,如果对方返回404则向其发起一次服务注册,当对方返回的实例的lastDirtyTimestamp较大时则以对方的实例信息为准更新自身缓存并同步至集群其他节点。
- Eureka Server会记录Eureka Client的总数量以及这些Client的总续约次数。
- Eureka Server接收Eureka Client的服务下线调用,Eureka Server从自身缓存中删除此服务实例并同步至集群其他节点。
- Eureka Server每隔60s执行一次服务剔除任务,如果自我保护机制没有开启则剔除那些在过去90s(官方文档90s,代码180s)都没有续约的服务实例。此操作不会在集群中同步。
- 过去1分钟内所有Client续约总次数小于等于应续约总次数的85%时自动保护机制开启,Eureka Server不会剔除任何服务。
相关类图: