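The Eureka source code referenced here is version 1.7.2.
I am a beginner, and this post is my notes from reading the source. If you are reading it, please judge its correctness for yourself: the explanations reflect only my own understanding and are not necessarily right!
The heartbeat mechanism lets a client prove that it is still running. At the code level, each heartbeat refreshes the lease's expiry time so that the automatic eviction mechanism does not remove the instance.
For the heartbeat mechanism, the Eureka Client also builds a dedicated thread pool for sending heartbeats when the client is initialized: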
com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<com.netflix.discovery.BackupRegistry>)
// Initialize the thread pool that sends heartbeats
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
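To make the "direct handoff" comment concrete, here is a minimal sketch of a pool with the same shape, built with the JDK only. The class name HeartbeatPoolSketch and the maxThreads parameter are my own (the latter stands in for clientConfig.getHeartbeatExecutorThreadPoolSize()), and the hand-written ThreadFactory is just a stand-in for the ThreadFactoryBuilder used in the source.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatPoolSketch {
    // Builds a pool shaped like the heartbeat pool; maxThreads is a hypothetical
    // stand-in for clientConfig.getHeartbeatExecutorThreadPoolSize().
    static ThreadPoolExecutor newHeartbeatPool(int maxThreads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "DiscoveryClient-HeartbeatExecutor-" + counter.getAndIncrement());
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        };
        // A SynchronousQueue stores nothing: every task is handed directly to a
        // thread, and a submit is rejected once maxThreads threads are all busy.
        return new ThreadPoolExecutor(1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), factory);
    }

    // Runs one task on the pool and returns the name of the thread that ran it.
    static String runOnce(ThreadPoolExecutor pool) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Because nothing is queued, a heartbeat that cannot get a thread fails immediately instead of piling up behind a stuck one.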
The heartbeat task is kicked off when the scheduled tasks are initialized:
com.netflix.discovery.DiscoveryClient#initScheduledTasks
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
// The task to run
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
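Note that scheduler.schedule(...) only fires once, so the repetition has to come from the task itself. As far as I understand it, TimedSupervisorTask runs the real task on the executor with a timeout and then schedules itself again, doubling the delay after a timeout up to a bound. Below is a rough sketch of that idea, not the actual Eureka class; SupervisedTaskSketch and countRuns are names I made up for illustration.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class SupervisedTaskSketch implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final ExecutorService executor;
    private final long baseDelayMs; // used both as the interval and as the timeout
    private final long maxDelayMs;  // baseDelayMs * expBackOffBound
    private final Runnable task;
    private volatile long delayMs;

    SupervisedTaskSketch(ScheduledExecutorService scheduler, ExecutorService executor,
                         long baseDelayMs, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = baseDelayMs * expBackOffBound;
        this.task = task;
        this.delayMs = baseDelayMs;
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            future.get(baseDelayMs, TimeUnit.MILLISECONDS); // wait for the task, with a timeout
            delayMs = baseDelayMs;                          // finished in time: reset the delay
        } catch (TimeoutException e) {
            delayMs = Math.min(maxDelayMs, delayMs * 2);    // timed out: back off exponentially
        } catch (Exception e) {
            // rejected, interrupted, or the task failed: keep the current delay
        } finally {
            if (future != null) {
                future.cancel(true);
            }
            if (!scheduler.isShutdown()) {
                try {
                    scheduler.schedule(this, delayMs, TimeUnit.MILLISECONDS); // schedule the next run
                } catch (RejectedExecutionException ignored) {
                    // the scheduler was shut down in the meantime
                }
            }
        }
    }

    // Demo helper: counts how often a trivial task runs within observeMs.
    static int countRuns(long intervalMs, long observeMs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        ExecutorService executor = Executors.newCachedThreadPool();
        AtomicInteger runs = new AtomicInteger();
        scheduler.schedule(
                new SupervisedTaskSketch(scheduler, executor, intervalMs, 10, runs::incrementAndGet),
                intervalMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(observeMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        executor.shutdownNow();
        return runs.get();
    }
}
```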
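By default the first heartbeat is sent after a delay of renewalIntervalInSecs, which defaults to 30s, and later heartbeats are sent at the same 30s interval (TimedSupervisorTask reschedules itself after each run). The core of this is
the task's worker, HeartbeatThread: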
com.netflix.discovery.DiscoveryClient.HeartbeatThread#run
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
// Send the heartbeat
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
There is not much to say about these two methods: they send an HTTP request to an address like this:
http://DESKTOP-1S6DCTA:8080/v2/apps/EUREKA/001
Parameters: status, lastDirtyTimestamp
Method: PUT
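As a small illustration of how such a request line is put together, here is a sketch that assembles the URL from its parts. HeartbeatRequestSketch and heartbeatUrl are hypothetical names, the real client goes through its own HTTP transport, and the sketch assumes the values need no URL encoding.

```java
class HeartbeatRequestSketch {
    // Builds the heartbeat URL: {serviceUrl}/apps/{appName}/{instanceId}?status=..&lastDirtyTimestamp=..
    // Assumption: none of the values contain characters that need URL encoding.
    static String heartbeatUrl(String serviceUrl, String appName, String instanceId,
                               String status, long lastDirtyTimestamp) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return base + "apps/" + appName + "/" + instanceId
                + "?status=" + status
                + "&lastDirtyTimestamp=" + lastDirtyTimestamp;
    }
}
```

The request itself is then a PUT against that URL.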
Let's go straight to the Server side and see how it handles the heartbeat request:
com.netflix.eureka.resources.InstanceResource#renewLease
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
// Replicate the heartbeat to the peer nodes in the cluster
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
The core is still the call to the parent class's renew(); the main logic lives there:
com.netflix.eureka.registry.AbstractInstanceRegistry#renew
// Lease renewal method
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
// Look up the application's leases by appName
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
// Look up the lease by instance ID
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
// Get the service instance held by the lease
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
// Compute the overridden status of the service instance
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
// If the two differ, overwrite the instance status with the overridden status
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", args);
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
// Count the heartbeats actually received in the current minute
renewsLastMin.increment();
// Reset the lease expiry time
leaseToRenew.renew();
return true;
}
}
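Main flow:
- Look up the lease by service name and instance ID
- Apply the overridden status to the instance held by the lease
- Increment the heartbeat count for the current minute
- Reset the lease expiry time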
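The four steps above can be sketched with plain JDK collections. This is a simplified model of my own, not the Eureka classes: RegistryRenewSketch, LeaseSketch, and the string-typed statuses are all hypothetical, and thread safety of the lease fields is ignored.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class RegistryRenewSketch {
    static final class LeaseSketch {
        String status;
        String overriddenStatus; // null means no override has been set
        long lastRenewedAtMs;
    }

    // appName -> (instanceId -> lease), mirroring the two-level map in the source
    private final Map<String, Map<String, LeaseSketch>> registry = new ConcurrentHashMap<>();
    private final AtomicLong renewsLastMin = new AtomicLong();

    void register(String appName, String id, String status, long nowMs) {
        LeaseSketch lease = new LeaseSketch();
        lease.status = status;
        lease.lastRenewedAtMs = nowMs;
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, lease);
    }

    void overrideStatus(String appName, String id, String overriddenStatus) {
        Map<String, LeaseSketch> leases = registry.get(appName);
        if (leases != null && leases.get(id) != null) {
            leases.get(id).overriddenStatus = overriddenStatus;
        }
    }

    boolean renew(String appName, String id, long nowMs) {
        // Step 1: look up the lease by service name and instance ID
        Map<String, LeaseSketch> leases = registry.get(appName);
        LeaseSketch lease = leases == null ? null : leases.get(id);
        if (lease == null) {
            return false; // the caller turns this into a 404
        }
        // Step 2: apply the overridden status if it differs from the current one
        if (lease.overriddenStatus != null && !lease.overriddenStatus.equals(lease.status)) {
            lease.status = lease.overriddenStatus;
        }
        // Step 3: count the heartbeat
        renewsLastMin.incrementAndGet();
        // Step 4: refresh the lease timestamp
        lease.lastRenewedAtMs = nowMs;
        return true;
    }

    String statusOf(String appName, String id) {
        return registry.get(appName).get(id).status;
    }

    long renewsLastMin() {
        return renewsLastMin.get();
    }
}
```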
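The most essential step is refreshing the instance's expiry time so that automatic eviction does not remove it by mistake. The lease's renew method does the following: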
lastUpdateTimestamp = System.currentTimeMillis() + duration;
This is the current time plus the lease duration (90s). There is a bug here, which shows up in the automatic eviction logic.
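A small sketch of why this matters, assuming (as I recall from the Lease class) that the expiry check compares the current time against lastUpdateTimestamp + duration. Since renew already added the duration once, it ends up counted twice. LeaseExpirySketch is a hypothetical class, and time is passed in explicitly so the arithmetic is easy to follow.

```java
class LeaseExpirySketch {
    private final long durationMs;
    private long lastUpdateTimestamp;

    LeaseExpirySketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs;
    }

    // Same shape as the line quoted above: now + duration
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    // Assumed expiry check: the duration is added a second time here
    boolean isExpired(long nowMs) {
        return nowMs > lastUpdateTimestamp + durationMs;
    }
}
```

With a 90s duration and a renewal at t = 0, the lease in this sketch only counts as expired after t = 180s, which is twice the configured duration.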
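This source also reveals a scenario: when a service instance cannot send heartbeats for a long time because of a network failure, the instance gets evicted (after about 3 min).
Once the network recovers, its next heartbeat will not find a matching lease.
How does the client deal with this? Look at the client code: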
com.netflix.discovery.DiscoveryClient#renew
// The current service instance has already been removed from the registry
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
// Register again
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
As shown, when the client finds that its service instance is missing, it simply registers again.
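To see the recovery path end to end, here is a sketch with an in-memory stand-in for the server. RegistrationClientSketch, InMemoryServerSketch, and RenewSketch are hypothetical names of mine, and the status codes (200 for a successful heartbeat, 404 for an unknown instance, 204 for a successful registration) reflect my reading of the client code.

```java
import java.util.HashSet;
import java.util.Set;

interface RegistrationClientSketch {
    int sendHeartBeat(String appName, String id); // returns an HTTP status code
    int register(String appName, String id);      // returns an HTTP status code
}

// In-memory stand-in for the server: it only remembers which instances are registered.
class InMemoryServerSketch implements RegistrationClientSketch {
    final Set<String> registered = new HashSet<>();

    public int sendHeartBeat(String appName, String id) {
        return registered.contains(appName + "/" + id) ? 200 : 404;
    }

    public int register(String appName, String id) {
        registered.add(appName + "/" + id);
        return 204;
    }
}

class RenewSketch {
    // Mirrors the client-side flow: heartbeat first, re-register on 404.
    static boolean renew(RegistrationClientSketch client, String appName, String id) {
        try {
            int status = client.sendHeartBeat(appName, id);
            if (status == 404) {
                // The server no longer knows this instance (for example after eviction)
                return client.register(appName, id) == 204;
            }
            return status == 200;
        } catch (RuntimeException e) {
            return false; // any transport failure counts as a failed heartbeat
        }
    }
}
```

The first renew against an empty server takes the 404 branch and registers; the next one is an ordinary 200 heartbeat.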
One remark from me: Eureka's mechanisms involve so many scheduled tasks that some of the time calculations do not feel very reliable.