spring cloud eureka server

Eureka Server作为一个开箱即用的服务中心,主要有以下功能:

  • 服务注册;
  • 接收服务心跳(续租);
  • 服务剔除;
  • 服务下线;
  • 集群同步;
  • 获取注册表中服务实例信息。

需要注意的是,Eureka Server本身也是一个Eureka Client,在不禁止其客户端行为时,他会向其他Eureka Server执行注册、发送心跳等操作。

源码和配置

其中重要的类为com.netflix.eureka.registry.AbstractInstanceRegistry类。该类主管该Server下的Client的活动信息。另外一个接口com.netflix.eureka.registry.PeerAwareInstanceRegistry,只有一个实现类,主管和peer之间的同步等操作。

eureka server主要类图.png

数据结构

注册表是Eureka中的主要部分,在此简单分析一下他的数据结构。


image.png

服务注册

服务注册的默认租约时间leaseDuration为90s。主要实现位于com.netflix.eureka.registry.AbstractInstanceRegistry#register方法中,其实现如下(以下所有的代码删除了logger):

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
        //首先获取读锁,防止其他线程修改数据,但是可以读取数据
            read.lock();
      //registry 全局变量,ConcurrentHashMap类型,总注册信息
      //通过APPName来获取服务集群的租约集合
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
               //此处使用putIfAbsent而不是put,防止多线程获取readLock之后,覆盖别的线程添加的属性
                 gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
           //获取Client实例的租约信息
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
           //租约存在,且租约中的Client实例不为空
            if (existingLease != null && (existingLease.getHolder() != null)) {
              //旧租约的修改时间
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
              //新租约的修改时间
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            //因为是多线程操作,所以新租约的时间不一定大于旧租约的时间
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    registrant = existingLease.getHolder();
                }
            } else {
                // 租约不存在
                synchronized (lock) {
                  //自我保护机制[1]
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1 for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
            //继承上次的服务启动时间
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            //把不是UNKUOWN的,且新的实例放到overriddenInstanceStatusMap中
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // 根据覆盖状态规则设置服务的状态
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            //如果实例为UP状态,则设置服务启动时间(仅第一次有效)
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
           //recentlyChangedQueue用于Client增量式获取注册表信息
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
           //设置responseCache缓存过期,用于Client全量获取注册表信息
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

在注册方法中有诸多同步或安全的操作,下面简析一二:

  1. 开始read.lock(),使用read的原因,因为readLock是共享锁,所以有多线程注册多个服务时,并行执行。对应的writeLock排它锁只用在获取增量数据时,因为获取增量数据时要保证数据此时不可更改,用writeLock禁止其他线程的read操作。readLock都是对内操作,writeLock是对外要保持统一口径。

  2. 代码中涉及的全局变量Map都是ConcurrentMap。保证线程安全,且全局共享。

  3. 在向registry中存值的时候,并没有强制性覆盖,用putIfAbsent代替put方法,两者都会返回key对应的旧值,但前者并不会覆盖。然后用之前的值来进行操作(因为是根据实例名获取的,所以后面只对其租约信息进行修改)。

  4. 自我保护机制处使用了synchronized+volatile:

  • 对于多个修改变量,采用synchronized空对象;
  • 因为变量要在别的方法中进行计算安全机制,所以需要volatile关键字,保证该字段随时可见。

(此处的字段不用CAS,因为涉及乘法,CAS对乘法不太友好。)

  1. 对recentRegisteredQueue的修改,锁定该对象,防止所有线程在别的方法中进行操作该值。
  2. 缓存类变量基本都是由guava的CacheBuilder来创建的,保证时效和容量等问题。

Tip:[1] 自我保护机制:随着Client注册数量的增加,期待每分钟维持心跳的数量也增加,eureka Server检查到每分钟续约的次数少于期待数量的阈值(默认0.85)时,就会认为出现异常情况,从而进入自我保护机制。默认2次/min/服务(基于心跳时间)。Eureka会保护这些实例,认为他们是健康的,不把他们移出注册列表。Eureka Server会输出红色的文字:

EMERGENCY!EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY'RE NOT.RENEWALS ARE LESSER THAN THRESHOLD AND HENCE THE INSTANCES ARE NOT BEGING EXPIRED JUST TO BE SAFE.

接受心跳服务

Client默认每隔30s向Server发起一次续租,也叫心跳服务。核心代码为com.netflix.eureka.registry.AbstractInstanceRegistry#renew

public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        //如果没有发现租约,返回false
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            //获取租约中的实例信息
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
            //获取服务的最终状态
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                  //如果实例的状态和最终的覆盖状态不一致,设置新的状态
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            //最后一分钟的续租数(该续租数每60秒调度自动更新为0)
            renewsLastMin.increment();
            //修改最后续租时间
            leaseToRenew.renew();
            return true;
        }
    }

这部分有很多东西都是在register方法里出现过的,所以不再赘述。

服务剔除

核心代码是com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)。该入口参数additionalLeaseMs 一般为0。此值是补偿时间,即如果出现GC或者时钟偏移等情况而补偿的时间。用来判断是否过期。

    @Override
    public void evict() {
        evict(0l);
    }


 public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
      //是否开启租约保护[1]
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // 遍历所有节点,找到租约过期的节点
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        //可剔除的节点数 = 本地节点总数 - 阈值节点数
        int evictionLimit = registrySize - registrySizeThreshold;
        //保护机制 - 保证剔除最少量的节点
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
           Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
               //随机获取一个节点,并打乱原有顺序(交换对应节点)
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                //服务下线
                internalCancel(appName, id, false);
            }
        }
    }

在打乱节点顺序部分,采用随机,是避免同一时间同一服务集群全部过期然后被剔除的现象,从而引发程序崩溃。

Tips:
[1] 是否开启租约保护:该值有两种情况:

  • eureka.server.enable-self-preservation配置有关,默认为true,即不开启租约保护。为false时开启租约保护。
  • 上述值为true时,期望每分钟的续租阈值数大于0且最后一分钟的续租数大于该期望值,即每分钟的续租数不能小于阈值数,则为true,否则开启保护机制,不准服务剔除。默认的阈值为0.85,通过eureka.server.renewal.percent.threshold来设置(必为double类型)。

服务下线

核心代码是com.netflix.eureka.registry.AbstractInstanceRegistry#internalCancel

    @Override
    public boolean cancel(String appName, String id, boolean isReplication) {
        return internalCancel(appName, id, isReplication);
    }

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            //获取读锁
            read.lock();
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);
            }
          //recentCanceledQueue和之前的recentRegisteredQueue都是统计队列
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            //如果租约为空,直接返回false
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                return false;
            } else {
                //设置下线时间(也叫驱逐时间)
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    //添加到最近租约变更队列
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    //虚拟网络协议主机地址
                    vip = instanceInfo.getVIPAddress();
                    //安全的虚拟网络协议主机地址
                    svip = instanceInfo.getSecureVipAddress();
                }
                //清理缓存 - 使responseCache中指定属性移除
                invalidateCache(appName, vip, svip);
                return true;
            }
        } finally {
            read.unlock();
        }
    }

获取注册表

该功能涉及多个Eureka Server。
从Client部分缓存注册表也了解到注册表有全量和增量两种方式。

全量获取注册表

该部分的核心代码是com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationsFromMultipleRegions

 public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
        //查看是否有远程区域
        boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;

        if (includeRemoteRegion) {
            GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
        } else {
            GET_ALL_CACHE_MISS.increment();
        }
        Applications apps = new Applications();
        apps.setVersion(1L);
        //从本地registry上获取实例信息
        for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
            Application app = null;

            if (entry.getValue() != null) {
                for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                    Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                    if (app == null) {
                        app = new Application(lease.getHolder().getAppName());
                    }
                    //decorateInstanceInfo方法就是讲lease中holder转换为Instance
                    app.addInstance(decorateInstanceInfo(lease));
                }
            }
            if (app != null) {
                apps.addApplication(app);
            }
        }
        //从远程注册表上获取实例信息
        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteApps = remoteRegistry.getApplications();
                    for (Application application : remoteApps.getRegisteredApplications()) {
                        if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                            logger.info("Application {}  fetched from the remote region {}",
                                    application.getName(), remoteRegion);

                            Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                            if (appInstanceTillNow == null) {
                                appInstanceTillNow = new Application(application.getName());
                                apps.addApplication(appInstanceTillNow);
                            }
                            for (InstanceInfo instanceInfo : application.getInstances()) {
                                appInstanceTillNow.addInstance(instanceInfo);
                            }
                        } else {
                            logger.debug("Application {} not fetched from the remote region {} as there exists a "
                                            + "whitelist and this app is not in the whitelist.",
                                    application.getName(), remoteRegion);
                        }
                    }
                } else {
                    logger.warn("No remote registry available for the remote region {}", remoteRegion);
                }
            }
        }
        apps.setAppsHashCode(apps.getReconcileHashCode());
        return apps;
    }

入参remoteRegions为指定区域的数组集合。来源于配置eureka.server.remote-region-urls-with-name,默认为空。

从多地区增量式获取注册表

增量获取的要义还是从最近修改的队列中,获取到最近修改的instanceInfo,从全局map中获取到这些实例的完整信息,然后将这些instanceInfo打包过去。
该部分的核心代码com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions

public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
        if (null == remoteRegions) {
            remoteRegions = allKnownRemoteRegions; // null means all remote regions.
        }

        boolean includeRemoteRegion = remoteRegions.length != 0;

        if (includeRemoteRegion) {
            GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
        } else {
            GET_ALL_CACHE_MISS_DELTA.increment();
        }

        Applications apps = new Applications();
        apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
        try {
            //获取写锁,排它锁
            write.lock();
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            while (iter.hasNext()) {
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                Application app = applicationInstancesMap.get(instanceInfo.getAppName());
                if (app == null) {
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                app.addInstance(decorateInstanceInfo(lease));
            }

            if (includeRemoteRegion) {
                for (String remoteRegion : remoteRegions) {
                    RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                    if (null != remoteRegistry) {
                        Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                        if (null != remoteAppsDelta) {
                            for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                                if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                    Application appInstanceTillNow =
                                            apps.getRegisteredApplications(application.getName());
                                    if (appInstanceTillNow == null) {
                                        appInstanceTillNow = new Application(application.getName());
                                        apps.addApplication(appInstanceTillNow);
                                    }
                                    for (InstanceInfo instanceInfo : application.getInstances()) {
                                        appInstanceTillNow.addInstance(instanceInfo);
                                    }
                                }
                            }
                        }
                    }
                }
            }

            Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }

集群同步

该功能涉及多个Eureka Server。

Eureka Server初始化本地注册表

这部分主要通过代码com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp来完成。主要执行:

  • 它会先从peer(同伴)节点上拉取注册信息。
  • 并将其中的服务实例注册到本地注册表中。
@Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    //注册表重试等待时间
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            //此处走com.netflix.eureka.registry.AbstractInstanceRegistry#register方法
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

在初始化本地注册表时,EurekaServer不会接受来自Client的通信请求(如注册、续租、获取注册表等)。在同步结束后,通过com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic方法来允许该Server接受流量。关于这两部分的调用逻辑,在com.netflix.eureka.EurekaBootStrap#initEurekaServerContext代码中。而该方法调用时是EurekaBootStrap实现javax.servlet.ServletContextListener#contextInitialized的内部。

@Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // 初始化自我保护机制统计参数
        this.expectedNumberOfRenewsPerMin = count * 2;
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        //判断是否是AWS环境,一般我们的环境是MyOwn环境
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        //修改服务实例状态为上线状态
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        //驱逐任务以及测试每分钟心率(租约率)的任务
        super.postInit();
    }

Eureka Server 之间注册信息表信息的同步复制

这部分代码核心在com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl中,其包括下线,注册和续约等等。

下线
 @Override
    public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
        //调用 AbstractInstanceRegistry 的cancel
        if (super.cancel(appName, id, isReplication)) {
            //复制(同步)下线状态到同伴
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
            //锁定空对象
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            return true;
        }
        return false;
    }
注册方法
   @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        //默认租约时间90s
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        //如果租约为空,则用默认时间;若租约不为空,注册租约该有的剩余时间
        //调用 AbstractInstanceRegistry 的renew
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
续租方法
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        //调用 AbstractInstanceRegistry 的renew
        if (super.renew(appName, id, isReplication)) {
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
同步到同伴节点

下面是复制到同伴的入口方法,代码是com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        //Stopwatch是测试执行某些代码的时间
        Stopwatch tracer = action.getTimer().start();
        try {
            //是否是复制的
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // 如果peer集群为空,或者本来就是复制操作,则不再执行,防止造成污染复制
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                //同步Instance到同伴节点
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

peerEurekaNodes是同伴Eureka Server节点。每个Eureka Server会向同伴同步数据。

同步到同伴的代码为com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers。根据同步的Action操作,来分类同步。

 private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

同步操作只有以下几种:

public enum Action {
        Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
......
}

上面的请求在com.netflix.eureka.transport.JerseyReplicationClient中。如HeartBeat的请求在com.netflix.eureka.transport.JerseyReplicationClient#sendHeartBeat中。可以查看路径等。这部分路径就和client中处理对应功能是一样的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,874评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,102评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,676评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,911评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,937评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,935评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,860评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,660评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,113评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,363评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,506评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,238评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,861评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,486评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,674评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,513评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,426评论 2 352

推荐阅读更多精彩内容