Spring Cloud之Eureka源码分析

本章主要介绍Eureka server端源码分析。
在分析eureka源码源码之前,需要首先了解eureka的作用。
主要包括服务注册与发现,心跳续约,自动保护机制,服务剔除,集群同步等功能。

@EnableEurekaServer注解

Eureka的服务端开启是通过这个注解。


image.png

这个注解创建了EurekaServerMarkerConfiguration类


image.png

EurekaServerMarkerConfiguration类声明这个类是个配置类,并通过@Bean创建一个Marker类用来进行标记。在spring自动装配过程中,将会对EurekaServerAutoConfiguration类进行装配条件判断
image.png

EurekaServerAutoConfiguration

加载条件是判断检查标记类Marker是否存在。


image.png

如果存在,就表明加了@EnableEurekaServer注解。

EurekaServerAutoConfiguration类的作用是:

1、导入EurekaServerInitializerConfiguration类。
服务剔除、自我保护机制、初始化自我保护机制阈值
2、创建类并注入spring容器中
EurekaServerAutoConfiguration配置类需要创建的类包括PeerAwareInstanceRegistry、PeerEurekaNodes、EurekaServerContext、EurekaServerBootstrap、FilterRegistrationBean、javax.ws.rs.core.Application
其中FilterRegistrationBean和Application:查找资源文件并设置到Jersey过滤器中。
Jersey过滤器的作用是在"com.netflix.discovery","com.netflix.eureka" 两个包下找到所有加了Path或Provider的注解,并返回的资源为Application实例对象。


image.png

image.png

FilterRegistrationBean设置的过滤器对象是Application实例


image.png

所以,在eureka client与server端交互就是通过这些资源文件Application方法调用的。这些资源访问与MVC模型中Controller层的访问原理类似。

服务注册

由客户端发起Application资源ApplicationResource类的调用

1、addInstance

com.netflix.eureka.resources.ApplicationResource#addInstance方法
首先检查添加实例信息是否完整,然后调用registry.register方法进行实例注册

   @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
       //验证一些实例数据是否完整....
        // handle cases where clients may be registering with bad DataCenterInfo with missing data

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

在查看添加实例具体的调用链之前,需要了解下InstanceRegistry类的继承关系


InstanceRegistry.png
2、register

org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register(com.netflix.appinfo.InstanceInfo, int, boolean)


register.png

2.1、首先通过handleRegistration方法发布EurekaInstanceRegisteredEvent事件


image.png

2.2、调用父类register方法
3、register

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
注册后进行同步,通过isReplication参数控制。如果注册是从其他复制节点进行复制,则不同步复制


image.png
4、register

com.netflix.eureka.registry.AbstractInstanceRegistry#register

 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

在这个方法中:
使用ReentrantReadWriteLock的读锁进行加锁,所以在客户端调用资源进行注册时,可能不止一个线程调用到这里
注册信息registry使用ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 结构,表示的意义是对于一个高可用微服务集群。如下图对于服务a有a1、a2、a3相同的微服务。则ConcurrentHashMap中的String表示微服务组名称a,Map<String, Lease<InstanceInfo>>>是三个微服务实例,其中Map中的String表示a1或a2或a3微服务名称,InstanceInfo表示三个服务实例信息


image.png

根据instanceId查找existingLease实例对象,跟当前registrant注册实例比较。使用时间戳较大的实例对registrant变量赋值。并设置实例的serviceUpTimestamp服务上线时间戳。
放入到当前微服务集合中gMap.put(registrant.getId(), lease);到这里则完成服务注册
总之,执行注册实例的结果就是把appName的微服务加入到ConcurrentHashMap集合中。

心跳续约

客户端更新续约通过put方法。通过客户端发起请求
com.netflix.eureka.resources.InstanceResource#renewLease


image.png

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
调用父类续约,续约成功则同步集群


image.png

com.netflix.eureka.registry.AbstractInstanceRegistry#renew
public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatus(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }
    }

获取注册信息中的具体实例租债器对象,执行租债器的renew()方法
com.netflix.eureka.lease.Lease#renew


image.png

续约成功,则更改lastUpdateTimestamp的值是当前系统时间戳与duration的和

服务下架

服务剔除是eureka服务端对过期的服务(长时间没有心跳的服务)进行定时清除。是服务端开启的定时任务,具体源码实现是在导入EurekaServerInitializerConfiguration类中实现。服务下架是客户端主动下架
EurekaServerInitializerConfiguration实现了ServletContextAware, SmartLifecycle, Ordered接口。

start方法执行时机

SmartLifecycle的start方法调用链(在spring的生命周期中):
AbstractApplicationContext#refresh
AbstractApplicationContext#finishRefresh
DefaultLifecycleProcessor#onRefresh

1、start

EurekaServerInitializerConfiguration#start
在一个线程内执行这个方法,原因是执行eureka上下文环境不会影响spring boot正常启动过程。
这个方法功能是eureka服务端上下文的初始化,发布了EurekaRegistryAvailableEvent和EurekaServerStartedEvent事件。


start.png
2、contextInitialized

org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized


contextInitialized.png
3、initEurekaServerContext

EurekaServerBootstrap#initEurekaServerContext
包括从邻近节点复制注册表


initEurekaServerContext.png
4、openForTraffic

org.springframework.cloud.netflix.eureka.server.InstanceRegistry#openForTraffic


openForTraffic.png
5、openForTraffic

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic


openForTraffic.png
6、postInit

com.netflix.eureka.registry.AbstractInstanceRegistry#postInit


postInit.png
7、EvictionTask

com.netflix.eureka.registry.AbstractInstanceRegistry.EvictionTask
创建EvictionTask类,EvictionTask继承了TimerTask定时器任务类


EvictionTask.png
8、evict

com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)

public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }

首先判断是否开启了自我保护机制,遍历注册表类并判断每个租债器实例调用isExpired方法判断是否过期。对应过期的实例加入到expiredLeases集合中。isExpired方法根据当前时间戳与最后更新时间与duration和additionalLeaseMs比较得到。lastUpdateTimestamp最后更新时间是上次系统时间与duration 的和,最后更新时间注册、心跳续约、服务下架都会对这个参数进行更新。

public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

对于需要剔除的expiredLeases实例集合,并不会一下把所有的都会删除。会根据registrySize和RenewalPercentThreshold计算出一个evictionLimit剔除最大数量。例如一共有registrySize=100。RenewalPercentThreshold=0.85,则evictionLimit=100-100*0.85=15个。如果expiredLeases超过15个则会任意选出15个进行剔除,对于剩下的下次剔除任务中删除。如果下次还会超过则会触发自动保护机制isLeaseExpirationEnabled,直接返回。

集群同步

调用方法replicateToPeers
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers

/**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

当前实例的行为包括Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;对当前实例的这些行为操作成功后,则需要把这些状态行为的操作信息实例同步到邻近的eureka服务节点,需不需要集群同步主要看isReplication参数,默认是false需要同步,能够执行到replicateInstanceActionsToPeers方法。

同步操作

1、注册后同步register


register.png

2、续约后同步renew


renew.png

3、状态更新后同步statusUpdate
statusUpdate.png

4、下架后同步cancel


cancel.png

5、删除状态后DeleteStatusOverride
DeleteStatusOverride.png
syncUp

从邻近的节点复制节点信息
com.netflix.eureka.registry.PeerAwareInstanceRegistry#syncUp


syncUp.png
public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

在注册同步重试次数RegistrySyncRetries和注册同步等待时间RegistrySyncRetryWaitMs内,对eureka集群的其他对等节点注册的eurekaclient实例同步注册到当前节点。

自我保护机制

在服务剔除时,会检查是否开启自我保护机制及是否超过阈值


evict.png

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled
这个方法首先检查enableSelfPreservation参数是否开启(默认是true 开启的),然后判断当前得到的心跳数与自我保护阈值比较。


isLeaseExpirationEnabled.png
自我保护阈值计算

numberOfRenewsPerMinThreshold =expectedNumberOfRenewsPerMin*
renewalPercentThreshold
自我保护阈值=每分钟期望续约的心跳数(所有注册上的实例*每分钟触发的心跳连接次数)*自我保护机制的触发百分比(85%)

假如一共有100个实例,服务端在默认情况下每分钟连接刷新时间(expectedClientRenewalIntervalSeconds)是30s,所以一分钟有2次。
numberOfRenewsPerMinThreshold = 100*2*0.85=170个

所以,触发保护机制 当前得到的心跳连接数小于85%时,会触发
另一种说法,在服务剔除的时候当剔除的数量大于15%时,会触发
如果客户端续约刷新间隔与服务端续约刷新间隔不一致,则会造成自我保护机制不能正常工作

阈值更改时机

1、15分钟自动更改
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask


scheduleRenewalThresholdUpdateTask.png

updateRenewalThreshold.png

2、注册


image.png

3、服务下架
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#cancel
image.png

4、服务初始化

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic


image.png

为什么初始化和15分钟自动更改是数量的2倍,注册和下架是+2或-2。因为默认情况下服务端一次刷新心跳是30s,所以在60s内是2次。注册和下架是一个一个操作的

服务发现

查询服务时,涉及服务端的三层缓存架构。
1、只读缓存层readOnlyCacheMap(ConcurrentMap类型)
2、读写缓存层readWriteCacheMap(LoadingCache类型,使用google的guava框架)
3、真实数据层
com.netflix.eureka.resources.ApplicationResource#getApplication

 @GET
    public Response getApplication(@PathParam("version") String version,
                                   @HeaderParam("Accept") final String acceptHeader,
                                   @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
        if (!registry.shouldAllowAccess(false)) {
            return Response.status(Status.FORBIDDEN).build();
        }

        EurekaMonitors.GET_APPLICATION.increment();

        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        if (acceptHeader == null || !acceptHeader.contains("json")) {
            keyType = Key.KeyType.XML;
        }

        Key cacheKey = new Key(
                Key.EntityType.Application,
                appName,
                keyType,
                CurrentRequestVersion.get(),
                EurekaAccept.fromString(eurekaAccept)
        );

        String payLoad = responseCache.get(cacheKey);

        if (payLoad != null) {
            logger.debug("Found: {}", appName);
            return Response.ok(payLoad).build();
        } else {
            logger.debug("Not Found: {}", appName);
            return Response.status(Status.NOT_FOUND).build();
        }
    }

com.netflix.eureka.registry.ResponseCacheImpl#get(com.netflix.eureka.registry.Key)

public String get(final Key key) {
        return get(key, shouldUseReadOnlyResponseCache);
    }

    @VisibleForTesting
    String get(final Key key, boolean useReadOnlyCache) {
        Value payload = getValue(key, useReadOnlyCache);
        if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
            return null;
        } else {
            return payload.getPayload();
        }
    }

@VisibleForTesting
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key :" + key, t);
        }
        return payload;
    }

首先查看只读缓存层是否打开shouldUseReadOnlyResponseCache,与之对应的是getValue方法的useReadOnlyCache变量值。
如果打开,则先从只读缓存readOnlyCacheMap查询,如果查询不到则从读写缓存是readWriteCacheMap查询,并放入到只读缓存

缓存更新原理

1、只读缓存

只读缓存没有提供API更新,只能通过定时任务或查询时从读写缓存写到只读缓存
在创建ResponseCacheImpl类时,开启定时任务


shouldUseReadOnlyResponseCache.png

getCacheUpdateTask.png

如果从读写缓存得到的cacheValue与从只读缓存得到的currentCacheValue不相同,则把cacheValue替换到只读缓存中

2、读写缓存

2.1、构建ResponseCacheImpl类


readWriteCacheMap.png

如果缓存不存在,则通过generatePayload方法重新加载

/*
     * Generate pay load for the given key.
     */
    private Value generatePayload(Key key) {
        Stopwatch tracer = null;
        try {
            String payload;
            switch (key.getEntityType()) {
                case Application:
                    boolean isRemoteRegionRequested = key.hasRegions();

                    if (ALL_APPS.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeAllAppsWithRemoteRegionTimer.start();
                            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeAllAppsTimer.start();
                            payload = getPayLoad(key, registry.getApplications());
                        }
                    } else if (ALL_APPS_DELTA.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                            versionDeltaWithRegions.incrementAndGet();
                            versionDeltaWithRegionsLegacy.incrementAndGet();
                            payload = getPayLoad(key,
                                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeDeltaAppsTimer.start();
                            versionDelta.incrementAndGet();
                            versionDeltaLegacy.incrementAndGet();
                            payload = getPayLoad(key, registry.getApplicationDeltas());
                        }
                    } else {
                        tracer = serializeOneApptimer.start();
                        payload = getPayLoad(key, registry.getApplication(key.getName()));
                    }
                    break;
                case VIP:
                case SVIP:
                    tracer = serializeViptimer.start();
                    payload = getPayLoad(key, getApplicationsForVip(key, registry));
                    break;
                default:
                    logger.error("Unidentified entity type: " + key.getEntityType() + " found in the cache key.");
                    payload = "";
                    break;
            }
            return new Value(payload);
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    }

private String getPayLoad(Key key, Applications apps) {
        EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
        String result;
        try {
            result = encoderWrapper.encode(apps);
        } catch (Exception e) {
            logger.error("Failed to encode the payload for all apps", e);
            return "";
        }
        if(logger.isDebugEnabled()) {
            logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
        }
        return result;
    }

2.2、调用invalidateCache方法,使缓存失效
com.netflix.eureka.registry.AbstractInstanceRegistry#invalidateCache


invalidateCache.png

com.netflix.eureka.registry.ResponseCacheImpl#invalidate


invalidate.png

com.netflix.eureka.registry.ResponseCacheImpl#invalidate(com.netflix.eureka.registry.Key...)
invalidate.png
调用invalidateCache方法时机

1、注册后更新缓存
com.netflix.eureka.registry.AbstractInstanceRegistry#register
2、服务下架
com.netflix.eureka.registry.AbstractInstanceRegistry#cancel
3、statusUpdate
com.netflix.eureka.registry.AbstractInstanceRegistry#statusUpdate
4、deleteStatusOverride
com.netflix.eureka.registry.AbstractInstanceRegistry#deleteStatusOverride

总结:

主要就是服务端使用的框架调用原理和Eureka Server端涉及的功能
那些功能使用客户端调用(有注册,续约,下架)
服务下架与服务剔除的区别
eureka服务端涉及的服务同步,自我保护阈值
服务查询的三级缓存使用原理

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容