Spring Cloud Eureka 源码分析 —— Server端(服务治理)

一. 前言

本文详细介绍了eureka server端关于服务治理的几个核心方法实现。主要实现逻辑集中在com.netflix.eureka.registry.AbstractInstanceRegistry抽象类中。

client端注册逻辑参见Spring Cloud Eureka 源码分析 —— Client端

二. 服务治理

服务治理包括服务注册(register)服务续约(renew)服务获取(getApplication)服务下线(cancel)心跳同步(heartbeat)服务剔除(evict)自我保护

服务状态管理包括覆盖状态(overriddenStatus)状态变更(statusUpdate)删除覆盖状态(deleteStatusOverride)规则计算

三. 源码分析

服务注册(register)

    //com.netflix.eureka.registry.AbstractInstanceRegistry
    /**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     *  registrant 表示本次注册的实例信息
     *  leaseDuration 表示续约周期
     *  isReplication 表示本次是server间的同步(true),还是客户端主动注册(false)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            // registry缓存了实例注册的信息,注册实现本质上就是将实例信息添加到register属性中。
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            // 计数器+1
            REGISTER.increment(isReplication);
            //初始化gMap
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // 如果之前有缓存续约信息,比较两个对象的时间差
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    // 如果本次注册是更老的实例信息,就还使用上次缓存的对象
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                // 如果本次是第一次注册,expectedNumberOfClientsSendingRenews+1,并更新自我保护阈值
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                // 使用之前的服务开启时间
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // 将这个实例添加到register缓存中
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                // recentRegisteredQueue是一个记录注册操作的队列,key是注册时间,value是客户端实例id,主要用于debug或统计使用
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            // 如果请求实例的覆盖状态不是UNKONWN,且之前map中没有缓存过,则保存请求的覆盖状态
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            // 如果之前map已经缓存过覆盖状态,则以map的覆盖状态为准,这是因为map缓存的值可以被statusUpdate方法调整,优先服务端的配置值
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            // 根据当前的覆盖状态和规则,计算当前实例状态,具体计算规则下文具体分析
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            // 如果是UP状态,就设置serviceUp的时间戳
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            // 设置操作类型为ADDED,表示添加到server端列表中
            registrant.setActionType(ActionType.ADDED);
            // 本次register添加到变更队列中
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // 更新变更时间
            registrant.setLastUpdatedTimestamp();
            // 失效当前实例的缓存,以及all和all_delta
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

服务注册主要有以下几步:

  1. 初始化registry中的gMap(非必要)
  2. 更新expectedNumberOfClientsSendingRenews 并重新计算自我保护阈值
  3. registrant添加到gMap
  4. 更新overriddenInstanceStatusMap中的覆盖状态
  5. 计算并更新registrant的status
  6. 失效缓存

服务续约(renew)

    /**
     * Marks the given instance of the given app name as renewed, and also marks whether it originated from
     * replication.
     *
     * @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
     *  appName 服务名
     *  id 服务实例id
     *  isReplication 表示本次是server间的同步(true),还是客户端主动注册(false)
    */
    public boolean renew(String appName, String id, boolean isReplication) {
        // 续约次数+1
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        // 根据服务名和实例id获取服务实例
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            // 如果没有找到,次数+1
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // 获取缓存的实例信息
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                // 获取计算后的实例状态,计算方式下文介绍,如果此时状态是UNKNOWN,可能实例已经被删除
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                // 如果计算后的状态 != 实例状态,将计算后的状态作为实例状态
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            // 增加续约次数
            renewsLastMin.increment();
            // 更新Lease续约时间,会在后续判断续约是否过期时使用
            leaseToRenew.renew();
            return true;
        }
    }

服务续约主要有以下几步:

  1. 判断registry中是否存在当前实例id,不存在就直接返回
  2. 根据覆盖状态计算并更新registrant的status
  3. 更新续约时间

服务获取(getApplication)

Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                // 是否允许读取只读cache的实例信息
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    // 只读cache没有取到,就从读写cache中获取
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }

三级缓存

客户端在获取服务注册表时,默认使用三级缓存的机制获取。
readOnlyCacheMap (ConcurrentMap<Key, Value>)只读缓存
readWriteCacheMap (LoadingCache<Key, Value>) 读写缓存
register (ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>) 注册表缓存

读取数据先从只读缓存读取,只读缓存没有再从读写缓存读,读写缓存没有最后再从注册表读。

更新机制

register更新机制是几乎每个服务操作都会对其对更新,有的是put或remove,有的是更新缓存对象的一些属性。
readWriteCacheMap更新机制是通过invalidateCache方法,或者通过任务自动过期,默认过期时间180s
readOnlyCacheMap 默认每隔30秒会从读写缓存中同步

通过下面代码可以看到invalidateCache的过期实现和readOnlyCacheMap的刷新任务。

    @Override
    public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
        for (Key.KeyType type : Key.KeyType.values()) {
            for (Version v : Version.values()) {
                invalidate(
                        // appName,全量apps, 增量apps都会过期
                        new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
                        new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
                        new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
                        new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
                        new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
                        new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
                );
                if (null != vipAddress) {
                    invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
                }
                if (null != secureVipAddress) {
                    invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
                }
            }
        }
    }

    /**
     * Invalidate the cache information given the list of keys.
     */
    public void invalidate(Key... keys) {
        for (Key key : keys) {
            readWriteCacheMap.invalidate(key);
            Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
            if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
                for (Key keysWithRegion : keysWithRegions) {
                    logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                            key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                    readWriteCacheMap.invalidate(keysWithRegion);
                }
            }
        }
    }
        ...
        if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }
        // responseCacheUpdateIntervalMs 默认值30s
        ...
    // 只读cache刷新任务
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    }
                }
            }
        };
    }

服务下线(cancel)

    /**
     * {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
     * cancel request is replicated to the peers. This is however not desired for expires which would be counted
     * in the remote peers as valid cancellations, so self preservation mode would not kick-in.
     */
    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            read.lock();
            // 计数器+1
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                // 从registry缓存中移除当前实例id
                leaseToCancel = gMap.remove(id);
            }  
            // 下线实例id加入到下线操作队列里
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            // 覆盖状态缓存移除当前实例id
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            // 如果register缓存中没有找到实例id,计数器+1
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                // 更新剔除时间戳
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    // 设置操作类型为DELETED,表示从server端列表中删除
                    instanceInfo.setActionType(ActionType.DELETED);
                    // 变更队列中添加本次剔除操作
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    // 更新变更时间戳
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                // 失效当前实例的缓存,以及all和all_delta 
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                return true;
            }
        } finally {
            read.unlock();
        }
    }

服务下线主要有以下几步:

  1. 从registry中将此实例id移除
  2. 从overriddenInstanceStatusMap中将此实例id移除
  3. 失效缓存

心跳同步(heartbeat)

private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) {
        Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp);
        int responseStatus = response.getStatus();
        Builder responseBuilder = new Builder().setStatusCode(responseStatus);

        if ("false".equals(config.getExperimental("bugfix.934"))) {
            if (responseStatus == Status.OK.getStatusCode() && response.getEntity() != null) {
                responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
            }
        } else {
            if ((responseStatus == Status.OK.getStatusCode() || responseStatus == Status.CONFLICT.getStatusCode())
                    && response.getEntity() != null) {
                responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
            }
        }
        return responseBuilder;
    }

    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
        ...
    }

心跳同步主要用于peer server端之间最终一致性的数据同步。最终也调用到renew方法。

服务剔除(evict)

    // additionalLeaseMs 为补偿时间
    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
        // 如果当前处于自我保护状态,就不允许剔除
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        // 将所有的过期实例加入到list中
        // 这段注释是说将所有过期实例加入到list中,以随机的方式剔除,防止在自我保护开启前就已经把某些服务的实例全部剔除掉
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        // 当前实例总数
        int registrySize = (int) getLocalRegistrySize();
        // 当前实例总数 * 自我保护阈值(0.85)
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;
        // 这里计算剔除数量,将应剔除的总数 和 最大剔除数量间 取最小值,每次任务最多只剔除15%的实例
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                // 通过随机的方式挑选出实例(洗牌算法)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                // 计数器+1
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                // 找到要剔除的实例,调用cancal方法将其下线
                internalCancel(appName, id, false);
            }
        }
    }

    // 判断是否过期,需要加上补偿时间,解释见下文
    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }
    
    // 检查是否可以剔除过期实例
    @Override
    public boolean isLeaseExpirationEnabled() {
        if (!isSelfPreservationModeEnabled()) {
            // 如果没有开启自我保护模式,直接返回true,表示允许过期剔除
            // The self preservation mode is disabled, hence allowing the instances to expire.
            return true;
        }
        // 如果开启了自我保护模式,且当前每分钟续约数 > 既定阈值,也允许剔除,否则表示当前处于自我保护状态
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }

服务剔除主要有以下几步:

  1. 根据自我保护机制以及续约时间的判断找到所有过期实例
  2. 计算剔除数量,将应剔除的总数最大剔除数量之间取最小值
  3. 使用洗牌算法找出要剔除的实例id
  4. 调用internalCancel方法将其下线

剔除时间补偿

        /**
         * compute a compensation time defined as the actual time this task was executed since the prev iteration,
         * vs the configured amount of time for execution. This is useful for cases where changes in time (due to
         * clock skew or gc for example) causes the actual eviction task to execute later than the desired time
         * according to the configured cycle.
         */
        long getCompensationTimeMs() {
            // 当前时间戳
            long currNanos = getCurrentTimeNano();
            // 上次时间戳
            long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
            if (lastNanos == 0l) {
                return 0l;
            }
            // 执行时间差
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            // 补偿时间 = 执行时间差 - 周期时间
            long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0l ? 0l : compensationTime;
        }

设置补偿时间的目的在于,如果在执行过程中出现gc或者时钟回退,会导致执行周期时间略大于配置周期时间,以至于出现为到期而被剔除的情况。通过时间补偿记录两次时间差,补偿一些时间损耗。

覆盖状态(overriddenStatus)

InstanceInfo有两个状态的表示字段,分别是statusoverriddenStatus,分别表示实例的状态覆盖状态,eureka定义覆盖状态的目的在于,在不改变真实client端状态的前提下,修改server端注册表的状态。

在大多数情况下,都是InstanceStatus.UPInstanceStatus.OUT_OF_SERVICE直接做状态切换。InstanceStatus.UP表示接收流量;InstanceStatus.OUT_OF_SERVICE表示摘除流量。

说的更通俗一点,就是真实客户端保持不动,是正常状态,通过修改server端的覆盖状态,来达到让实例的InstanceStatus从UP变更为OUT_OF_SERVICE,从而实现摘除流量的目的。

    // key是实例id
    protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
            .newBuilder().initialCapacity(500)
            .expireAfterAccess(1, TimeUnit.HOURS)
            .<String, InstanceStatus>build().asMap();

    public enum InstanceStatus {
        UP, // Ready to receive traffic
        DOWN, // Do not send traffic- healthcheck callback failed
        STARTING, // Just about starting- initializations to be done - do not
        // send traffic
        OUT_OF_SERVICE, // Intentionally shutdown for traffic
        UNKNOWN;

        public static InstanceStatus toEnum(String s) {
            if (s != null) {
                try {
                    return InstanceStatus.valueOf(s.toUpperCase());
                } catch (IllegalArgumentException e) {
                    // ignore and fall through to unknown
                    logger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN);
                }
            }
            return UNKNOWN;
        }
    }

实例状态的计算覆盖规则

/**
 * A single rule that if matched it returns an instance status.
 * The idea is to use an ordered list of such rules and pick the first result that matches.
 *
 * It is designed to be used by
 * {@link AbstractInstanceRegistry#getOverriddenInstanceStatus(InstanceInfo, Lease, boolean)}
 *
 * Created by Nikos Michalakis on 7/13/16.
 */
public interface InstanceStatusOverrideRule {

    /**
     * Match this rule.
     *
     * @param instanceInfo The instance info whose status we care about.
     * @param existingLease Does the instance have an existing lease already? If so let's consider that.
     * @param isReplication When overriding consider if we are under a replication mode from other servers.
     * @return A result with whether we matched and what we propose the status to be overriden to.
     */
    StatusOverrideResult apply(final InstanceInfo instanceInfo,  final Lease<InstanceInfo> existingLease, boolean isReplication); 
}

/**
 * Container for a result computed by an {@link InstanceStatusOverrideRule}.
 *
 * Created by Nikos Michalakis on 7/13/16.
 */
public class StatusOverrideResult {

    public static StatusOverrideResult NO_MATCH = new StatusOverrideResult(false, null);

    public static StatusOverrideResult matchingStatus(InstanceInfo.InstanceStatus status) {
        return new StatusOverrideResult(true, status);
    }

    // Does the rule match?
    private final boolean matches;

    // The status computed by the rule.
    private final InstanceInfo.InstanceStatus status;

    private StatusOverrideResult(boolean matches, InstanceInfo.InstanceStatus status) {
        this.matches = matches;
        this.status = status;
    }

    public boolean matches() {
        return matches;
    }

    public InstanceInfo.InstanceStatus status() {
        return status;
    }
}

这里定义了覆盖规则的接口,每个规则实现类需要实现apply,来返回当前实例状态是否满足本规则,且如果满足,应该返回InstanceInfo.InstanceStatus是什么。

eureka默认实现类的关系图:


默认有6种实现类,有一个AsgEnabledRule是AWS环境的实现类,这里就不多说明。主要分析图中这5种。

OverrideExistsRule
/**
 * This rule checks to see if we have overrides for an instance and if we do then we return those.
 */
public class OverrideExistsRule implements InstanceStatusOverrideRule {

    private Map<String, InstanceInfo.InstanceStatus> statusOverrides;

    public OverrideExistsRule(Map<String, InstanceInfo.InstanceStatus> statusOverrides) {
        this.statusOverrides = statusOverrides;
    }

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
        InstanceInfo.InstanceStatus overridden = statusOverrides.get(instanceInfo.getId());
        // If there are instance specific overrides, then they win - otherwise the ASG status
        if (overridden != null) {
            logger.debug("The instance specific override for instance {} and the value is {}",
                    instanceInfo.getId(), overridden.name());
            return StatusOverrideResult.matchingStatus(overridden);
        }
        return StatusOverrideResult.NO_MATCH;
    }

该规则使用statusOverrides缓存的值作为返回值,而statusOverrides其实就是AbstractInstanceRegistry.overriddenInstanceStatusMap,同时该规则每次调用都会刷新overriddenInstanceStatusMap的有效期,来保证不会过期。

AlwaysMatchInstanceStatusRule
/**
 * This rule matches always and returns the current status of the instance.
 */
public class AlwaysMatchInstanceStatusRule implements InstanceStatusOverrideRule {

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                      Lease<InstanceInfo> existingLease,
                                      boolean isReplication) {
        logger.debug("Returning the default instance status {} for instance {}", instanceInfo.getStatus(),
                instanceInfo.getId());
        return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
    }

该规则忽视覆盖状态,直接按照当前实例的状态返回。

LeaseExistsRule
/**
 * This rule matches if we have an existing lease for the instance that is UP or OUT_OF_SERVICE.
 */
public class LeaseExistsRule implements InstanceStatusOverrideRule {

    private static final Logger logger = LoggerFactory.getLogger(LeaseExistsRule.class);

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                      Lease<InstanceInfo> existingLease,
                                      boolean isReplication) {
        // This is for backward compatibility until all applications have ASG names, otherwise while starting up
        // the client status may override status replicated from other servers
        if (!isReplication) {
            InstanceInfo.InstanceStatus existingStatus = null;
            if (existingLease != null) {
                existingStatus = existingLease.getHolder().getStatus();
            }
            // Allow server to have its way when the status is UP or OUT_OF_SERVICE
            if ((existingStatus != null) && 
                (InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(existingStatus) || InstanceInfo.InstanceStatus.UP.equals(existingStatus))) {
                logger.debug("There is already an existing lease with status {}  for instance {}",
                        existingLease.getHolder().getStatus().name(),
                        existingLease.getHolder().getId());
                return StatusOverrideResult.matchingStatus(existingLease.getHolder().getStatus());
            }
        }
        return StatusOverrideResult.NO_MATCH;
    }

该规则只对客户端主动续约生效,不对server间同步生效。
该规则是根据existingLease来做判断,也就是匹配已存在续约的应用实例的状态UPOUT_OF_SERVICE,满足这两个状态就直接返回,否则不匹配。

DownOrStartingRule
/**
 * This rule matches if the instance is DOWN or STARTING.
 */
public class DownOrStartingRule implements InstanceStatusOverrideRule {
    private static final Logger logger = LoggerFactory.getLogger(DownOrStartingRule.class);

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                      Lease<InstanceInfo> existingLease,
                                      boolean isReplication) {
        // ReplicationInstance is DOWN or STARTING - believe that, but when the instance says UP, question that
        // The client instance sends STARTING or DOWN (because of heartbeat failures), then we accept what
        // the client says. The same is the case with replica as well.
        // The OUT_OF_SERVICE from the client or replica needs to be confirmed as well since the service may be
        // currently in SERVICE
        if ((!InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus()))
                && (!InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(instanceInfo.getStatus()))) {
            logger.debug("Trusting the instance status {} from replica or instance for instance {}",
                    instanceInfo.getStatus(), instanceInfo.getId());
            return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
        }
        return StatusOverrideResult.NO_MATCH;
    }

该规则只匹配请求实例的状态是DOWN或STARTING。

FirstMatchWinsCompositeRule

/**
 * This rule takes an ordered list of rules and returns the result of the first match or the
 * result of the {@link AlwaysMatchInstanceStatusRule}.
 */
public class FirstMatchWinsCompositeRule implements InstanceStatusOverrideRule {

    private final InstanceStatusOverrideRule[] rules;
    private final InstanceStatusOverrideRule defaultRule;
    private final String compositeRuleName;

    public FirstMatchWinsCompositeRule(InstanceStatusOverrideRule... rules) {
        this.rules = rules;
        this.defaultRule = new AlwaysMatchInstanceStatusRule();
        // Let's build up and "cache" the rule name to be used by toString();
        List<String> ruleNames = new ArrayList<>(rules.length+1);
        for (int i = 0; i < rules.length; ++i) {
            ruleNames.add(rules[i].toString());
        }
        ruleNames.add(defaultRule.toString());
        compositeRuleName = ruleNames.toString();
    }

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
        for (int i = 0; i < this.rules.length; ++i) {
            StatusOverrideResult result = this.rules[i].apply(instanceInfo, existingLease, isReplication);
            if (result.matches()) {
                return result;
            }
        }
        return defaultRule.apply(instanceInfo, existingLease, isReplication);
    }

该规则并没有具体的实现,而是将之前所有需要执行的规则都保存到了一个数组中,按顺序依次执行。 具体的顺序在PeerAwareInstanceRegistryImpl的构造方法中已指定。

@Inject
    public PeerAwareInstanceRegistryImpl(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs, EurekaClient eurekaClient) {
        super(serverConfig, clientConfig, serverCodecs);
        this.eurekaClient = eurekaClient;
        this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
        // We first check if the instance is STARTING or DOWN, then we check explicit overrides,
        // then we check the status of a potentially existing lease.
        this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
                new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
    }

根据此顺序可以发现,如果是一个正常的注册或续约请求,会优先按照覆盖状态的值匹配。

状态变更(statusUpdate)

/**
     * Updates the status of an instance. Normally happens to put an instance
     * between {@link InstanceStatus#OUT_OF_SERVICE} and
     * {@link InstanceStatus#UP} to put the instance in and out of traffic.
     *
     * @param appName the application name of the instance.
     * @param id the unique identifier of the instance.
     * @param newStatus the new {@link InstanceStatus}.
     * @param lastDirtyTimestamp last timestamp when this instance information was updated.
     * @param isReplication true if this is a replication event from other nodes, false
     *                      otherwise.
     * @return true if the status was successfully updated, false otherwise.
     */
    @Override
    public boolean statusUpdate(String appName, String id,
                                InstanceStatus newStatus, String lastDirtyTimestamp,
                                boolean isReplication) {
        try {
            read.lock();
            // 计数器+1
            STATUS_UPDATE.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> lease = null;
            if (gMap != null) {
                lease = gMap.get(id);
            }
            // 如果之前没有register缓存,就直接返回
            if (lease == null) {
                return false;
            } else {
                // 更新时间戳
                lease.renew();
                InstanceInfo info = lease.getHolder();
                // Lease is always created with its instance info object.
                // This log statement is provided as a safeguard, in case this invariant is violated.
                if (info == null) {
                    logger.error("Found Lease without a holder for instance id {}", id);
                }
                // 如果缓存的status != 本次变更的status,就更新缓存
                if ((info != null) && !(info.getStatus().equals(newStatus))) {
                    // Mark service as UP if needed
                    // 如果本次是UP且之前不是UP,就更新一下服务up的时间戳
                    if (InstanceStatus.UP.equals(newStatus)) {
                        lease.serviceUp();
                    }
                    // This is NAC overriden status
                    // 将本次新的状态缓存到覆盖状态的map中
                    overriddenInstanceStatusMap.put(id, newStatus);
                    // Set it for transfer of overridden status to replica on
                    // replica start up
                    // 更新当前实例的状态和覆盖状态为newStatus
                    info.setOverriddenStatus(newStatus);
                    long replicaDirtyTimestamp = 0;
                    info.setStatusWithoutDirty(newStatus);
                    if (lastDirtyTimestamp != null) {
                        replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
                    }
                    // If the replication's dirty timestamp is more than the existing one, just update
                    // it to the replica's.
                    // 如果本次的dirty时间戳 > 缓存的时间戳,就更新缓存info的时间戳
                    if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                        info.setLastDirtyTimestamp(replicaDirtyTimestamp);
                    }
                    // 当前操作类型为MODIFIED
                    info.setActionType(ActionType.MODIFIED);
                    // 加入到变更队列中
                    recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                    // 更新变更时间戳
                    info.setLastUpdatedTimestamp();
                    // 失效当前实例的缓存,以及all和all_delta
                    invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
                }
                return true;
            }
        } finally {
            read.unlock();
        }
    }

本方法主要用在InstanceStatus.UPInstanceStatus.OUT_OF_SERVICE之间切换,用于只在服务端控制某个实例是否接收流量。方法实现本质是修改ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap的覆盖状态缓存值。

通过修改覆盖状态,并结合覆盖状态计算规则,达到只在服务端变更实例状态的目的。

删除覆盖状态(deleteStatusOverride)

    /**
     * Removes status override for a give instance.
     *
     * @param appName the application name of the instance.
     * @param id the unique identifier of the instance.
     * @param newStatus the new {@link InstanceStatus}.
     * @param lastDirtyTimestamp last timestamp when this instance information was updated.
     * @param isReplication true if this is a replication event from other nodes, false
     *                      otherwise.
     * @return true if the status was successfully updated, false otherwise.
     */
    @Override
    public boolean deleteStatusOverride(String appName, String id,
                                        InstanceStatus newStatus,
                                        String lastDirtyTimestamp,
                                        boolean isReplication) {
        try {
            read.lock();
             // 计数器+1
            STATUS_OVERRIDE_DELETE.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> lease = null;
            if (gMap != null) {
                lease = gMap.get(id);
            }
            if (lease == null) {
                return false;
            } else {
                // 更新时间戳
                lease.renew();
                InstanceInfo info = lease.getHolder();

                // Lease is always created with its instance info object.
                // This log statement is provided as a safeguard, in case this invariant is violated.
                if (info == null) {
                    logger.error("Found Lease without a holder for instance id {}", id);
                }
                
                InstanceStatus currentOverride = overriddenInstanceStatusMap.remove(id);
                // 移除时,如果之前已缓存,会返回
                if (currentOverride != null && info != null) {
                    // 设置覆盖状态为UNKNOWN
                    info.setOverriddenStatus(InstanceStatus.UNKNOWN);
                    // 按照参数值来设置状态,WithoutDirty表示不需要更新dirty时间戳
                    info.setStatusWithoutDirty(newStatus);
                    long replicaDirtyTimestamp = 0;
                    if (lastDirtyTimestamp != null) {
                        replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
                    }
                    // If the replication's dirty timestamp is more than the existing one, just update
                    // it to the replica's.
                    // 如果本次的dirty时间戳 > 缓存的时间戳,就更新缓存info的时间戳
                    if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                        info.setLastDirtyTimestamp(replicaDirtyTimestamp);
                    }
                    // 当前操作类型为MODIFIED
                    info.setActionType(ActionType.MODIFIED);
                    // 加入到变更队列中
                    recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                    // 更新变更时间戳
                    info.setLastUpdatedTimestamp();
                    // 失效当前实例的缓存,以及all和all_delta
                    invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
                }
                return true;
            }
        } finally {
            read.unlock();
        }
    }

删除覆盖状态代码的实现逻辑跟更新代码相差不大,只是一个是更新map,一个是从map中删除。

自我保护

自我保护的目的是防止因为server端和client端的网络抖动问题,导致大量有效client端被下线,从而影响到client之间本身的相互调用。

protected void updateRenewsPerMinThreshold() {
        this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                * serverConfig.getRenewalPercentThreshold());
    }
@Override
    public boolean isLeaseExpirationEnabled() {
        if (!isSelfPreservationModeEnabled()) {
            // The self preservation mode is disabled, hence allowing the instances to expire.
            return true;
        }
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }

计算公式:每分钟续约阈值 = 预计客户端续约数(客户端实例数) * 每分钟续约的次数 * 阈值百分比

如果每分钟的续约数量 > 阈值,则过期实例可能被剔除; 否则就进入自动保护状态,不会有实例被过期。具体剔除规则参考evict的说明。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容