一. 前言
本文详细介绍了 Eureka Server 端服务治理中几个核心方法的实现,主要实现逻辑集中在 `com.netflix.eureka.registry.AbstractInstanceRegistry` 抽象类中。
Client 端的注册逻辑参见《Spring Cloud Eureka 源码分析 —— Client 端》。
二. 服务治理
服务治理包括服务注册(register)、服务续约(renew)、服务获取(getApplication)、服务下线(cancel)、心跳同步(heartbeat)、服务剔除(evict)、自我保护
服务状态管理包括覆盖状态(overriddenStatus)、状态变更(statusUpdate)、删除覆盖状态(deleteStatusOverride)、规则计算
三. 源码分析
服务注册(register)
//com.netflix.eureka.registry.AbstractInstanceRegistry
/**
* Registers a new instance with a given duration.
*
* <p>The instance's lease is stored in {@code registry} under its app name and instance id. If a lease
* already exists and its holder has a newer lastDirtyTimestamp than {@code registrant}, the existing
* InstanceInfo is kept as the lease holder instead of the registrant.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
* @param registrant the instance information being registered
* @param leaseDuration the lease duration handed to the new {@link Lease}
* @param isReplication true if this registration is replicated from a peer server, false if the client
*        registered itself
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
// NOTE(review): a *read* lock guards this mutation; presumably the write lock is taken elsewhere
// in the class -- confirm.
read.lock();
// registry caches the registered instances (app name -> instance id -> lease); registering is
// essentially putting this instance's lease into it.
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
// Registration counter +1
REGISTER.increment(isReplication);
// Create the per-app map on first use; putIfAbsent returns the existing map if another
// thread created it first, in which case that one is used.
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
// A lease is already cached: compare the dirty timestamps of the cached and incoming instance info.
// NOTE(review): both are boxed Long; if either getter can return null, the '>' below throws NPE.
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
// The incoming registration carries older info than the cached copy: keep using the cached InstanceInfo
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
// New registration: bump expectedNumberOfClientsSendingRenews and recompute the self-preservation
// threshold. Note this only happens while the expected count is already > 0.
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// Carry over the service-up timestamp from the previous lease
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// Put this instance's lease into the registry
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
// recentRegisteredQueue records registrations as (registration time in millis, "appName(id)") pairs;
// per the article it is mainly used for debugging / statistics.
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
// If the registrant carries an overridden status other than UNKNOWN and none is cached yet, remember it
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
// If an overridden status is cached in the map, it wins over the one sent by the registrant: the map
// may have been changed by statusUpdate, and the server-side value takes precedence.
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
// Compute the effective instance status from the overridden status and the override rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
// Action type ADDED: the instance was added to the server-side registry
registrant.setActionType(ActionType.ADDED);
// Record this registration in the recently-changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// Refresh the last-updated timestamp
registrant.setLastUpdatedTimestamp();
// Invalidate the cached responses for this app and its VIP addresses
// (per the article this also covers the all-apps and delta entries)
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
服务注册主要有以下几步:
- 初始化 registry 中该应用对应的 gMap(仅在该应用首次注册时需要)
- 若为新注册的实例(之前没有租约),更新 expectedNumberOfClientsSendingRenews 并重新计算自我保护阈值(仅当 expectedNumberOfClientsSendingRenews > 0 时才会更新)
- registrant添加到gMap
- 更新overriddenInstanceStatusMap中的覆盖状态
- 计算并更新registrant的status
- 失效缓存
服务续约(renew)
/**
 * Marks the given instance of the given app name as renewed, and also marks whether it originated from
 * replication.
 *
 * <p>Before the lease is renewed, the instance's effective status is recomputed with the status override
 * rules. If that yields {@link InstanceStatus#UNKNOWN} (possibly because its override was deleted), the
 * renewal is rejected so that the client re-registers. If it differs from the current status, the instance
 * status is replaced by the computed one (without touching the dirty timestamp).
 *
 * @param appName       the application name
 * @param id            the instance id
 * @param isReplication true if this renewal was replicated from a peer server, false if the client sent it
 * @return true if the lease was found and renewed; false if no lease exists or a re-registration is required
 * @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
 */
public boolean renew(String appName, String id, boolean isReplication) {
    // Renewal counter +1
    RENEW.increment(isReplication);
    // Look up the lease by app name, then by instance id
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        // Unknown instance: count it and report failure so the caller registers it
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    }
    InstanceInfo instanceInfo = leaseToRenew.getHolder();
    if (instanceInfo != null) {
        InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                instanceInfo, leaseToRenew, isReplication);
        // UNKNOWN here possibly means the override was deleted: require a re-registration
        if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
            logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                    + "; re-register required", instanceInfo.getId());
            RENEW_NOT_FOUND.increment(isReplication);
            return false;
        }
        if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
            // Log the computed status that is applied below, not instanceInfo.getOverriddenStatus(): the
            // rules may derive the effective status from something else (e.g. DownOrStartingRule or
            // LeaseExistsRule), in which case the raw field would be misleading. Passing the enum itself
            // (instead of .name()) also keeps the log statement null-safe.
            logger.info(
                    "The instance status {} is different from overridden instance status {} for instance {}. "
                            + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                    overriddenInstanceStatus,
                    instanceInfo.getId());
            instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
        }
    }
    // Count this renewal for the last-minute renewal rate
    renewsLastMin.increment();
    // Update the lease's renewal time, which is used later when checking for expiry
    leaseToRenew.renew();
    return true;
}
服务续约主要有以下几步:
- 判断 registry 中是否存在当前实例 id,不存在就直接返回 false
- 根据覆盖状态规则计算实例状态:若结果为 UNKNOWN 则返回 false(需要重新注册),否则据此更新实例的 status
- 更新续约时间
服务获取(getApplication)
/**
 * Returns the cached payload for {@code key}.
 *
 * <p>With {@code useReadOnlyCache} set, the read-only map is consulted first. On a miss the value is loaded
 * from the read-write cache and copied into the read-only map. Otherwise the read-write cache is queried
 * directly. Any failure is logged and {@code null} is returned.
 */
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value result = null;
    try {
        if (!useReadOnlyCache) {
            result = readWriteCacheMap.get(key);
        } else {
            result = readOnlyCacheMap.get(key);
            if (result == null) {
                // Read-only miss: load from the read-write cache and remember it in the read-only layer
                result = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, result);
            }
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return result;
}
三级缓存
客户端在获取服务注册表时,默认使用三级缓存的机制获取。
readOnlyCacheMap (ConcurrentMap<Key, Value>)只读缓存
readWriteCacheMap (LoadingCache<Key, Value>) 读写缓存
registry (ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>) 注册表缓存
读取数据先从只读缓存读取,只读缓存没有再从读写缓存读,读写缓存没有最后再从注册表读。
更新机制
registry
几乎每个服务操作都会对其进行更新:有的是 put 或 remove,有的是更新缓存对象的某些属性。
readWriteCacheMap
更新机制是调用 `invalidateCache` 方法主动失效,或者依靠缓存自身的自动过期(默认过期时间 180s)。
readOnlyCacheMap
默认每隔30秒会从读写缓存中同步
通过下面的代码可以看到 `invalidateCache` 的失效实现,以及 `readOnlyCacheMap` 的刷新任务。
/**
 * Invalidates every cached response that may contain the given application. For each key type and version,
 * this covers the application's own entry plus the full-registry ({@code ALL_APPS}) and delta
 * ({@code ALL_APPS_DELTA}) entries, each in full and compact form. The VIP and secure-VIP entries are
 * invalidated as well when the corresponding address is non-null.
 */
@Override
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
    for (Key.KeyType keyType : Key.KeyType.values()) {
        for (Version version : Version.values()) {
            Key[] applicationKeys = {
                    new Key(Key.EntityType.Application, appName, keyType, version, EurekaAccept.full),
                    new Key(Key.EntityType.Application, appName, keyType, version, EurekaAccept.compact),
                    new Key(Key.EntityType.Application, ALL_APPS, keyType, version, EurekaAccept.full),
                    new Key(Key.EntityType.Application, ALL_APPS, keyType, version, EurekaAccept.compact),
                    new Key(Key.EntityType.Application, ALL_APPS_DELTA, keyType, version, EurekaAccept.full),
                    new Key(Key.EntityType.Application, ALL_APPS_DELTA, keyType, version, EurekaAccept.compact)
            };
            invalidate(applicationKeys);
            if (vipAddress != null) {
                invalidate(new Key(Key.EntityType.VIP, vipAddress, keyType, version, EurekaAccept.full));
            }
            if (secureVipAddress != null) {
                invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, keyType, version, EurekaAccept.full));
            }
        }
    }
}
/**
 * Invalidate the cache information given the list of keys.
 *
 * <p>Each key is removed from the read-write cache, together with every region-specific variant recorded
 * for it in {@code regionSpecificKeys}.
 */
public void invalidate(Key... keys) {
    for (Key baseKey : keys) {
        readWriteCacheMap.invalidate(baseKey);
        Collection<Key> regionVariants = regionSpecificKeys.get(baseKey);
        if (regionVariants == null || regionVariants.isEmpty()) {
            continue;
        }
        for (Key regionVariant : regionVariants) {
            logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                    baseKey.getEntityType(), baseKey.getName(), baseKey.getVersion(), baseKey.getType(),
                    baseKey.getEurekaAccept());
            readWriteCacheMap.invalidate(regionVariant);
        }
    }
}
...
// Only when the read-only response cache is enabled: schedule the read-only cache refresh task.
// The first run is aligned to the next multiple of responseCacheUpdateIntervalMs (since the epoch),
// then it repeats every responseCacheUpdateIntervalMs (Timer.schedule with a period = fixed-delay execution).
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
// responseCacheUpdateIntervalMs defaults to 30s (per the article)
...
// Refresh task for the read-only cache
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        /**
         * For every key already present in the read-only cache, copies the current read-write cache value
         * into it. Keys are handled independently: a failure for one key is logged and the loop goes on.
         */
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key cacheKey : readOnlyCacheMap.keySet()) {
                try {
                    CurrentRequestVersion.set(cacheKey.getVersion());
                    Value fresh = readWriteCacheMap.get(cacheKey);
                    // Identity comparison: only write when a different object is currently cached
                    if (fresh != readOnlyCacheMap.get(cacheKey)) {
                        readOnlyCacheMap.put(cacheKey, fresh);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", cacheKey.toStringCompact(), th);
                }
            }
        }
    };
}
服务下线(cancel)
/**
* {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
* cancel request is replicated to the peers. This is however not desired for expires which would be counted
* in the remote peers as valid cancellations, so self preservation mode would not kick-in.
*
* <p>Removes the instance's lease from {@code registry} and its entry from {@code overriddenInstanceStatusMap}.
* The override entry is removed even when no lease is found. The method also records the cancellation and
* invalidates the cached responses.
*
* @param appName the application name of the instance
* @param id the instance id
* @param isReplication true if this cancellation was replicated from a peer server
* @return true if a lease was found and cancelled, false if no lease was registered for the instance
*/
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
// Cancel counter +1
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
// Remove this instance's lease from the registry
leaseToCancel = gMap.remove(id);
}
// Record the cancellation in the recently-cancelled queue as (time in millis, "appName(id)")
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
// Drop any cached overridden status for this instance
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
// No lease was registered for this id: count it and report failure
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
// Mark the lease as cancelled (presumably records the eviction timestamp checked by isExpired -- confirm in Lease)
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
// Action type DELETED: the instance is removed from the server-side registry
instanceInfo.setActionType(ActionType.DELETED);
// Record this cancellation in the recently-changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
// Refresh the last-updated timestamp
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// Invalidate the cached responses for this app and, when known, its VIP addresses
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
服务下线主要有以下几步:
- 从registry中将此实例id移除
- 从overriddenInstanceStatusMap中将此实例id移除
- 失效缓存
心跳同步(heartbeat)
/**
 * Applies a replicated heartbeat to the local instance resource and turns the JAX-RS response into a
 * replication result builder carrying the same status code.
 *
 * <p>The response entity is attached only when it is non-null and the status is 200 OK. When the
 * experimental flag "bugfix.934" is anything other than "false", a 409 CONFLICT response also carries
 * its entity.
 */
private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) {
    Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp);
    int statusCode = response.getStatus();
    Builder builder = new Builder().setStatusCode(statusCode);

    boolean legacyBehaviour = "false".equals(config.getExperimental("bugfix.934"));
    boolean isOk = statusCode == Status.OK.getStatusCode();
    boolean isConflict = statusCode == Status.CONFLICT.getStatusCode();
    boolean entityAllowed = isOk || (!legacyBehaviour && isConflict);

    if (entityAllowed && response.getEntity() != null) {
        builder.setResponseEntity((InstanceInfo) response.getEntity());
    }
    return builder;
}
/**
* Handles a lease renewal (heartbeat) request for this resource's instance.
* Only the beginning of the method is shown in this excerpt; the rest is elided.
*
* @param isReplication value of the replication header; the string "true" means the request comes from a peer node
* @param overriddenStatus overridden status sent as the "overriddenstatus" query parameter
* @param status instance status sent as the "status" query parameter
* @param lastDirtyTimestamp last dirty timestamp sent as the "lastDirtyTimestamp" query parameter
*/
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// Delegates to the registry's renew(appName, id, isReplication)
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// (remainder of the method elided in this excerpt)
...
}
心跳同步主要用于 peer server 之间实现最终一致性的数据同步,最终也会调用到 `renew` 方法。
服务剔除(evict)
// additionalLeaseMs is a compensation time added to each lease's expiry check
// (presumably the value computed by getCompensationTimeMs -- confirm with the caller)
/**
* Evicts expired leases, unless lease expiration is currently disabled (see isLeaseExpirationEnabled).
*
* <p>At most {@code registrySize - (int) (registrySize * renewalPercentThreshold)} instances are evicted per
* call. They are chosen at random among the expired ones, and each is removed via
* {@code internalCancel(appName, id, false)}.
*
* @param additionalLeaseMs extra milliseconds added to the lease duration when deciding expiry
*/
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
// Expiration disabled (e.g. self-preservation is in effect): evict nothing
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
// Collect every expired lease (that still has a holder) into a list
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
// Current number of registered instances
int registrySize = (int) getLocalRegistrySize();
// registrySize * renewal percent threshold (the article cites 0.85 as the default)
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
// Evict min(expired count, evictionLimit); with a 0.85 threshold this caps one run at roughly 15% of the registry
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
// Swap a random element from positions [i, size) into position i, then evict it
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
// Expired counter +1
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
// Take the selected instance down via internalCancel (not counted as a replication)
internalCancel(appName, id, false);
}
}
}
/**
 * Tells whether this lease is expired, taking the compensation time into account.
 *
 * <p>The lease counts as expired once {@code evictionTimestamp} has been set (greater than 0), or when the
 * current time is past {@code lastUpdateTimestamp + duration + additionalLeaseMs}.
 *
 * @param additionalLeaseMs extra grace period in milliseconds added to the lease duration
 * @return true if the lease should be treated as expired
 */
public boolean isExpired(long additionalLeaseMs) {
    if (evictionTimestamp > 0) {
        return true;
    }
    long now = System.currentTimeMillis();
    return now > lastUpdateTimestamp + duration + additionalLeaseMs;
}
// Checks whether expired instances may currently be evicted
@Override
public boolean isLeaseExpirationEnabled() {
    // With self-preservation mode disabled, instances are always allowed to expire.
    // Otherwise expiration is allowed only while the renewals of the last minute exceed a positive threshold;
    // being at or below it means the server is in self-preservation.
    return !isSelfPreservationModeEnabled()
            || (numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold);
}
服务剔除主要有以下几步:
- 先根据自我保护机制判断当前是否允许剔除,再根据续约时间找出所有过期实例
- 计算剔除数量,将应剔除的总数和最大剔除数量之间取最小值
- 使用洗牌算法找出要剔除的实例id
- 调用 `internalCancel` 方法将其下线
剔除时间补偿
/**
 * compute a compensation time defined as the actual time this task was executed since the prev iteration,
 * vs the configured amount of time for execution. This is useful for cases where changes in time (due to
 * clock skew or gc for example) causes the actual eviction task to execute later than the desired time
 * according to the configured cycle.
 *
 * @return how many milliseconds this run is late relative to the configured eviction interval; 0 on the
 *         first run or when the run is not late
 */
long getCompensationTimeMs() {
    long nowNanos = getCurrentTimeNano();
    // Store the current run's timestamp and fetch the previous one in a single atomic step
    long previousNanos = lastExecutionNanosRef.getAndSet(nowNanos);
    if (previousNanos == 0L) {
        // First run: there is no previous execution to compare against
        return 0L;
    }
    long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(nowNanos - previousNanos);
    // Compensation = actual elapsed time - configured interval, never negative
    long lateBy = elapsedMillis - serverConfig.getEvictionIntervalTimerInMs();
    return Math.max(lateBy, 0L);
}
设置补偿时间的目的在于:如果执行过程中出现 GC 停顿或时钟漂移,两次执行的实际间隔会略大于配置的周期时间,以至于出现实例未到期却被剔除的情况。通过记录两次执行的时间差,把超出周期的部分作为补偿时间加到过期判断中,就可以补偿这部分时间损耗。
覆盖状态(overriddenStatus)
`InstanceInfo` 有两个表示状态的字段:`status` 和 `overriddenStatus`,分别表示实例的状态和覆盖状态。Eureka 定义覆盖状态的目的在于:在不改变真实 Client 端状态的前提下,修改 Server 端注册表中的状态。
在大多数情况下,都是在 `InstanceStatus.UP` 和 `InstanceStatus.OUT_OF_SERVICE` 之间直接做状态切换:`InstanceStatus.UP` 表示接收流量,`InstanceStatus.OUT_OF_SERVICE` 表示摘除流量。
说得更通俗一点,就是真实客户端保持不动(处于正常状态),通过修改 Server 端的覆盖状态,让实例的 `InstanceStatus` 从 `UP` 变更为 `OUT_OF_SERVICE`,从而实现摘除流量的目的。
// Overridden statuses keyed by instance id. Backed by a Guava cache whose entries expire one hour after
// their last access, so every lookup keeps an entry alive.
protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap =
        CacheBuilder.newBuilder()
                .initialCapacity(500)
                .expireAfterAccess(1, TimeUnit.HOURS)
                .<String, InstanceStatus>build()
                .asMap();
/**
 * Status of an instance as seen by the registry, mainly deciding whether it should receive traffic.
 */
public enum InstanceStatus {
    UP, // Ready to receive traffic
    DOWN, // Do not send traffic- healthcheck callback failed
    STARTING, // Just about starting- initializations to be done - do not
    // send traffic
    OUT_OF_SERVICE, // Intentionally shutdown for traffic
    UNKNOWN;

    /**
     * Parses a status name case-insensitively.
     *
     * <p>Upper-casing uses {@link java.util.Locale#ROOT} so the result does not depend on the JVM default
     * locale. Under a Turkish default locale, for instance, {@code "starting".toUpperCase()} yields
     * {@code "STARTİNG"} (dotted capital I). That would make {@code valueOf} fail and silently map a valid
     * status to {@link #UNKNOWN}.
     *
     * @param s the status name; may be {@code null}
     * @return the matching constant, or {@link #UNKNOWN} if {@code s} is null or not a known status name
     */
    public static InstanceStatus toEnum(String s) {
        if (s != null) {
            try {
                return InstanceStatus.valueOf(s.toUpperCase(java.util.Locale.ROOT));
            } catch (IllegalArgumentException e) {
                // ignore and fall through to unknown
                logger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN);
            }
        }
        return UNKNOWN;
    }
}
实例状态的计算覆盖规则
/**
 * A single status-override rule. When it matches, it proposes the status the instance should be given.
 *
 * <p>Rules are meant to be evaluated as an ordered list in which the first matching result wins. See
 * {@link AbstractInstanceRegistry#getOverriddenInstanceStatus(InstanceInfo, Lease, boolean)}.
 *
 * Created by Nikos Michalakis on 7/13/16.
 */
public interface InstanceStatusOverrideRule {

    /**
     * Evaluates this rule for one instance.
     *
     * @param instanceInfo  the instance whose status is being computed
     * @param existingLease the lease the server already holds for this instance, if any (may be {@code null})
     * @param isReplication whether the request was replicated from another server
     * @return a result saying whether the rule matched and, if so, the status it proposes
     */
    StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication);
}
/**
 * Container for a result computed by an {@link InstanceStatusOverrideRule}: whether the rule matched and,
 * if it did, the status it proposes. Instances are immutable.
 *
 * Created by Nikos Michalakis on 7/13/16.
 */
public class StatusOverrideResult {

    /**
     * Shared "rule did not match" result. It is {@code final} so that no code can replace the sentinel at
     * runtime; a reassignment would change the outcome of every rule that returns it.
     */
    public static final StatusOverrideResult NO_MATCH = new StatusOverrideResult(false, null);

    /**
     * Creates a matching result that proposes {@code status}.
     *
     * @param status the status proposed by the rule
     * @return a result whose {@link #matches()} is true
     */
    public static StatusOverrideResult matchingStatus(InstanceInfo.InstanceStatus status) {
        return new StatusOverrideResult(true, status);
    }

    // Does the rule match?
    private final boolean matches;

    // The status computed by the rule (null for NO_MATCH).
    private final InstanceInfo.InstanceStatus status;

    private StatusOverrideResult(boolean matches, InstanceInfo.InstanceStatus status) {
        this.matches = matches;
        this.status = status;
    }

    /** @return true if the rule matched */
    public boolean matches() {
        return matches;
    }

    /** @return the proposed status, or null when the rule did not match */
    public InstanceInfo.InstanceStatus status() {
        return status;
    }
}
这里定义了覆盖规则的接口。每个规则实现类都需要实现 `apply` 方法,返回当前实例是否匹配本规则;如果匹配,还要给出对应的 `InstanceInfo.InstanceStatus`。
eureka默认实现类的关系图:
默认有 6 种实现类,其中 `AsgEnabledRule` 是 AWS 环境下的实现类,这里不多说明,主要分析图中的另外 5 种。
OverrideExistsRule
/**
* This rule checks to see if we have overrides for an instance and if we do then we return those.
*
* <p>In PeerAwareInstanceRegistryImpl the overrides map passed in is the registry's
* overriddenInstanceStatusMap.
*/
public class OverrideExistsRule implements InstanceStatusOverrideRule {
// Overridden statuses keyed by instance id (the map is stored by reference, not copied)
private Map<String, InstanceInfo.InstanceStatus> statusOverrides;
public OverrideExistsRule(Map<String, InstanceInfo.InstanceStatus> statusOverrides) {
this.statusOverrides = statusOverrides;
}
/**
* Matches when an override is stored for this instance's id and proposes that status; otherwise NO_MATCH.
* existingLease and isReplication are not consulted.
*/
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
InstanceInfo.InstanceStatus overridden = statusOverrides.get(instanceInfo.getId());
// If there are instance specific overrides, then they win - otherwise the ASG status
if (overridden != null) {
logger.debug("The instance specific override for instance {} and the value is {}",
instanceInfo.getId(), overridden.name());
return StatusOverrideResult.matchingStatus(overridden);
}
return StatusOverrideResult.NO_MATCH;
}
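该规则使用 `statusOverrides` 中缓存的值作为返回值,而 `statusOverrides` 其实就是 `AbstractInstanceRegistry.overriddenInstanceStatusMap`。该 map 由 `CacheBuilder` 以 `expireAfterAccess(1, TimeUnit.HOURS)` 构建,每次 `get` 都算一次访问,因此该规则每次调用都会刷新 `overriddenInstanceStatusMap` 中对应条目的有效期,从而保证它不会过期。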
AlwaysMatchInstanceStatusRule
/**
* This rule matches always and returns the current status of the instance.
*
* <p>It ignores overrides, the existing lease and the replication flag. FirstMatchWinsCompositeRule
* uses it as the fallback when none of its rules match.
*/
public class AlwaysMatchInstanceStatusRule implements InstanceStatusOverrideRule {
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
logger.debug("Returning the default instance status {} for instance {}", instanceInfo.getStatus(),
instanceInfo.getId());
return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
}
该规则忽视覆盖状态,直接按照当前实例的状态返回。
LeaseExistsRule
/**
* This rule matches if we have an existing lease for the instance that is UP or OUT_OF_SERVICE.
*
* <p>Only non-replicated requests are considered; for replicated requests it always returns NO_MATCH.
*/
public class LeaseExistsRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(LeaseExistsRule.class);
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// This is for backward compatibility until all applications have ASG names, otherwise while starting up
// the client status may override status replicated from other servers
if (!isReplication) {
InstanceInfo.InstanceStatus existingStatus = null;
if (existingLease != null) {
// NOTE(review): assumes the lease always has a holder; a null holder would throw NPE here
existingStatus = existingLease.getHolder().getStatus();
}
// Allow server to have its way when the status is UP or OUT_OF_SERVICE
if ((existingStatus != null) &&
(InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(existingStatus) || InstanceInfo.InstanceStatus.UP.equals(existingStatus))) {
logger.debug("There is already an existing lease with status {} for instance {}",
existingLease.getHolder().getStatus().name(),
existingLease.getHolder().getId());
return StatusOverrideResult.matchingStatus(existingLease.getHolder().getStatus());
}
}
return StatusOverrideResult.NO_MATCH;
}
该规则只对非复制请求(即客户端自己发起的注册或续约)生效,不对 Server 间的同步生效。
该规则根据 `existingLease` 做判断:如果已存在租约中实例的状态是 `UP` 或 `OUT_OF_SERVICE`,就直接返回该状态;否则不匹配。
DownOrStartingRule
/**
* This rule matches if the instance is DOWN or STARTING.
*
* <p>Concretely it matches any status other than UP and OUT_OF_SERVICE (so UNKNOWN matches too) and
* proposes the instance's own status, regardless of whether the request is a replication.
*/
public class DownOrStartingRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(DownOrStartingRule.class);
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// ReplicationInstance is DOWN or STARTING - believe that, but when the instance says UP, question that
// The client instance sends STARTING or DOWN (because of heartbeat failures), then we accept what
// the client says. The same is the case with replica as well.
// The OUT_OF_SERVICE from the client or replica needs to be confirmed as well since the service may be
// currently in SERVICE
if ((!InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus()))
&& (!InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(instanceInfo.getStatus()))) {
logger.debug("Trusting the instance status {} from replica or instance for instance {}",
instanceInfo.getStatus(), instanceInfo.getId());
return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
}
return StatusOverrideResult.NO_MATCH;
}
该规则匹配请求实例的状态既不是 `UP` 也不是 `OUT_OF_SERVICE` 的情况,即 `DOWN`、`STARTING`(以及 `UNKNOWN`),并且不区分是否为复制请求。
FirstMatchWinsCompositeRule
/**
* This rule takes an ordered list of rules and returns the result of the first match or the
* result of the {@link AlwaysMatchInstanceStatusRule}.
*/
public class FirstMatchWinsCompositeRule implements InstanceStatusOverrideRule {
// Rules evaluated in order
private final InstanceStatusOverrideRule[] rules;
// Fallback used when no rule in 'rules' matches
private final InstanceStatusOverrideRule defaultRule;
// Precomputed description: the names of all rules followed by the default rule
private final String compositeRuleName;
public FirstMatchWinsCompositeRule(InstanceStatusOverrideRule... rules) {
this.rules = rules;
this.defaultRule = new AlwaysMatchInstanceStatusRule();
// Let's build up and "cache" the rule name to be used by toString();
List<String> ruleNames = new ArrayList<>(rules.length+1);
for (int i = 0; i < rules.length; ++i) {
ruleNames.add(rules[i].toString());
}
ruleNames.add(defaultRule.toString());
compositeRuleName = ruleNames.toString();
}
// Returns the first matching result in order; if none matches, the default rule's result (which always matches)
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
for (int i = 0; i < this.rules.length; ++i) {
StatusOverrideResult result = this.rules[i].apply(instanceInfo, existingLease, isReplication);
if (result.matches()) {
return result;
}
}
return defaultRule.apply(instanceInfo, existingLease, isReplication);
}
该规则本身没有具体的匹配逻辑,而是把需要执行的规则按顺序保存在一个数组中依次执行,返回第一个匹配的结果;如果都不匹配,则使用默认的 `AlwaysMatchInstanceStatusRule`。具体的顺序在 `PeerAwareInstanceRegistryImpl` 的构造方法中指定。
@Inject
public PeerAwareInstanceRegistryImpl(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs, EurekaClient eurekaClient) {
super(serverConfig, clientConfig, serverCodecs);
this.eurekaClient = eurekaClient;
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
// We first check if the instance is STARTING or DOWN, then we check explicit overrides,
// then we check the status of a potentially existing lease.
this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}
根据此顺序可以发现:如果请求实例的状态是 `UP` 或 `OUT_OF_SERVICE`(即 `DownOrStartingRule` 不匹配),注册或续约时会优先按照覆盖状态的值匹配。
状态变更(statusUpdate)
/**
* Updates the status of an instance. Normally happens to put an instance
* between {@link InstanceStatus#OUT_OF_SERVICE} and
* {@link InstanceStatus#UP} to put the instance in and out of traffic.
*
* <p>The new status is stored as the instance's status and as its overridden status, both in
* {@code overriddenInstanceStatusMap} and on the InstanceInfo. Later registrations and renewals therefore
* keep applying it through the override rules.
*
* NOTE(review): a non-numeric lastDirtyTimestamp makes Long.valueOf throw NumberFormatException, which
* propagates to the caller (the read lock is still released in finally).
*
* @param appName the application name of the instance.
* @param id the unique identifier of the instance.
* @param newStatus the new {@link InstanceStatus}.
* @param lastDirtyTimestamp last timestamp when this instance information was updated.
* @param isReplication true if this is a replication event from other nodes, false
* otherwise.
* @return false if no lease is registered for the instance; true otherwise (including when the status
* already equals newStatus and nothing is changed).
*/
@Override
public boolean statusUpdate(String appName, String id,
InstanceStatus newStatus, String lastDirtyTimestamp,
boolean isReplication) {
try {
read.lock();
// Status-update counter +1
STATUS_UPDATE.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {
lease = gMap.get(id);
}
// No lease registered for this instance: nothing to update
if (lease == null) {
return false;
} else {
// Refresh the lease's renewal timestamp
lease.renew();
InstanceInfo info = lease.getHolder();
// Lease is always created with its instance info object.
// This log statement is provided as a safeguard, in case this invariant is violated.
if (info == null) {
logger.error("Found Lease without a holder for instance id {}", id);
}
// Only act when the cached status differs from the requested one
if ((info != null) && !(info.getStatus().equals(newStatus))) {
// Mark service as UP if needed
// Switching to UP: record the service-up timestamp
if (InstanceStatus.UP.equals(newStatus)) {
lease.serviceUp();
}
// This is NAC overriden status
// Cache the new status as this instance's overridden status
overriddenInstanceStatusMap.put(id, newStatus);
// Set it for transfer of overridden status to replica on
// replica start up
// Set both the overridden status and the status of the instance to newStatus
info.setOverriddenStatus(newStatus);
long replicaDirtyTimestamp = 0;
info.setStatusWithoutDirty(newStatus);
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
}
// If the replication's dirty timestamp is more than the existing one, just update
// it to the replica's.
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
// Action type MODIFIED
info.setActionType(ActionType.MODIFIED);
// Record this change in the recently-changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// Refresh the last-updated timestamp
info.setLastUpdatedTimestamp();
// Invalidate the cached responses for this app and its VIP addresses
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
}
return true;
}
} finally {
read.unlock();
}
}
本方法主要用于在 `InstanceStatus.UP` 和 `InstanceStatus.OUT_OF_SERVICE` 之间切换,以便只在服务端控制某个实例是否接收流量。方法实现的本质是修改 `ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap` 中的覆盖状态缓存值。
通过修改覆盖状态,并结合覆盖状态计算规则,达到只在服务端变更实例状态的目的。
删除覆盖状态(deleteStatusOverride)
/**
* Removes status override for a give instance.
*
* <p>If an override was cached for the instance, the instance's overridden status is reset to UNKNOWN and
* its status is set to {@code newStatus}. If no override was cached, only the lease is renewed.
*
* @param appName the application name of the instance.
* @param id the unique identifier of the instance.
* @param newStatus the {@link InstanceStatus} the instance gets once the override is removed.
* @param lastDirtyTimestamp last timestamp when this instance information was updated.
* @param isReplication true if this is a replication event from other nodes, false
* otherwise.
* @return false if no lease is registered for the instance; true otherwise (even when no override existed).
*/
@Override
public boolean deleteStatusOverride(String appName, String id,
InstanceStatus newStatus,
String lastDirtyTimestamp,
boolean isReplication) {
try {
read.lock();
// Override-delete counter +1
STATUS_OVERRIDE_DELETE.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {
lease = gMap.get(id);
}
if (lease == null) {
return false;
} else {
// Refresh the lease's renewal timestamp
lease.renew();
InstanceInfo info = lease.getHolder();
// Lease is always created with its instance info object.
// This log statement is provided as a safeguard, in case this invariant is violated.
if (info == null) {
logger.error("Found Lease without a holder for instance id {}", id);
}
// Remove the cached override; the returned value is non-null only if one existed
InstanceStatus currentOverride = overriddenInstanceStatusMap.remove(id);
if (currentOverride != null && info != null) {
// Reset the overridden status to UNKNOWN
info.setOverriddenStatus(InstanceStatus.UNKNOWN);
// Apply the provided status; per the article, "WithoutDirty" means the dirty timestamp is not updated
info.setStatusWithoutDirty(newStatus);
long replicaDirtyTimestamp = 0;
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
}
// If the replication's dirty timestamp is more than the existing one, just update
// it to the replica's.
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
// Action type MODIFIED
info.setActionType(ActionType.MODIFIED);
// Record this change in the recently-changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// Refresh the last-updated timestamp
info.setLastUpdatedTimestamp();
// Invalidate the cached responses for this app and its VIP addresses
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
}
return true;
}
} finally {
read.unlock();
}
}
删除覆盖状态代码的实现逻辑跟更新代码相差不大,只是一个是更新map,一个是从map中删除。
自我保护
自我保护的目的是防止因为server端和client端的网络抖动问题,导致大量有效client端被下线,从而影响到client之间本身的相互调用。
/**
 * Recomputes the self-preservation threshold.
 *
 * <p>The threshold is the number of renewals per minute expected from all clients that send renewals,
 * scaled by the renewal percent threshold and truncated to an int.
 */
protected void updateRenewsPerMinThreshold() {
    double expectedClients = this.expectedNumberOfClientsSendingRenews;
    double renewalsPerClientPerMinute = 60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds();
    double expectedRenewalsPerMinute = expectedClients * renewalsPerClientPerMinute;
    this.numberOfRenewsPerMinThreshold =
            (int) (expectedRenewalsPerMinute * serverConfig.getRenewalPercentThreshold());
}
/**
 * Tells whether expired leases may currently be evicted.
 *
 * @return true when self-preservation mode is disabled, or when the renewals of the last minute exceed a
 *         positive threshold; false otherwise (the server is in self-preservation)
 */
@Override
public boolean isLeaseExpirationEnabled() {
    if (isSelfPreservationModeEnabled()) {
        if (numberOfRenewsPerMinThreshold <= 0) {
            return false;
        }
        return getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }
    // The self preservation mode is disabled, hence allowing the instances to expire.
    return true;
}
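计算公式:每分钟续约阈值 = 预计发送续约的客户端数(`expectedNumberOfClientsSendingRenews`)× 每个客户端每分钟的续约次数(`60 / expectedClientRenewalIntervalSeconds`)× 阈值百分比(`renewalPercentThreshold`),结果取整。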
在开启自我保护模式的前提下,如果最近一分钟的续约数量 > 阈值(且阈值 > 0),过期实例就可以被剔除;否则进入自我保护状态,不会有实例因过期而被剔除。若未开启自我保护模式,则总是允许剔除。具体的剔除规则参考 `evict` 的说明。