一、客户端工作流程
1.1、初始化阶段
- 1、读取与server交互的信息,封装成EurekaClientConfig
- 2、读取自身服务信息,封装成EurekaInstanceConfig
- 3、拉取Server注册信息缓存到本地
- 4、服务注册
- 5、发送心跳,刷新缓存,注册定时任务
1.2、启动阶段
- 1、发送心跳维持租约
- 2、定时获取注册表信息更新本地缓存
- 3、监控自身信息,如果有变化重新注册服务
1.3、注销阶段
- 从server端注销服务
二、配置信息类
-
配置类结构如下图
2.1、EurekaDiscoveryClientConfiguration
- 作用:帮助Client维持必要bean的属性读取和配置
2.1.1 读取属性和配置类
- EurekaClientConfig:封装了与server交互的信息
- ApplicationInfoManager:应用信息管理器,管理InstanceInfo,EurekaInstanceConfig
- InstanceInfo:发送到server进行注册的元数据
- EurekaInstanceConfig:自身服务实例的配置信息,用于构建InstanceInfo
- DiscoverClient:用于服务发现的客户端接口
2.2、DiscoverClient
- 说明:是服务端发现的核心接口
- String description():获取实现类的描述
- List getInstances(String serviceId):通过服务id获取服务信息
- List getServices():获取服务id列表
package org.springframework.cloud.client.discovery;
public interface DiscoveryClient {
String description();
List<ServiceInstance> getInstances(String serviceId);
List<String> getServices();
}
2.3、EurekaDiscoveryClient
- 继承了DiscoveryClient,组合EurekaClient实现接口功能
package org.springframework.cloud.netflix.eureka;
public class EurekaDiscoveryClient implements DiscoveryClient {}
三、DiscoverClient类结构
-
源码结构如下图
3.1、DiscoveryClient
- 包名:com.netflix.discovery.DiscoveryClient
- 说明:Client端与Server端交互关键逻辑
3.1.1、功能
- 1、注册服务到Server
- 2、发送心跳更新租约
- 3、服务关闭时从Server中取消租约下线服务
- 4、查询在Server中注册的服务实例列表
package com.netflix.discovery;
@Singleton
public class DiscoveryClient implements EurekaClient {}
3.1.2、类体系
LookupService---> EurekaClient ----> DiscoveryClient
3.2、LookupService
- Application getApplication(String appName):通过服务名称获取实例信息
- Applications getApplications():获取实例列表
- List getInstancesById(String id):通过服务id获取服务信息
package com.netflix.discovery.shared;
public interface LookupService<T> {
Application getApplication(String appName);
Applications getApplications();
List<InstanceInfo> getInstancesById(String id);
InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}
3.3、Application
- Application中对InstanceInfo的操作都是同步
- Applications里面的操作基本也是同步操作
3.3.1、Application所有属性
package com.netflix.discovery.shared;
@Serializer("com.netflix.discovery.converters.EntityBodyConverter")
@XStreamAlias("application")
@JsonRootName("application")
public class Application {
private static Random shuffleRandom = new Random();
private String name;
private volatile boolean isDirty = false;
private final Set<InstanceInfo> instances;
private final AtomicReference<List<InstanceInfo>> shuffledInstances;
private final Map<String, InstanceInfo> instancesMap;
}
3.3.2、Application操作同步
private void removeInstance(InstanceInfo i, boolean markAsDirty) {
instancesMap.remove(i.getId());
synchronized (instances) {
instances.remove(i);
if (markAsDirty) {
isDirty = true;
}
}
}
3.4、EurekaClient
- 特点:继承了LookupService,为DiscoveryClient提供了上层接口,属于比较稳定的接口(扩展层)
3.4.1、作用
- 提供了多种获取InstanceInfo接口
- 提供了本地客户端数据
- 提供了为客户端注册和获取健康检查处理器的能力
3.4.2、核心方法
- 为Eureka Client注册健康检查处理器
- 监听Client服务实例信息的更新
//为Eureka Client注册健康检查处理器
public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
//监听Client服务实例信息的更新
public void registerEventListener(EurekaEventListener eventListener);
3.5、HealthCheckHandler
- 用于检查当前Client的状态,如果Client的姿态发生改变,将会触发新的注册事件
- 该事件属于观察者模式,事件监听器将监听Client的服务实例信息变化,触发对应的处理事件
public interface HealthCheckHandler {
InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);
}
四、、DiscoverClient 源码解读
4.1、DiscoverClient构造方法
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider)
4.1.1、构造方法参数
参数说明
- ApplicationInfoManager:应用信息管理器
- EurekaClientConfig:Client与Server交互配置信息
- AbstractDiscoveryClientOptionalArgs:注入可选参数
- Provider<BackupRegistry>:用于获取注册表信息
4.1.2、构造方法主要操作
- 从Server中拉取注册表信息,服务信息,初始化发送心跳,刷新缓存,按需注册定时任务
- 1、初始化阶段:基础信息初始化,配置信息初始化,线程池初始化
- 2、构建阶段:构建EurekaTransport
- 3、预处理阶段:拉取Server注册表信息,注册前预处理
- 4、注册阶段:向Server中注册,初始化心跳定时任务(线程池2个线程)
- 心跳地址:localhost:8761/eureka/EUREKA-CLIENT/apps/AppName/instanceInfoId
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
//1、参数说明
//ApplicationInfoManager:应用信息管理器
//EurekaClientConfig:Client与Server交互信息
//AbstractDiscoveryClientOptionalArgs:可选参数
//Provider<BackupRegistry>:注册中心备份
//2、初始化信息
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
//3、配置信息读取初始化
//shouldFetchRegistry对应配置:eureka.client.fetch-register=true/false(是否从Server中拉取注册表信息)
//shouldRegisterWithEureka对应配置:eureka.client.register-with-eureka=true/false(是否将自身信息注册到Server)
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_",
new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_",
new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
//4、线程池定义
//scheduler:线程池大小为2
//heartbeatExecutor:心跳发送线程
//cacheRefreshExecutor:缓存刷新线程
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//5、构建EurekaTransport
//EurekaTransport是DiscoverClient内部类,封装了Client与Server进行http调用的Jersey客户端
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
//6、拉取注册表信息:先拉取Server注册表信息,并缓存到本地,减少与Server端通讯
//shouldFetchRegistry()=true,并且没有注册过
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
//7、注册服务:注册之前先调用预注册功能
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
//8、开始注册
//开始注册:register()
//初始化定时任务:initScheduledTasks();
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
4.2、DiscoverClient 拉取注册表信息
4.2.1、拉取注册表信息
- 方法:boolean fetchRegistry(boolean forceFullRegistryFetch);
- 1、判断拉取方式全量拉取或增量拉取:增量方式被禁止,或Application为空时用全量拉取(一般为第一次拉取)
- 2、拉取信息
- 3、计算集合一致性哈希码
- 4、更新远程实例数据
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
//1、判断拉取方式
//如果增量方式被禁止,或Application为空,采用全量拉取方式
//getAndUpdateDelta:全量拉取
//getAndUpdateDelta:增量拉取
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
//2、计算集合一致性哈希码
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
//3、更新远程实例数据
// onCacheRefreshed:推送缓存刷新事件
//updateInstanceRemoteStatus:缓存中被刷新数据更新远程实例数据
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
4.2.2、全量拉取信息
- getAndStoreFullRegistry()
- 从Server中拉取的信息封装在Applications,并通过处理替换本地注册缓存Applications
- 全量拉取方法有可能被多个线程调用,产生脏数据,因此提供了增量拉取数据
拉取操作
- 地址:http://localhost:8761/eureka/apps
- 1、获取注册表版本号,防止版本落后
- 2、获取Server注册表信息
- 3、判断拉取信息
private void getAndStoreFullRegistry() throws Throwable {
//1、获取注册表的版本号,防止版本落后(由线程引起)
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
//2、信息获取成功
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
//3、判断信息
//检查fetchRegistryGeneration的版本更新是否有改变,无改变说明是最新数据
//有个数据更新:从app中选出状态为UP的实例,同时打乱实例顺序,防止同一个服务不同的实例在启动时接受流量
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
4.2.3、增量拉取信息
- 地址:http://localhost:8761/eureka/apps/delta
- 1、获取版本号
- 2、获取数据失败:调用全量拉取方式再拉取数据
- 3、获取数据成功:分享本地缓存,计算哈希码,如果哈希码不一致则为脏数据,继续调用全量拉取
private void getAndUpdateDelta(Applications applications) throws Throwable {
//1、获取版本号
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
//2、数据获取
//数据获取失败:调用全量拉取方法拉取数据
//拉取成功:更新本地缓存,计算集合一致性哈希码,如果哈希码不一致认为本次数据为脏数据,继续采用全量拉取信息
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
//更新缓存数据
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
4.2.3、更新缓存数据
4.2.3.1、数据枚举类型
- ADDED:数据添加
- MODIFIED:数据改变
- DELETED:数据删除
public enum ActionType {
ADDED, // Added in the discovery server
MODIFIED, // Changed in the discovery server
DELETED// Deleted from the discovery server
}
4.2.3.1、更新缓存
- 遍历列表数据,将添加和修改的数据添加本地注册表中,将删除类型的数据从本地注册表删除
//缓存数据更新:遍历列表数据,将添加和修改的数据添加本地注册表中,将删除类型的数据从本地注册表删除
private void updateDelta(Applications delta) {
int deltaCount = 0;
for (Application app : delta.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
Applications applications = getApplications();
String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
if (null == remoteApps) {
remoteApps = new Applications();
remoteRegionVsApps.put(instanceRegion, remoteApps);
}
applications = remoteApps;
}
++deltaCount;
if (ActionType.ADDED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
//将添加类型数据直接添加到本地注册表
logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.MODIFIED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
//将修改类型数据直接添加到本地注册表
logger.debug("Modified instance {} to the existing apps ", instance.getId());
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.DELETED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
//将删除类型的数据从本地注册表删除
logger.debug("Deleted instance {} to the existing apps ", instance.getId());
applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
}
}
}
logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
getApplications().setVersion(delta.getVersion());
getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
for (Applications applications : remoteRegionVsApps.values()) {
applications.setVersion(delta.getVersion());
applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
}
}
4.3、DiscoverClient 服务注册
- 数据封装:InstanceInfo
- 注册成功返回状态码204
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
//注册服务:将数据封装到InstanceInfo
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
4.4、DiscoverClient 初始化定时任务
4.4.1、初始化定时任务方法:initScheduledTasks
- 1、注册缓存定时任务,默认刷新时间30秒
- 2、注册发送心跳定时任务,默认时间30秒
- 3、注册定时器
- 4、添加监听器来监听应用状态改变,并在状态改变时重新注册
- 5、启动定时任务
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
//1、注册表缓存刷新时间:默认30秒
//通过eureka.client.registry-fetch-interval-seconds 设置
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
//2、发送心跳定时器:默认30秒发送一次
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
//3、注册定时器
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
//注册应用状态改变监控器
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//启动定时任务
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
4.4.2、刷新缓存定时任务
4.4.2.1、TimedSupervisorTask定时任务
- TimedSupervisorTask继承了TimerTask提供定时任务功能,主要运行在run方法中
任务调度过程:
1、scheduler初始化并延迟执行TimedSupervisorTask
2、TimedSupervisorTask将task提交到executor中执行,task和executor在初始化TimedSupervisorTask时传入
3、task正常执行,TimedSupervisorTask将自己提交到scheduler,延迟delay时间后再次执行
4、task超时执行,计算新的delay时间TimedSupervisorTask将自己提交到scheduler,延迟delay时间后再次执行
public class TimedSupervisorTask extends TimerTask {
private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);
private final Counter timeoutCounter;
private final Counter rejectedCounter;
private final Counter throwableCounter;
private final LongGauge threadPoolLevelGauge;
private final ScheduledExecutorService scheduler;
private final ThreadPoolExecutor executor;
private final long timeoutMillis;
private final Runnable task;
private final AtomicLong delay;
private final long maxDelay;
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;
// Initialize the counters and register.
timeoutCounter = Monitors.newCounter("timeouts");
rejectedCounter = Monitors.newCounter("rejectedExecutions");
throwableCounter = Monitors.newCounter("throwables");
threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
Monitors.registerObject(name, this);
}
@Override
public void run() {
Future<?> future = null;
try {
//执行任务:submit
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
//等待执行任务结果
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
//执行完成,设置下次任务执行频率(时间间隔)
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
} catch (TimeoutException e) {
//任务超时,设置下次任务执行频率
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
//任务拒绝,统计拒绝任务次数
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
//取消未结束的任务
if (future != null) {
future.cancel(true);
}
//如果定时任务未关闭,定义下一次任务
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
}
4.4.2.2、缓存线程:CacheRefreshThread
- CacheRefreshThread为发送心跳定时任务线程
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
4.4.2.3、 缓存刷新核心方法:refreshRegistry
- 判断Region是否改变(Server地址),用于决定全量拉取还是增量拉取
- 打印更新注册表缓存后变化
@VisibleForTesting
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// This makes sure that a dynamic change to remote regions to fetch is honored.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
//判断Region是否改变(Server地址),用于决定全量拉取还是增量拉取
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
//打印更新注册表缓存后变化
if (logger.isDebugEnabled()) {
StringBuilder allAppsHashCodes = new StringBuilder();
allAppsHashCodes.append("Local region apps hashcode: ");
allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
allAppsHashCodes.append(", is fetching remote regions? ");
allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
allAppsHashCodes.append(", Remote region: ");
allAppsHashCodes.append(entry.getKey());
allAppsHashCodes.append(" , apps hashcode: ");
allAppsHashCodes.append(entry.getValue().getAppsHashCode());
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
allAppsHashCodes);
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
4.4.3、发送心跳定时任务
4.4.3.1、心跳定时任务线程:HeartbeatThread
- 任务的作用:向Server发送心跳请求,维持Client在注册表中的租约
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
4.4.3.2、心跳发送核心方法:renew()
续约核心参数:appName,InstanceId
1、调用HTTP发送心跳到Server端
2、如果请求状态码为404表示Server中不存在当前实例,线程会重新调用注册方法进行注册
3、如果请求状态码为200表示续约成功
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//调用HTTP发现心跳到Server:主要参数-appName,InstanceId
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
//Server中不存在当前实例为404,线程查重新注册
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
//续约成功返回200
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
4.4.4、按需注册定时任务
- 按需注册分为两种场景
- 一个场景为:定义定时任务,定时刷新服务数据和状态,在数据或状态改变时向Server发起重新注册
- 一个场景为:注册状态监听器,当状态改变时向Server发起重新注册
4.4.4.1、定义定时任务
- 作用:当Client中的InstanceInfo或status发生变化时重新向Server发起注册,更新实例信息表保证Server中的信息可用
private void initScheduledTasks() {
... ...
//检查InstanceInfo数据是否变化,有变化重新注册
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
//监听应用状态改变,状态改变后重新发起注册
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
4.4.4.2、线程重新注册
- 按需注册2、注册状态改变的监听器,当状态改变后重新注册应用
class InstanceInfoReplicator implements Runnable {
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
this.discoveryClient = discoveryClient;
this.instanceInfo = instanceInfo;
//创建定时任务
this.scheduler = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
.setDaemon(true)
.build());
this.scheduledPeriodicRef = new AtomicReference<Future>();
this.started = new AtomicBoolean(false);
this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
this.replicationIntervalSeconds = replicationIntervalSeconds;
this.burstSize = burstSize;
this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
}
public void run() {
try {
//刷新InstanceInfo中服务实例信息
discoveryClient.refreshInstanceInfo();
//如果数据发生改变,返回数据更新时间
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
//注册实例信息,重新更新状态
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
//执行下一个延时任务
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
4.4.4.3、Client信息更新及状态检查
- 更新服务下线,租约下线
- 检查服务状态呢变化
void refreshInstanceInfo() {
//更新新服务信息
applicationInfoManager.refreshDataCenterInfoIfRequired();
//更新租约信息
applicationInfoManager.refreshLeaseInfoIfRequired();
InstanceStatus status;
try {
//调用getHealthCheckHandler检查服务实例变化
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
4.5、DiscoverClient 服务下线
- 主要操作:注销监听器,取消定时任务,服务下线,关闭Jersy客户端,关闭相关Monitor(阈值数据)
- 在线程同步的方法中执行
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
//注销监听器
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
//取消定时任务
cancelScheduledTasks();
//服务下线
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
//关闭Jersy客户端
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
//关闭相关Monitor
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}