Eureka源码浅读---自动故障移除

Eureka源码采用1.7.2版本

本人小白,此文为本人阅读源码笔记,如果您读到本文,您需要自己甄别是否正确,文中的说明只代表本人理解,不一定是正确的!!!

自动故障移除是说,当服务端长时间未接收到服务实例的心跳,会认为服务实例已经出现宕机,会自动将服务实例从注册表中进行摘除

自动故障移除依赖服务初始化时启动的一个定时检测线程

com.netflix.eureka.EurekaBootStrap#initEurekaServerContext

    //定时检查服务实例是否故障,并自动下线
    registry.openForTraffic(applicationInfoManager, registryCount);

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl

@Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // 参数列表的count就是服务端初始化是进行注册表拉取的服务实例数量
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        //30秒心跳  1分钟就是count * 2 写死了
        //心跳发送时间可以设置
        this.expectedNumberOfRenewsPerMin = count * 2;
        //期望每分钟最少的心跳数量   serverConfig.getRenewalPercentThreshold()默认0.85
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        logger.info("Got " + count + " instances from neighboring DS node");
        logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        //核心方法
        super.postInit();
    }

com.netflix.eureka.registry.AbstractInstanceRegistry#postInit

  protected void postInit() {
        //开始执行expectedNumberOfRenewsPerMin定时更新 1min
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        //启动定时检测任务 1分钟执行一次
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }

在这个方法中启动了两个线程进行自动执行,可以任务两者是同时启动的,一个是renewsLastMin(),这个方法是每分钟统计下当前1分钟内接收到了多少个心跳请求,并且存储起来,EvictionTask 是服务实例定时故障检测任务

com.netflix.eureka.registry.AbstractInstanceRegistry.EvictionTask#run

        @Override
        public void run() {
            try {
                //获取补偿时间
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }
        
        //这个补偿时间的计算方法很实用,重点说明下
        
         long getCompensationTimeMs() {
            //获取当前时间
            long currNanos = getCurrentTimeNano();
            //获取上次设置时间,并设为当前时间
            long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
            if (lastNanos == 0l) {
                return 0l;
            }
            
            //计算两者的时间差 elapsedMs
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            // elapsedMs 减去配置的定时执行时间
            long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0l ? 0l : compensationTime;
        }

获取时间补偿的这个方法是防止网络波动,JVM的stop world造成的任务没有按照设定好的时间进行执行,弥补这个现象造成的影响

com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)

public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        //是否允许主动下线故障实例   和自我保护机制相关
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    //过期判断 判断当前服务实例是否过期
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        //将过期的服务实例加入到过期列表中
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        //获取当前服务实例数量
        int registrySize = (int) getLocalRegistrySize();
        //计算保留最少保留的服务实例数量
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        //计算可以摘除的服务实例数量
        int evictionLimit = registrySize - registrySizeThreshold;

        //获取过期服务数量和应该摘除数量的最小值
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);

        //随机摘除toEvict个实例
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                //摘除方法 和 服务下线方法一样
                internalCancel(appName, id, false);
            }
        }
    }

这个方法有两个地方比较有意思

1.第一个地方 过期判断

过期判断:lease.isExpired(additionalLeaseMs)

这个是校验续约是否过期的一个方法

 public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

由此可以看出判断过期条件主要如下:

  • 过期时间戳大于0,主要是服务下线的时候调用的cancel()方法,将该时间戳设置成当前时间
  • 当前时间大于最后更新时间戳+持续时间+补偿时间,一般情况下,补偿时间为0,且在renew()方法中,将lastUpdateTimestamp设置成lastUpdateTimestamp = System.currentTimeMillis() + duration,所以当前时间需要大于最后更新时间 + 2 * duration,duration默认为90S,所以需要大于180S,可以这么说服务实例的过期是超过3分钟没有心跳,有点扯了,要知道定时检测服务是否过期的任务可是1min执行一次,这块我不太清楚为什么这么设计任务的执行时间和租约过期时间

2.第二个地方 服务实例移

整个方法的的服务实例移除的过程分为以下几步:

  • 判断实例是否过期,如果过期加入到过期列表中 expiredLeases
  • 依据本地注册服务实例数量计算可以移除的服务实例,这个计算方法是服务实例数量 - (服务实例数量 * 0.85)
  • 比较过期列表和可移除的服务实例数量,取最小值 toEvict
  • 在过期服务列表中随机选取toEvict个服务实例进行移除

移除的方法调用的是服务下线方法internalCancel(),这个方法需要说明下,这里调用的是子类直接实现,没有外层的封装,即没有numberOfRenewsPerMinThreshold-2的过程,说明自动故障实例的移除过程没有触发期望每分钟最小心跳数量的更新,这个设计我觉得比较合理,如果不这么设计,自动保护永远不会触发了,接着一点internalCancel()实际上做了三件事,注册表移除该实例,将该实例加入最新修改队列,清空读写缓存中该实例的信息

所以从上面代码可以看出,自动服务故障移除中服务实例摘除的数量不会超过15%

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容