Eureka Client源码解析

Eureka Client为了简化开发人员的开发工作,将很多与Eureka Server交互的工作隐藏起来,自主完成。在应用的不同运行阶段在后台完成工作如图所示。


Eureka Client工作.jpg

1、服务发现客户端

为了理解Eureka Client的执行原理,首先需要对服务发现客户端com.netflix.discover.DiscoveryClient职能以及相关类进行讲解,它负责了与Eureka Server交互的关键逻辑。

1.1、DiscoveryClient职责

DiscoveryClient是Eureka Client的核心类,包括与Eureka Server交互的关键逻辑,具备了以下职能:
注册服务实例到Eureka Server中;
发送心跳更新与Eureka Server的租约;
在服务关闭时从Eureka Server中取消租约,服务下线;
查询在Eureka Server中注册的服务实例列表。

1.2、DiscoveryClient类结构
DiscoveryClient.jpg

DiscoveryClient继承了LookupService接口,LookupService作用是发现活跃的服务实例,主要方法如下:

package com.netflix.discovery.shared;

import java.util.List;
import com.netflix.appinfo.InstanceInfo;

public interface LookupService<T> {

    /**
     *根据服务实例注册的appName来获取封装有相同appName的服务实例信息容器
     */
    Application getApplication(String appName);

    /**
     *返回当前注册表中所有的服务实例信息
     */
    Applications getApplications();

    /**
     *根据服务实例的id获取服务实例信息
     */
    List<InstanceInfo> getInstancesById(String id);

    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}

Application持有服务实例信息列表,它可以理解成同一个服务的集群信息,这些服务实例都挂在同一个服务名appName下。InstanceInfo代表一个服务实例信息。Application部分代码如下:

public class Application {
    
    private static Random shuffleRandom = new Random();

    @Override
    public String toString() {
        return "Application [name=" + name + ", isDirty=" + isDirty
                + ", instances=" + instances + ", shuffledInstances="
                + shuffledInstances + ", instancesMap=" + instancesMap + "]";
    }

    private String name;

    @XStreamOmitField
    private volatile boolean isDirty = false;

    @XStreamImplicit
    private final Set<InstanceInfo> instances;
    ......
}

Applications是注册表中所有服务实例信息的集合,里面的操作大多也是同步操作。

EurekaCient在LookupService的基础上扩充了更多的接口,提供了更丰富的获取服务实例的方式,主要有:
提供了多种方式获取InstanceInfo,例如根据区域、Eureka Server地址等获取。
提供了本地客户端(所处的区域、可用区等)的数据,这部分与AWS密切相关。
提供了为客户端注册和获取健康检查处理器的能力。
除去查询相关的接口,我们主要关注EurekaClient中以下两个接口,代码如下所示:

public interface EurekaClient extends LookupService {

    //为Eureka Client注册健康检查处理器
    public void registerHealthCheck(HealthCheckHandler healthCheckHandler);

    // 监听Client服务实例信息的更新
    public void registerEventListener(EurekaEventListener eventListener);
    ......
}

Eureka Server一般通过心跳(heartbeats)来识别一个实例的状态。Eureka Client中存在一个定时任务定时通过HealthCheckHandler检测当前Client的状态,如果Client的状态发生改变,将会触发新的注册事件,更新Eureka Server的注册表中该服务实例的相关信息。
HealthCheckHandler代码如下所示:

package com.netflix.appinfo;

/**
 * This provides a more granular healthcheck contract than the existing {@link HealthCheckCallback}
 *
 * @author Nitesh Kant
 */
public interface HealthCheckHandler {

    InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);

}

Eureka中的事件模式属于观察者模式,事件监听器将监听Client的服务实例信息变化,触发对应的处理事件

1.3、DiscoveryClient构造函数

在DiscoveryClient构造函数中,Eureka Client会执行从Eureka Server中拉取注册表信息、服务注册、初始化发送心跳、缓存刷新(重新拉取注册表信息)和按需注册定时任务等操作,可以说DiscoveryClient的构造函数贯穿了Eureka Client启动阶段的各项工作。

在构造方法中,忽略掉构造方法中大部分的赋值操作,我们将会逐步了解配置类中的属性会对DiscoveryClient的行为造成什么影响。部分代码如下所示:

        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

config#shouldFetchRegistry(对应配置为eureka.client.fetch-register)为true表示Eureka Client将从Eureka Server中拉取注册表信息。config#shouldRegisterWithEureka(对应配置为eureka.client.register-with-eureka)为true表示Eureka Client将注册到Eureka Server中。如果上述的两个配置均为false,那么Discovery的初始化将直接结束,表示该客户端既不进行服务注册也不进行服务发现。

接着定义一个基于线程池的定时器线程池ScheduledExecutorService,线程池大小为2,一个线程用于发送心跳,另一个线程用于缓存刷新,同时定义了发送心跳和缓存刷新线程池,代码如下所示:

            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

之后,初始化Eureka Client与Eureka Server进行HTTP交互的Jersey客户端

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

EurekaTransport是DiscoveryClient中的一个内部类,其内封装了DiscoveryClient与Eureka Server进行HTTP调用的Jersey客户端。
再接着从Eureka Server中拉取注册表信息

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

如果EurekaClientConfig#shouldFetchRegistry为true时,fetchRegistry方法将会被调用。在Eureka Client向Eureka Server注册前,需要先从Eureka Server拉取注册表中的信息,这是服务发现的前提。通过将Eureka Server中的注册表信息缓存到本地,就可以就近获取其他服务的相关信息,减少与Eureka Server的网络通信。

拉取完Eureka Server中的注册表信息后,将对服务实例进行注册,代码如下所示:

        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }

        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                //发起服务注册
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();

在服务注册之前会进行注册预处理,Eureka没有对此提供默认实现。构造函数的最后将初始化并启动发送心跳、缓存刷新和按需注册等定时任务。

最后总结一下,在DiscoveryClient的构造函数中,主要依次做了以下的事情:
1)相关配置的赋值,类似ApplicationInfoManager、EurekaClientConfig等。
2)备份注册中心的初始化,默认没有实现。
3)拉取Eureka Server注册表中的信息。
4)注册前的预处理,默认没有实现。
5)向Eureka Server注册自身。
6)初始化心跳定时任务、缓存刷新和按需注册等定时任务。

2、拉取注册表信息

在DiscoveryClient的构造函数中,调用了DiscoveryClient#fetchRegistry方法从Eureka Server中拉取注册表信息,方法执行如下所示:

    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // 如果增量式拉取被禁止,或者applications == null,则进行全量拉取
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                //全量拉取注册信息
                getAndStoreFullRegistry();
            } else {
                //增量拉取注册信息
                getAndUpdateDelta(applications);
            }
            //计算应用集合一致性hash码
            applications.setAppsHashCode(applications.getReconcileHashCode());
            //打印注册表上实例的总数量
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

一般来讲,在Eureka客户端,除了第一次拉取注册表信息,之后的信息拉取都会尝试只进行增量拉取(第一次拉取注册表信息为全量拉取),下面将分别介绍拉取注册表信息的两种实现,全量拉取注册表信息DiscoveryClient#getAndStoreFullRegistry和增量式拉取注册表信息DiscoveryClient#getAndUpdateDelta。

2.1、全量拉取注册表信息

一般只有在第一次拉取的时候,才会进行注册表信息的全量拉取,主要在DiscoveryClient#getAndStoreFullRegistry方法中进行。

    private void getAndStoreFullRegistry() throws Throwable {
        // 获取拉取的注册表的版本,防止拉取版本落后(由其他的线程引起)
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } 
         // 检查fetchRegistryGeneration的更新版本是否发生改变,无改变的话说明本次拉取是最新的
        else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            //更新缓存
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }

全量拉取将从Eureka Server中拉取注册表中所有的服务实例信息(封装在Applications中),并经过处理后替换掉本地注册表缓存Applications。
getAndStoreFullRegistry方法可能被多个线程同时调用,导致新拉取的注册表被旧的注册表覆盖(有可能出现先拉取注册表信息的线程在覆盖apps时被阻塞,被后拉取注册表信息的线程抢先设置了apps,被阻塞的线程恢复后再次设置了apps,导致apps数据版本落后),产生脏数据,对此,Eureka通过类型为AtomicLong的currentUpdateGeneration对apps的更新版本进行跟踪。如果更新版本不一致,说明本次拉取注册表信息已过时,不需要缓存到本地。拉取到注册表信息之后会对获取到的apps进行筛选,只保留状态为UP的服务实例信息。

2.2、增量式拉取注册表信息

增量式的拉取方式,一般发生在第一次拉取注册表信息之后,拉取的信息定义为从某一段时间之后发生的所有变更信息,通常来讲是3分钟之内注册表的信息变化。在获取到更新的delta后,会根据delta中的增量更新对本地的数据进行更新。与getAndStoreFullRegistry方法一样,也通过fetchRegistryGeneration对更新的版本进行控制。增量式拉取是为了维护Eureka Client本地的注册表信息与Eureka Server注册表信息的一致性,防止数据过久而失效,采用增量式拉取的方式减少了拉取注册表信息的通信量。Client中有一个注册表缓存刷新定时器专门负责维护两者之间信息的同步性。但是当增量式拉取出现意外时,定时器将执行全量拉取以更新本地缓存的注册表信息。具体代码如下所示:

    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
        //获取增量拉取失败,则进行全量拉取
        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    //更新本地缓存
                    updateDelta(delta);
                    //计算合并后的Applications的appsHashCode(应用集合一致性哈希码)
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // 和Eureka Server传递的delta上的appsHashCode进行比较
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                //方法中将会执行拉取全量注册表信息操作。
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }

3、服务注册

在拉取完Eureka Server中的注册表信息并将其缓存在本地后,Eureka Client将向Eureka Server注册自身服务实例元数据,主要逻辑位于Discovery#register方法中。register方法代码如下所示:

    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

Eureka Client会将自身服务实例元数据(封装在InstanceInfo中)发送到Eureka Server中请求服务注册,当Eureka Server返回204状态码时,说明服务注册成功。

4、初始化定时任务

很明显,服务注册应该是一个持续的过程,Eureka Client通过定时发送心跳的方式与Eureka Server进行通信,维持自己在Server注册表上的租约。同时Eureka Server注册表中的服务实例信息是动态变化的,为了保持Eureka Client与Eureka Server的注册表信息的一致性,Eureka Client需要定时向Eureka Server拉取注册表信息并更新本地缓存。为了监控Eureka Client应用信息和状态的变化,Eureka Client设置了一个按需注册定时器,定时检查应用信息或者状态的变化,并在发生变化时向Eureka Server重新注册,避免注册表中的本服务实例信息不可用。

在DiscoveryClient#initScheduledTasks方法中初始化了三个定时器任务,一个用于向Eureka Server拉取注册表信息刷新本地缓存;一个用于向Eureka Server发送心跳;一个用于进行按需注册的操作。代码如下所示:

    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            //注册表缓存刷新定时器
            // 获取配置文件中刷新间隔,默认为30s,
            //可以通过eureka.client.registry-fetch-interval-seconds进行设置
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            //发送心跳定时器,默认30秒发送一次心跳
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

4.1、缓存刷新定时任务与发送心跳定时任务

在DiscoveryClient#initScheduledTasks方法中,通过ScheduledExecutorService#schedule的方式提交缓存刷新任务和发送心跳任务,任务执行的方式为延时执行并且不循环,这两个任务的定时循环逻辑由TimedSupervisorTask提供实现。TimedSupervisorTask继承了TimerTask,提供执行定时任务的功能。它在run方法中定义执行定时任务的逻辑。具体代码如下所示:

    public void run() {
        Future<?> future = null;
        try {
            //执行定时任务
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            //等待任务执行结果
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            //执行完成,设置下次执行任务时间间隔
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            //执行任务超时
            timeoutCounter.increment();
            //设置下次执行任务时间间隔
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }
            //执行任务被拒绝
            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }
            //统计其他的异常
            throwableCounter.increment();
        } finally {
            //取消未结束的任务
            if (future != null) {
                future.cancel(true);
            }

            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

TimedSupervisorTask通过这种不断循环提交任务的方式,完成定时执行任务的要求。
在DiscoveryClient#initScheduledTasks方法中,提交缓存刷新定时任务的线程任务为CacheRefreshThread,提交发送心跳定时任务的线程为HeartbeatThread。CacheRefreshThread继承了Runnable接口,代码如下所示:

    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

    @VisibleForTesting
    void refreshRegistry() {
         //省略其他代码
         //判断远程Region是否改变(即Eureka Server地址是否发生变化),决定进行全量拉取还是增量式拉取
         boolean success = fetchRegistry(remoteRegionsModified);
        //打印更新注册表缓存后的变化
        //省略其他代码
    }

HeartbeatThread同样继承了Runnable接口,该任务的作用是向Eureka Server发送心跳请求,维持Eureka Client在注册表中的租约。代码如下所示:

private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // 调用HTTP发送心跳到Eureka Server中维持租约
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                // 重新注册
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            //续约成功
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

Eureka Server会根据续租提交的appName与instanceInfoId来更新注册表中的服务实例的租约。当注册表中不存在该服务实例时,将返回404状态码,发送心跳请求的Eureka Client在接收到404状态后将会重新发起注册;如果续约成功,将会返回200状态码。

4.2、按需指定注册任务

按需注册定时任务的作用是当Eureka Client中的InstanceInfo或者status发生变化时,重新向Eureka Server发起注册请求,更新注册表中的服务实例信息,保证Eureka Server注册表中服务实例信息有效和可用。按需注册定时任务的代码如下:

            //定时检查服务的实例信息
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            // 监控应用的status变化,发生变化即可发起重新注册
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            // 注册应用状态改变监控器
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // 启动定时按需注册定时任务
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

5、服务下线

一般情况下,应用服务在关闭的时候,Eureka Client会主动向Eureka Server注销自身在注册表中的信息。DiscoveryClient中对象销毁前执行的清理方法如下所示:

    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            cancelScheduledTasks();

            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                unregister();
            }

            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }

            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();

            Monitors.unregisterObject(this);

            logger.info("Completed shut down of DiscoveryClient");
        }
    }

在销毁DiscoveryClient之前,会进行一系列清理工作,包括注销ApplicationInfoManager中的StatusChangeListener、取消定时任务、服务下线和关闭Jersey客户端等。我们主要关注unregister服务下线方法,其实现代码如下所示:

    void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351