Spring Cloud——Eureka Server服务同步和服务剔除

服务同步

服务同步是Server节点之间的数据同步。分为启动时同步,运行时同步。

启动同步

在EurekaServerBootstrap类中的初始化上下文方法initEurekaServerContext()方法中会调用PeerAwareInstanceRegistry.syncUp()方法从邻近的eureka节点复制注册表。

启动同步时,会先遍历Applications中获取的服务信息,并将服务信息注册到registry中。可以参考PeerAwareInstanceRegistryImpl类中的syncUp方法:

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public int syncUp() {
        // 获取到的注册节点数量
        int count = 0;
        // 如果count==0 , 那么默认重试5次(前提是开启了register-with-eureka = true,否则为0)
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    // 从第二次开始,每次默认沉睡30秒
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            // 从本地内存里面获取注册实例信息
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        // 判断是否可以注册
                        if (isRegisterable(instance)) {
                            // 注册到当前Eureka Server里面
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
}

参数说明:

  • regirstrySyncRetries:当eureka服务器启动时尝试去获取集群里其他服务器上的注册信息的次数,默认为5,只有当 eureka.client.register-with-eureka = true 的时候才会是5,如果是false ,则为0。

  • registrySyncRetryWaitMs:当eureka服务器启动时获取其他服务器的注册信息失败时,会再次尝试获取,期间需要等待的时间,默认为30 * 1000毫秒。

  • count:获取到的注册实例数量,如果为0 则根据重试次数进行重试,每次重试前沉默 30秒。

讲过Eureka Client启动的时候默认会自动从Eureka Server获取注册信息, 要想Eureka Server在启动的时候可以同步其他集群节点的注册信息,那么必须开启客户端配置。

# 是否作为一个Eureka Client 注册到Eureka Server上去
eureka.client.register-with-eureka = true

# 是否需要从Eureka Server上拉取注册信息到本地。
eureka.client.fetch-registry = true              

只有开启了上面两个配置,那么集群节点在启动的时候,会初始化Eureka Client端的配置 ,会从其他Eureka Server拉取注册信息到本地,同时在初始化Eureka Server的时候,会从本地内存里面读取 注册信息,自动注册到本身的服务上。

集群同步类型

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    public enum Action {
        Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

        private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());

        public com.netflix.servo.monitor.Timer getTimer() {
            return this.timer;
        }
    }
}

Heartbeat : 心跳续约

Register : 注册

Cancel : 下线

StatusUpdate : 添加覆盖状态

DeleteStatusOverride : 删除覆盖状态   

运行时同步

server端当有reigster、renew、cancel请求进来时,会将这些请求封装到一个task中,然后放到一个队列当中,然后经过一系列的处理后,在放到另一个队列中。 可以查看PeerAwareInstanceRegistryImpl类中的BatchWorkerRunnable类。

发起同步

这里以注册的代码为例

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            //默认租约90s,如果用户更改了心跳周期等,使用用户自定义的租约
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // 发起注册 调用父类的注册,注册到本地双层Map中
        super.register(info, leaseDuration, isReplication);
        // 注册完成后,在这里发起同步,同步类型为Register
        //本地注册完成后,向其他节点复制,注意isReplication这个属性
        //用来判断是client发来的注册,还是其他Eureka Server临节点复制过来的注册
        //如果是复制过来的注册信息,那么就不再向其他Eureka Server节点进行传播复制
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            // 判断是否是集群同步请求,如果是,则记录最后一分钟的同步次数
            if (isReplication) {
                //记录每分钟收到的复制次数
                numberOfReplicationsLastMin.increment();
            }
            
            // 集群节点为空,或者这是一个Eureka Server 同步请求,直接return
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                //如果没有Eureka Server临节点,或者是别的Eureka Server复制过来的信息
                //那么就不再向其他临节点进行复制,
                //也就是说既然收到了复制过来的信息,那么其他eureka server节点也会收到
                //所以就没必要再去发送一遍复制了,return。
                return;
            }

            // 循环相邻的Eureka Server Node, 分别发起请求同步
            // 遍历所有的Eureka Server邻节点,向它们复制register、cancel等信息
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // 判断是否是自身的URL,过滤掉
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                // 发起同步请求
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }   
}   

步骤说明:

  • 1、判断集群节点是否为空,为空则返回

  • 2、isReplication 代表是否是一个复制请求, isReplication = true 表示是其他Eureka Server发过来的同步请求,这个时候是不需要继续往下同步的。否则会陷入同步死循环。

  • 3、循环集群节点,过滤掉自身的节点。

  • 4、发起同步请求 ,调用replicateInstanceActionsToPeers。

一个Eureka Server在收到了Client的注册等信息时,会挨个通知其他Eureka Server临节点,复制的流程图也就是下面这个样子


发起同步请求 ,调用replicateInstanceActionsToPeers

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:    // 下线
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    // 心跳
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    // 获取本地最新的实例信息
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:  // 注册
                    node.register(info);
                    break;
                case StatusUpdate:  // 设置覆盖状态
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:  //删除覆盖状态
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            //由于此方法是循环复制操作,如果发生异常不进行处理,直接抛出,那么就不会向后面的节点复制了
            //比如有10个Eureka Server节点,再向第2个复制的时候抛出异常,后面8个节点就收不到复制信息
            //这个地方,只是做log,并没有抛出异常
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        } finally {
            CurrentRequestVersion.remove();
        }
    }
}   

PeerEurekaNode的register方法如下:

public class PeerEurekaNode {

    public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        // 默认采用的是批处理
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }
}

默认采用的是批量任务处理器,就是将task放入任务队列中,然后通过线程获取任务队列里面的任务,模仿ThreadExecutorPool的方式,生成线程,从队列里面抓取任务处理,统一批量执行,Eureka Server 那边也是统一接收,这样提高了同步效率

批量处理的任务执行器是com.netflix.eureka.cluster.ReplicationTaskProcessor

class ReplicationTaskProcessor implements TaskProcessor<ReplicationTask> {

    @Override
    public ProcessingResult process(List<ReplicationTask> tasks) {
        // 构建ReplicationInstance放入ReplicationList
        ReplicationList list = createReplicationListOf(tasks);
        try {
            // 发起批量处理请求
            EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
            int statusCode = response.getStatusCode();
            if (!isSuccess(statusCode)) {
                if (statusCode == 503) {
                    logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                    return ProcessingResult.Congestion;
                } else {
                    // Unexpected error returned from the server. This should ideally never happen.
                    logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                    return ProcessingResult.PermanentError;
                }
            } else {
                // 处理执行结果,成功则调用handleSuccess ,失败则调用handleFailure。
                handleBatchResponse(tasks, response.getEntity().getResponseList());
            }
        } catch (Throwable e) {
            if (maybeReadTimeOut(e)) {
                logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
                //read timeout exception is more Congestion then TransientError, return Congestion for longer delay 
                return ProcessingResult.Congestion;
            } else if (isNetworkConnectException(e)) {
                logNetworkErrorSample(null, e);
                return ProcessingResult.TransientError;
            } else {
                logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
                return ProcessingResult.PermanentError;
            }
        }
        return ProcessingResult.Success;
    }
}

请求批量处理的接口地址 : peerreplication/batch/

handleBatchResponse(tasks, response.getEntity().getResponseList()) , 循环调用处理结果,成功则调用handleSuccess. , 失败则调用handleFailure , 比如hearbeat的时候,调用返回码为404的时候,会重新发起注册。

public class PeerEurekaNode {

    public void heartbeat(final String appName, final String id,
                          final InstanceInfo info, final InstanceStatus overriddenStatus,
                          boolean primeConnection) throws Throwable {
        if (primeConnection) {
            // We do not care about the result for priming request.
            replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            return;
        }
        ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
            @Override
            public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
                return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            }

            @Override
            public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                super.handleFailure(statusCode, responseEntity);
                if (statusCode == 404) {
                    logger.warn("{}: missing entry.", getTaskName());
                    if (info != null) {
                        logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                                getTaskName(), info.getId(), info.getStatus());
                        // 重新发起注册。
                        register(info);
                    }
                } else if (config.shouldSyncWhenTimestampDiffers()) {
                    InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                    if (peerInstanceInfo != null) {
                        syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                    }
                }
            }
        };
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
    }
}

Eureka Server接收同步

程序入口 : com.netflix.eureka.resources.PeerReplicationResource

@Path("/{version}/peerreplication")
@Produces({"application/xml", "application/json"})
public class PeerReplicationResource {

    @Path("batch")
    @POST
    public Response batchReplication(ReplicationList replicationList) {
        try {
            ReplicationListResponse batchResponse = new ReplicationListResponse();
            // 循环请求的任务
            for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
                try {
                    // 分发任务,同时将处理结果收集起来,等会统一返回
                    batchResponse.addResponse(dispatch(instanceInfo));
                } catch (Exception e) {
                    batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                    logger.error("{} request processing failed for batch item {}/{}",
                            instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
                }
            }
            return Response.ok(batchResponse).build();
        } catch (Throwable e) {
            logger.error("Cannot execute batch Request", e);
            return Response.status(Status.INTERNAL_SERVER_ERROR).build();
        }
    }
    
    private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
        //  创建实例
        ApplicationResource applicationResource = createApplicationResource(instanceInfo);
        // 创建实例
        InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
        //获取客户端instance的lastDirtyTimestamp  ,有点类似于版本号的概念。
        String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
        // 获取覆盖状态
        String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
        // 获取instance的状态
        String instanceStatus = toString(instanceInfo.getStatus());

        Builder singleResponseBuilder = new Builder();
        switch (instanceInfo.getAction()) {
            case Register:  // 注册
                singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
                break;
            case Heartbeat: // 心跳续约
                singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
                break;
            case Cancel:    // 下线
                singleResponseBuilder = handleCancel(resource);
                break;
            case StatusUpdate:  // 修改覆盖状态
                singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
                break;
            case DeleteStatusOverride:  // 删除覆盖状态
                singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
                break;
        }
        return singleResponseBuilder.build();
    }   
}

以上五个场景,这里就不一一说了,就说一下注册吧:

@Path("/{version}/peerreplication")
@Produces({"application/xml", "application/json"})
public class PeerReplicationResource {

    private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
        applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
        return new Builder().setStatusCode(Status.OK.getStatusCode());
    }
    
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }   
}

EPLICATION = “true” ,此次请求为true,表示是一个服务端的复制请求。

由上面可以知道,集群同步走的和客户端注册的后续流程是一样的,只不过isReplication=true , 表明这是一个集群同步的请求。

服务剔除

服务剔除其实是一个兜底的方案,目的就是解决非正常情况下的服务宕机或其他因素导致不能发送cancel请求的服务信息清理的策略。

服务剔除分为:

  • 判断剔除条件
  • 找出过期服务
  • 清理过期服务

剔除条件:

  • 关闭自我保护

  • 自我保护如果开启,会先判断是server还是client出现问题,如果是client的问题就会进行删除

  • 自我保护机制:Eureka的自我保护机制是为了防止误杀服务提供的一种保护机制。Eureka的自我保护机制认为如果有大量的服务都续约失败,则认为自己出现了问题(例如:自己断网了),也就不剔除了。反之,则是它人的问题,就进行剔除。

自我保护的阈值分为server和client,如果超出阈值就是表示大量服务可用,部分服务不可用,这判定为client端出现问题。如果未超出阈值就是表示大量服务不可用,则判定是自己出现了问题。

阈值的计算:

  • 自我保护阈值 = 服务总数 每分钟续约数 自我保护阈值因子;
  • 每分钟续约数 = (60s / 客户端续约时间);

过期服务:

找出过期服务会遍历所有的服务,判断上次续约时间距离当前时间大于阈值就标记为过期,同时会将这些过期的服务保存的过期的服务集合中。

剔除服务:

剔除服务之前会先计算要是剔除的服务数量,然后遍历过期服务,通过洗牌算法确保每次都公平的选择出要剔除的服务,然后进行剔除。

执行剔除服务后:

  • 从register中删除服务信息;
  • 更新队列;
  • 清空二级缓存,保证数据的一致性;

开启服务剔除

在EurekaServerBootstrap类中的初始化上下文方法initEurekaServerContext()方法中会调用PeerAwareInstanceRegistry.openForTraffic()方法开启服务剔除。

public class EurekaServerBootstrap {

    protected void initEurekaServerContext() throws Exception {
        //省略。。。。。。
        
        // 从邻近的eureka节点复制注册表
        int registryCount = this.registry.syncUp();
        
        // 默认每30秒发送心跳,1分钟就是2次
        // 修改eureka状态为up 
        // 同时,这里面会开启一个定时任务,用于清理 60秒没有心跳的客户端。自动下线
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);

        // 注册所有监控统计信息
        EurekaMonitors.registerAllStats();
    }
}

registry.openForTraffic

修改eureka状态为up,同时开启一个定时任务,用于清理 60秒没有心跳的客户端

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        // 计算每分钟最大续约数
        this.expectedNumberOfClientsSendingRenews = count;
        // 每分钟最小续约数
        updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        // 设置实例的状态为UP
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        // 开启定时任务,默认60秒执行一次,用于清理60秒之内没有续约的实例
        super.postInit();
    }
    
    // 每分钟最小续约数
    protected void updateRenewsPerMinThreshold() {
        this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                * serverConfig.getRenewalPercentThreshold());
    }   
}

Eureka Server服务剔除

从上面代码可知Eureka Server 的服务剔除,它是通过定时任务完成的,在EurekaBootStrap启动引导的initEurekaServerContext上下文初始化方法中,调用了这么一行代码registry.openForTraffic(applicationInfoManager, registryCount);在该方法中又调用了com.netflix.eureka.registry.AbstractInstanceRegistry#postInit方法来初始化服务剔除的定时任务。

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            //如果服务剔除任务不为空,就执行cancel方法,该方法把任务的状态修改为了cancel任务取消
            evictionTaskRef.get().cancel();
        }
        //创建新的服务剔除任务
        evictionTaskRef.set(new EvictionTask());
        //交给调度器去执行,延迟60s,每60s执行一次驱逐任务
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),    //60s 逐出间隔计时器
                serverConfig.getEvictionIntervalTimerInMs());
    }
}

EvictionTask定时任务

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    class EvictionTask extends TimerTask {

        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

        @Override
        public void run() {
            try {
                //计算任务执行的时间偏差:补偿时间
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                
                //执行驱逐
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }
    }
}

在驱逐任务中,计算了任务执行的时间偏差即补偿时间,然后调用com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)执行服务的剔除逻辑

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            //如果没启用租约到期,直接返回
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        //首先收集所有过期的服务,以随机顺序将其逐出
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        //循环注册表中的所有的服务
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                //获取到租约
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    //如果服务过期,就把服务添加到expiredLeases map中
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        //为了补偿GC暂停或本地时间差异导致的剔除任务执行时间差异,使用当前注册表大小作为触发自我保存的基础
        //否则,将清除完整的注册表。
        //注册表大小
        int registrySize = (int) getLocalRegistrySize();
        //注册表中服务的续约阈值 = 注册大小 * 0.85
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        //驱逐极限 = 注册表大小 - 注册表续约阈值 
        int evictionLimit = registrySize - registrySizeThreshold;
        //过期的服务数 和 evictionLimit  取最小 ,如果大于 0 说明需要有服务要剔除
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            //剔除 toEvict 个
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
            //取随机值
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                //选择一个随机项目(Knuth随机算法),随机剔除
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                //获取剔除服务的Lease
                Lease<InstanceInfo> lease = expiredLeases.get(i);
                //应用名
                String appName = lease.getHolder().getAppName();
                //实例ID
                String id = lease.getHolder().getId();
                //expired Counter 过期计数增加
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }
}

这里做了如下事情

  • 1、判断是否开启过期驱逐
  • 2、获取到所有的过期的服务,通过Lease.isExpired判断过期
  • 3、计算一个驱逐极限值 :min( 过期数 ,注册表服务数 - 注册表服务数 * 0.85(续约阈值百分比) )
  • 4、如果驱逐极限值 > 0 ,那就从过期的服务中随机驱逐 “驱逐极限”个服务
  • 5、调用internalCancel方法消息服务

我们可以看下Lease.isExpired是如何判断实例过期的

public class Lease<T> {
    
    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }
}

这里给的过期计算方式是: evictionTimestamp (剔除时间戳) > 0 || 最后更新时间戳 + 租期(90s) + 补偿时间 。

但是有意思的是这个方法上的注释说了一个问题:它说由于renew()做了“错误”的事情,将lastUpdateTimestamp设置为+duration,超过了它应该的值,因此到期实际上是2 * duration,这个是个小问题,没有什么影响就没做修改。意思是renew方法中的lastUpdateTimestamp时间 不应该 + duration租期时间,这超过了它应该的值,因此到期实际上是2 * duration。

public class Lease<T> {
    
    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }
}

当然这个问题没有太大影响,所以没做改正,注释上也说明白了这个问题,这个方法我们就看到这。

继续看一下internalCancel内部取消服务的方法

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            //上锁
            read.lock();
            //服务取消数增加
            CANCEL.increment(isReplication);
            //获取当前剔除的服务
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                //从服务注册的map中移除掉当前服务
                leaseToCancel = gMap.remove(id);
            }
            //添加到最近取消队列
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            //overriddenInstanceStatusMap 服务状态map中移除当前服务
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                //没找到服务
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                //调用Lease.cancel方法
                leaseToCancel.cancel();
                //获取服务实例信息
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    //实例状态修改为删除
                    instanceInfo.setActionType(ActionType.DELETED);
                    //添加最近修改队列
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    //实例信息对象修改最后修改时间
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                //使缓存无效,调用responseCache.invalidate让服务在缓存中失效
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            }
        } finally {
            read.unlock();
        }

        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                // Since the client wants to cancel it, reduce the number of clients to send renews.
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                updateRenewsPerMinThreshold();
            }
        }

        return true;
    }
}

这里主要是根据当前要取消的服务名从registry中查询出服务之后做了这些事情

  • 1、从registry中移除服务,
  • 2、从overriddenInstanceStatusMap状态map中移除服务状态
  • 3、添加到最近取消队列
  • 4、调用Lease.cancel方法,将租约对象中的逐出时间修改为当前时间
  • 5、修改服务的InstanceInfo的状态为DELETE
  • 6、添加到最近修改队列
  • 7、更新服务最后修改时间
  • 8、使ReponseCache缓存无效

服务剔除总结

参考:
https://segmentfault.com/a/1190000021284890

https://blog.csdn.net/qq_36960211/article/details/85278254

https://blog.csdn.net/u014494148/article/details/108900884

https://www.jianshu.com/p/b8c614c442e0

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容