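First, let's pin down what Eureka does:
- Service registration
- Self-preservation
- Heartbeat detection
- State synchronization
- ...
Starting from these, let's look at how Eureka implements each of them.
Service Registration <Client-Side Registration>
In service registration, the Eureka Client reports its own service information to the Eureka Server. This happens as soon as the service starts, so let's begin with client startup.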
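The real question is which method actually performs the registration. Honestly, I was lost at first, so I looked up Spring Cloud's conventions online.

The ServiceRegistry interface is Spring Cloud's contract for service registration, so the place to start is its implementation, EurekaServiceRegistry.register. Where is that method called? I eventually traced it to org.springframework.cloud.netflix.eureka.serviceregistry.EurekaAutoServiceRegistration#start, which is a lifecycle callback of a bean in the IoC container.

That gives two entry points:
- the client annotation EnableEurekaClient, or the AutoConfiguration;
- EurekaAutoServiceRegistration.

A note: on the client, both EnableEurekaClient and EnableDiscoveryClient are optional, and the Javadoc of EnableEurekaClient says explicitly that the annotation is optional. So for the first entry point, only the AutoConfiguration remains.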
EurekaClientAutoConfiguration
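EurekaClientAutoConfiguration is itself a configuration class made up entirely of bean definitions. The snippets below were picked based on the analysis above, and they list the entry points where the relevant beans are initialized.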
// Implementation of the Spring Cloud convention
// By convention, instances are registered and deregistered through the Service Registry.
@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
return new EurekaServiceRegistry();
}
// Automatic service registration
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
ApplicationContext context, EurekaServiceRegistry registry,
EurekaRegistration registration) {
return new EurekaAutoServiceRegistration(context, registry, registration);
}
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
// If we use the proxy of the ApplicationInfoManager we could run into a problem
// when shutdown is called on the CloudEurekaClient where the ApplicationInfoManager
// bean is requested but wont be allowed because we are shutting down. To avoid
// this we use the object directly.
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
} else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
config, this.optionalArgs, this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
CloudEurekaInstanceConfig instanceConfig,
ApplicationInfoManager applicationInfoManager,
@Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
.with(eurekaClient).with(healthCheckHandler).build();
}
EurekaAutoServiceRegistration
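The AutoConfiguration contains the code that instantiates this class, so it must be a bean in the IoC container. Also look at the interfaces it implements, especially SmartLifecycle. It is worth clicking into AutoServiceRegistration too, because that is another auto-registration hook provided by Spring Cloud.
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener { .... }
This means that when the IoC container starts, it calls back the lifecycle methods of this class, such as start and stop.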
// Invoked by the container as a SmartLifecycle callback
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
// serviceRegistry = org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry
this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent<>(this,
this.registration.getInstanceConfig()));
this.running.set(true);
}
}
EurekaServiceRegistry#register
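This is the method that actually performs the registration, but there is an extra layer of design: the registration is expressed as a status change, and that change is announced by publishing an event (notifying listeners).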
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application "
+ reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
// This effectively publishes a status-change event
reg.getApplicationInfoManager() // =com.netflix.appinfo.ApplicationInfoManager
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
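A quick detour into com.netflix.appinfo.ApplicationInfoManager.setInstanceStatus:
// Sets the status of this instance. Applications can use this to indicate whether they are ready to receive traffic. Setting the status here also notifies all registered listeners of a status change event.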
public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {return;}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
// Where do the listeners come from? See registerStatusChangeListener
for (StatusChangeListener listener : listeners.values()) {
try {
// A status change means a service is registering or updating
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
// Alt+F7 (Find Usages) shows the caller: com.netflix.discovery.DiscoveryClient#initScheduledTasks
public void registerStatusChangeListener(StatusChangeListener listener) {
listeners.put(listener.getId(), listener);
}
DiscoveryClient#initScheduledTasks
The DiscoveryClient constructor calls initScheduledTasks, so the next question is where DiscoveryClient gets initialized. Spring Cloud uses a neat design here: it defines a class CloudEurekaClient that extends DiscoveryClient, and EurekaClientAutoConfiguration instantiates CloudEurekaClient.
public class CloudEurekaClient extends DiscoveryClient {
....
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
ApplicationEventPublisher publisher) {
// Calls the DiscoveryClient constructor
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
....
}
public class DiscoveryClient implements EurekaClient {
....
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider,
EndpointRandomizer endpointRandomizer) {
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch); this is where it happens
initScheduledTasks();
}
....
}
Summary of the Class Relationships So Far
At this point everything fits together.

First, the AutoConfiguration instantiates CloudEurekaClient. Its constructor calls DiscoveryClient#initScheduledTasks, which creates a StatusChangeListener and adds it to com.netflix.appinfo.ApplicationInfoManager#listeners.

Then, once the IoC container has finished initializing, it calls start() on every SmartLifecycle implementation. The call chain continues as follows:
- org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry#register
- com.netflix.appinfo.ApplicationInfoManager#setInstanceStatus
- com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener#notify
StatusChangeListener
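StatusChangeListener is the event listener. It is an interface with only one implementation, an anonymous inner class.
The following snippet from DiscoveryClient#initScheduledTasks is that anonymous inner class: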
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
logger.error("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
// This is what actually pushes the instance info to the registry
instanceInfoReplicator.onDemandUpdate();
}
};
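To see the listener mechanism in isolation, here is a minimal, self-contained sketch of the same observer pattern. DemoStatus, DemoStatusListener, MiniInfoManager and StatusFlowDemo are hypothetical stand-ins, not Eureka classes. The sketch assumes, as the prev != null check in setInstanceStatus suggests, that listeners only fire when the status actually changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for InstanceStatus / StatusChangeListener / ApplicationInfoManager.
enum DemoStatus { STARTING, UP, DOWN }

interface DemoStatusListener {
    String getId();
    void notify(DemoStatus prev, DemoStatus next);
}

class MiniInfoManager {
    // Keyed by listener id, like registerStatusChangeListener
    private final Map<String, DemoStatusListener> listeners = new ConcurrentHashMap<>();
    private DemoStatus status = DemoStatus.STARTING;

    void registerStatusChangeListener(DemoStatusListener listener) {
        listeners.put(listener.getId(), listener);
    }

    // Notifies listeners only when the status really changes
    synchronized void setInstanceStatus(DemoStatus next) {
        if (next == null || next == status) {
            return;
        }
        DemoStatus prev = status;
        status = next;
        for (DemoStatusListener listener : listeners.values()) {
            try {
                listener.notify(prev, next);
            } catch (RuntimeException e) {
                // One failing listener must not stop the others
                System.err.println("failed to notify listener: " + listener.getId());
            }
        }
    }
}

class StatusFlowDemo {
    // Registers one listener (playing the role of statusChangeListener) and records what it sees
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        MiniInfoManager manager = new MiniInfoManager();
        manager.registerStatusChangeListener(new DemoStatusListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(DemoStatus prev, DemoStatus next) {
                // In Eureka, this is where instanceInfoReplicator.onDemandUpdate() is called
                seen.add(prev + "->" + next);
            }
        });
        manager.setInstanceStatus(DemoStatus.UP);   // changed: listener fires
        manager.setInstanceStatus(DemoStatus.UP);   // unchanged: no notification
        manager.setInstanceStatus(DemoStatus.DOWN); // changed: listener fires
        return seen;
    }
}
```

StatusFlowDemo.run() records exactly one notification per real change, so setting UP twice produces only one event.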
com.netflix.discovery.InstanceInfoReplicator#onDemandUpdate
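This method essentially ends up calling com.netflix.discovery.InstanceInfoReplicator#run. It does so subject to a rate limiter, and only after cancelling the pending periodic run.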
com.netflix.discovery.InstanceInfoReplicator#run
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
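The dirty-timestamp check in run() is worth isolating. Below is a minimal sketch of the idea as I read it, using the hypothetical classes DirtyInfo and MiniReplicator:
- register only when something has changed;
- clear the dirty flag only if no newer change has arrived in the meantime.

The exact comparison in unsetIsDirty is my reading of InstanceInfo, so treat it as an assumption. The self-rescheduling done in the finally block is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of InstanceInfo's dirty flag.
class DirtyInfo {
    private boolean dirty;
    private long lastDirtyTimestamp;

    synchronized void setIsDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    // Returns the dirty timestamp, or null when nothing has changed
    synchronized Long isDirtyWithTime() {
        return dirty ? Long.valueOf(lastDirtyTimestamp) : null;
    }

    // Clears the flag only if no newer change happened after the given timestamp
    synchronized void unsetIsDirty(long timestamp) {
        if (lastDirtyTimestamp <= timestamp) {
            dirty = false;
        }
    }
}

// Hypothetical, simplified version of the check-then-register step in InstanceInfoReplicator#run.
class MiniReplicator {
    final DirtyInfo info = new DirtyInfo();
    final AtomicInteger registerCalls = new AtomicInteger();

    void runOnce() {
        Long dirtyTimestamp = info.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            registerCalls.incrementAndGet(); // stands in for discoveryClient.register()
            info.unsetIsDirty(dirtyTimestamp);
        }
    }
}
```

Passing the timestamp into unsetIsDirty means that a change made while register() is in flight keeps the flag set. The next run then picks that change up.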
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register
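From this code we can only tell that it sends a POST request to
http://{eureka.ip:port}/eureka/apps/{appName}
All that remains is to find how the server exposes this endpoint and how it processes the registration data.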
String urlPath = "apps/" + info.getAppName();
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
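For comparison outside Jersey, the same POST can be described with the JDK's java.net.http API (Java 11+). This is a hypothetical sketch with several assumptions:
- RegisterRequestSketch is not part of Eureka.
- serviceUrl is assumed to end with a slash (e.g. http://localhost:8761/eureka/).
- The JSON body is assumed to be the already-serialized InstanceInfo.

The sketch only builds the request and does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Eureka: builds (but does not send) a request
// with the same method and path as the Jersey POST above. Requires Java 11+.
class RegisterRequestSketch {
    static HttpRequest build(String serviceUrl, String appName, String instanceJson) {
        // Assumes serviceUrl ends with '/', e.g. "http://localhost:8761/eureka/"
        URI uri = URI.create(serviceUrl + "apps/" + appName);
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }
}
```

To actually send it, you would call something like HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against a running server.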
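Service Registration <Server-Side Handling>
Eureka uses the Jersey framework, so its exposed endpoints are much harder to locate than with Spring Web MVC. I looked up the corresponding method online, but even after finding it, I still didn't figure out how it is mapped to the URL.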
ApplicationResource#addInstance
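According to what I found online, registration requests are received by com.netflix.eureka.resources.ApplicationResource#addInstance. The first part of the method only validates parameters. The registration itself is done by registry.register(info, "true".equals(isReplication));, where registry is org.springframework.cloud.netflix.eureka.server.InstanceRegistry.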
Figure: class diagram of InstanceRegistry (image not preserved).
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register
public void register(final InstanceInfo info, final boolean isReplication) {
// Equivalent to publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
// com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
super.register(info, isReplication);
}
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
public void register(final InstanceInfo info, final boolean isReplication) {
// Lease duration (heartbeat timeout), 90s by default
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
// If a custom lease duration is configured, use it instead
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// com.netflix.eureka.registry.AbstractInstanceRegistry#register
super.register(info, leaseDuration, isReplication);
// In an HA setup, replicate the registration to the peer nodes
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
com.netflix.eureka.registry.AbstractInstanceRegistry#register
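The container that stores instance information is ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry:
- outer key = application name (spring.application.name)
- inner key = instance ID
- inner value = the instance information, wrapped in a Lease

The code is long, so the logging statements have been removed.

The parameter registrant is what the remote caller sent. By contrast, registry.get(registrant.getAppName()).get(registrant.getId()) is the server's local copy.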
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
read.lock();
try {
// registry = ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
// Look up this application's instances by name (more than one only when the service runs as a cluster)
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// First registration for this application: create its map
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
// null: our new map was the one inserted; non-null: another thread (e.g. a concurrent or replicated registration) created it first
if (gMap == null) {
gMap = gNewMap;
}
}
// Local copy
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
// Non-null: the server already holds a local copy, so keep its dirty timestamp instead of overwriting it
if (existingLease != null && (existingLease.getHolder() != null)) {
// Dirty timestamp of the copy in the registry
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
// Dirty timestamp sent by the remote caller
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
// Use the server's local copy only when its timestamp is strictly greater;
// when it is less than or equal, use the incoming one
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Seems to count the clients expected to send renewals; used by the heartbeat-renewal logic
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
// Update the renewals-per-minute threshold
// (the value the self-preservation check relies on, covered later)
updateRenewsPerMinThreshold();
}
}
}
// Build a new lease
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
// If a local copy exists, carry its service-up timestamp over to the new lease
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// At this point the instance is in the registry
gMap.put(registrant.getId(), lease);
// Not essential to registration: recentRegisteredQueue appears to feed the Eureka dashboard
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
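// From here on, handle the overridden status (a status imposed from outside, e.g. OUT_OF_SERVICE set through the REST API)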
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
// Sets serviceUpTimestamp = System.currentTimeMillis() (only if it has not been set before)
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
// Feeds delta fetches (apps/delta), whose responses are cached in readWriteCacheMap
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// Invalidate the cache: evict the affected entries from readWriteCacheMap
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
} finally {
read.unlock();
}
}
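Strip away leases, locks, counters, overridden status and cache invalidation, and the core of register() is a two-level map plus the lastDirtyTimestamp rule. The hypothetical MiniRegistry below is a minimal sketch of just that part. computeIfAbsent is my own substitute for the get/putIfAbsent sequence; Eureka itself does not use it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified model of the registry structure and the
// lastDirtyTimestamp rule in AbstractInstanceRegistry#register.
class MiniRegistry {
    static final class Instance {
        final String appName;
        final String id;
        final long lastDirtyTimestamp;

        Instance(String appName, String id, long lastDirtyTimestamp) {
            this.appName = appName;
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // appName -> (instanceId -> instance)
    private final ConcurrentHashMap<String, Map<String, Instance>> registry = new ConcurrentHashMap<>();

    void register(Instance registrant) {
        // computeIfAbsent plays the role of the get / putIfAbsent sequence above
        Map<String, Instance> gMap = registry.computeIfAbsent(registrant.appName, k -> new ConcurrentHashMap<>());
        Instance existing = gMap.get(registrant.id);
        // Keep the local copy only if it is strictly newer than the incoming one
        if (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing;
        }
        gMap.put(registrant.id, registrant);
    }

    Instance get(String appName, String id) {
        Map<String, Instance> gMap = registry.get(appName);
        return gMap == null ? null : gMap.get(id);
    }
}
```

A stale registration, for example a delayed replication carrying an older timestamp, therefore cannot overwrite newer instance data already held by the server.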
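Service Information Cache
The multi-level cache does not guarantee consistency. The read-only cache is refreshed from readWriteCacheMap only periodically, so readers can see stale data for up to one refresh interval.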
com.netflix.eureka.registry.ResponseCacheImpl
public class ResponseCacheImpl implements ResponseCache {
...
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key,Value>();
// Guava cache
private final LoadingCache<Key, Value> readWriteCacheMap;
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs,
AbstractInstanceRegistry registry) {
....
this.readWriteCacheMap= CacheBuilder.newBuilder()
.initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
// 180s by default
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(),
TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
// Called back when an entry is removed, e.g. by readWriteCacheMap.invalidate
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
}).build(new CacheLoader<Key, Value>() {
@Override
// Called back on readWriteCacheMap.get when the key is not cached yet
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
// Timer task that refreshes the read-only cache, every 30s by default
// responseCacheUpdateIntervalMs = eureka.server.responseCacheUpdateIntervalMs
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(
(
(System.currentTimeMillis() / responseCacheUpdateIntervalMs)
* responseCacheUpdateIntervalMs
) + responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
....
}
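To show why the two levels can disagree, here is a hypothetical TwoLevelCache loosely modeled on ResponseCacheImpl. It keeps only the read path and the periodic copy. Guava, the 180s expiry, the 30s Timer, regions and payload encoding are all left out. In the sketch:
- refreshReadOnly() stands in for the timer task;
- invalidate() stands in for what happens on register.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical two-level cache, loosely modeled on ResponseCacheImpl.
class TwoLevelCache {
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // stands in for generatePayload(key)

    TwoLevelCache(Function<String, String> loader) {
        this.loader = loader;
    }

    // Like readWriteCacheMap.get: load the payload on a miss
    String getReadWrite(String key) {
        return readWrite.computeIfAbsent(key, loader);
    }

    // Read path with the read-only cache enabled: readOnly first, then readWrite
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = getReadWrite(key);
            readOnly.put(key, value);
        }
        return value;
    }

    // On register/cancel only the read-write level is invalidated
    void invalidate(String key) {
        readWrite.remove(key);
    }

    // What the periodic update task does: copy readWrite values into readOnly
    void refreshReadOnly() {
        for (String key : readOnly.keySet()) {
            readOnly.put(key, getReadWrite(key));
        }
    }
}
```

After invalidate(), get() still returns the old payload until refreshReadOnly() runs. That gap is exactly the window in which clients can read stale registry data.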
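Heartbeat Renewal <Client Side>
Entry point
A heartbeat can only start after registration, so while reading the registration code it pays to watch for anything related to heartbeat. Two places stand out: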
- DiscoveryClient#DiscoveryClient
- DiscoveryClient#initScheduledTasks
Both contain heartbeat-related thread pools or scheduled tasks, so that is where to start.
DiscoveryClient#DiscoveryClient
The constructor creates a thread pool:
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
);
DiscoveryClient#initScheduledTasks
This builds a task that eventually reaches com.netflix.discovery.TimedSupervisorTask#run. That method essentially hands HeartbeatThread to heartbeatExecutor as a task.
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
// Schedule the first run
scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);
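A small question: why not simply call heartbeatExecutor.submit(new HeartbeatThread())? Why bother with a TimedSupervisorTask class plus ScheduledExecutorService.schedule?
A look at its logic shows that it mostly deals with failure cases:
- each run waits for the heartbeat only up to a timeout;
- after a timeout, the delay before the next run is doubled, capped at expBackOffBound times the base interval;
- after a success, the delay returns to normal;
- the task reschedules itself after every run, so one slow or failed heartbeat does not break the cycle.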
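Here is a simplified, hypothetical SupervisedTask that captures those behaviours: a per-run timeout, bounded exponential back-off, and self-rescheduling. It follows my reading of TimedSupervisorTask but leaves out its metrics and logging, so treat it as a sketch rather than the real class.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical re-implementation of the idea behind TimedSupervisorTask.
class SupervisedTask implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final ExecutorService executor;
    private final Runnable task;
    private final long timeoutMillis;
    private final long maxDelayMillis;
    private final AtomicLong delayMillis;

    SupervisedTask(ScheduledExecutorService scheduler, ExecutorService executor,
                   Runnable task, long timeoutMillis, int expBackOffBound) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.task = task;
        this.timeoutMillis = timeoutMillis;
        this.maxDelayMillis = timeoutMillis * expBackOffBound;
        this.delayMillis = new AtomicLong(timeoutMillis);
    }

    long currentDelayMillis() {
        return delayMillis.get();
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            // Wait for the heartbeat, but never longer than the timeout
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            // Success: go back to the normal interval
            delayMillis.set(timeoutMillis);
        } catch (TimeoutException e) {
            // Timed out: double the delay, bounded by maxDelayMillis
            delayMillis.set(Math.min(maxDelayMillis, delayMillis.get() * 2));
        } catch (RejectedExecutionException | ExecutionException e) {
            // Pool saturated or task threw: keep the delay and try again later
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (future != null) {
                future.cancel(true); // stop a task that is still running
            }
            // Reschedule itself, which keeps the periodic cycle alive
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delayMillis.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

With a plain submit(), a hung heartbeat call would block silently and nothing would retry it. The wrapper bounds each attempt and guarantees that a next attempt is always scheduled.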
DiscoveryClient.HeartbeatThread
In the end it simply runs HeartbeatThread, which despite its name is a Runnable:
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
DiscoveryClient#renew
This is just another remote call:
EurekaHttpResponse<InstanceInfo> httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#sendHeartBeat
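Only the status and the last dirty timestamp are passed, as query parameters (plus overriddenstatus when it is set), in a PUT request to apps/{appName}/{id}: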
String urlPath = "apps/" + appName + '/' + id;
WebResource webResource = jerseyClient.resource(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
ClientResponse response = requestBuilder.put(ClientResponse.class);
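Unlike registration, the heartbeat sends no body, and its state travels in the query string. The hypothetical HeartbeatRequestSketch below rebuilds that request with java.net.http (Java 11+) to make the shape visible. It assumes serviceUrl ends with a slash and that appName and id need no URL encoding. It leaves out the optional overriddenstatus parameter.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Eureka, mirroring the heartbeat URL built above.
// Assumes serviceUrl ends with '/' and that appName/id need no URL encoding.
class HeartbeatRequestSketch {
    static HttpRequest build(String serviceUrl, String appName, String id,
                             String status, long lastDirtyTimestamp) {
        String url = serviceUrl + "apps/" + appName + '/' + id
                + "?status=" + status
                + "&lastDirtyTimestamp=" + lastDirtyTimestamp;
        // The heartbeat is a PUT without a body; the state travels in the query string
        return HttpRequest.newBuilder(URI.create(url))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```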
Heartbeat Renewal <Server Side>
InstanceResource#renewLease
This is where the server receives the request. The core line is registry.renew(app.getName(), id, isFromReplicaNode);
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#renew
public boolean renew(final String appName, final String serverId, boolean isReplication) {
List<Application> applications = getSortedApplications();
for (Application input : applications) {
if (input.getName().equals(appName)) {
InstanceInfo instance = null;
for (InstanceInfo info : input.getInstances()) {
if (info.getId().equals(serverId)) {
instance = info;
break;
}
}
// Publish an event
publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
instance, isReplication));
break;
}
}
// com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
return super.renew(appName, serverId, isReplication);
}
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
public boolean renew(final String appName, final String id, final boolean isReplication) {
// Renew in the local registry first
if (super.renew(appName, id, isReplication)) {
// Replicate to the other nodes
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
com.netflix.eureka.registry.AbstractInstanceRegistry#renew
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
// lease doesn't exist
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
// This method looks important
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
// Increment the renewals-in-last-minute counter
renewsLastMin.increment();
// Refresh the lease's last-update timestamp
leaseToRenew.renew();
return true;
}
}
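The lease is what turns heartbeats into expiry decisions. The hypothetical MiniLease below is a minimal sketch of that relationship, with an injectable clock so it can be tested:
- renew() records "now";
- isExpired() checks whether a full duration has passed without a renewal.

Be aware that Eureka's own Lease#renew stores now + duration. Its source comments admit this is a minor bug that effectively stretches expiry to about twice the duration. This sketch deliberately uses plain "now".

```java
import java.util.function.LongSupplier;

// Simplified lease, loosely modeled on com.netflix.eureka.lease.Lease (not identical).
class MiniLease {
    private final LongSupplier clock; // injectable clock so the sketch is testable
    private final long durationMs;
    private volatile long lastUpdateTimestamp;

    MiniLease(LongSupplier clock, long durationMs) {
        this.clock = clock;
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = clock.getAsLong();
    }

    // A heartbeat: remember when we last heard from the instance
    void renew() {
        lastUpdateTimestamp = clock.getAsLong();
    }

    // Expired once no heartbeat arrived within durationMs (plus any extra grace period)
    boolean isExpired(long additionalLeaseMs) {
        return clock.getAsLong() > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }
}
```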
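Self-Preservation Mechanism
Trigger condition: the number of heartbeat renewals received in the last minute is no greater than the expected threshold. By default the threshold is 85% of the renewals expected from all registered clients, and it is recomputed every 15 minutes. When this happens, Eureka assumes there is a network fault between the clients and the registry, and Eureka Server automatically enters self-preservation mode.
Once in self-preservation mode, Eureka Server does the following:
- It no longer evicts services in the registry whose heartbeats have stopped.
- It still accepts new registrations and queries, but does not replicate them to other nodes, so the current node stays available.
- Only after the network has stabilized are the current node's new registrations replicated to other nodes.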
EurekaServerBootstrap
Where this class and its method come from:
- It is instantiated in EurekaServerAutoConfiguration and registered in the IoC container.
- EurekaServerBootstrap#contextInitialized is the method that sets up self-preservation.
- contextInitialized is called by EurekaServerInitializerConfiguration#start.
Call chain:
org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized
-> org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#initEurekaServerContext
-> com.netflix.eureka.registry.InstanceRegistry#openForTraffic
InstanceRegistry#openForTraffic
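This essentially just calls the parent method com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic. Spring Cloud's override only substitutes a default count when count is 0.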
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfClientsSendingRenews = count;
// Update the renewals-per-minute threshold
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// This is the key part
super.postInit();
}
com.netflix.eureka.registry.AbstractInstanceRegistry#updateRenewsPerMinThreshold
- expectedNumberOfClientsSendingRenews: the expected number of clients sending renewals (one per registered client)
- serverConfig.getExpectedClientRenewalIntervalSeconds(): 30 by default
- serverConfig.getRenewalPercentThreshold(): 0.85 by default
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
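Plugging in the defaults makes the formula concrete. With 10 registered clients, 10 * (60 / 30) * 0.85 = 17 renewals per minute.

The sketch below computes that value with the same expression as the method above. A second method shows the comparison that, as I understand it, PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled makes before evict() may remove anything: expiration stays enabled only while renewals in the last minute exceed the threshold. SelfPreservationMath is a hypothetical name.

```java
// Hypothetical helper: the threshold formula from updateRenewsPerMinThreshold,
// plus the comparison used to decide whether lease expiration is enabled.
class SelfPreservationMath {
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds,
                                     double percentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * percentThreshold);
    }

    // Eviction is allowed only while renewals in the last minute exceed the threshold
    static boolean leaseExpirationEnabled(boolean selfPreservationEnabled,
                                          long renewsLastMin, int threshold) {
        if (!selfPreservationEnabled) {
            return true; // self-preservation switched off: always allow expiry
        }
        return threshold > 0 && renewsLastMin > threshold;
    }
}
```

So with 10 clients, receiving 17 or fewer renewals in a minute is enough to stop eviction.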
AbstractInstanceRegistry#postInit
protected void postInit() {
// Start a timer that counts heartbeat renewals per minute
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
// Schedule the eviction task, every 60s by default (serverConfig.getEvictionIntervalTimerInMs() = 60 * 1000)
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
The run method of EvictionTask eventually calls com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long).
AbstractInstanceRegistry#evict
public void evict(long additionalLeaseMs) {
// Skip eviction while lease expiration is disabled, i.e. while self-preservation is active
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
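// Collect all expired items first, then evict them in random order.
// For large eviction sets, without randomization we might wipe out whole applications before self-preservation kicks in.
// Randomizing spreads the impact evenly across all applications.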
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// Cap evictions per run at (1 - renewalPercentThreshold) of the local registry, i.e. 15% by default
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
// Deregister (cancel) the instance
internalCancel(appName, id, false);
}
}
}
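The selection step in evict() combines two ideas: a cap on how many leases one run may remove, and a partial Fisher-Yates (Knuth) shuffle that chooses which ones. The hypothetical EvictionSketch below isolates just that step.
- Leases are replaced by plain strings.
- The Random is passed in rather than seeded with the current time, so the result is reproducible.
- The caller's list is copied rather than shuffled in place.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified version of the selection step in AbstractInstanceRegistry#evict.
class EvictionSketch {
    static List<String> pickToEvict(List<String> expired, int registrySize,
                                    double renewalPercentThreshold, Random random) {
        List<String> candidates = new ArrayList<>(expired); // don't mutate the caller's list
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        int evictionLimit = registrySize - registrySizeThreshold;
        int toEvict = Math.min(candidates.size(), evictionLimit);
        List<String> picked = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            // Partial Fisher-Yates shuffle: swap a random remaining item into slot i
            int next = i + random.nextInt(candidates.size() - i);
            Collections.swap(candidates, i, next);
            picked.add(candidates.get(i));
        }
        return picked;
    }
}
```

With 100 registered instances and the default 0.85, at most 15 are evicted per run, even if 30 have expired. The rest wait for later runs.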
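Client Fetching Service Instance Info
Client configuration
The client setting eureka.client.fetch-registry defaults to true. It controls whether the client fetches service information from the registry. The constructor DiscoveryClient#DiscoveryClient checks this setting:
if (clientConfig.shouldFetchRegistry()) {
try {
// This is the entry point for fetching
boolean primaryFetchRegistryResult = fetchRegistry(false);
....
} catch (Throwable th) {
throw new IllegalStateException(th);
}
}
fetchRegistry itself: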
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// Do a full fetch if delta fetching is disabled or if this is the first fetch
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
// Full fetch: calls apps/
getAndStoreFullRegistry();
} else {
// Delta fetch: calls apps/delta
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
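The long if condition at the top of fetchRegistry boils down to one rule: fetch the full registry unless a usable local copy exists and nothing forces a full fetch. The hypothetical FetchDecision below restates that condition with plain parameters so each case can be checked on its own. The Integer parameter stands for the number of locally registered applications, with null meaning no local copy yet.

```java
// Hypothetical restatement of the full-vs-delta condition at the top of fetchRegistry.
class FetchDecision {
    enum Mode { FULL, DELTA }

    static Mode decide(boolean disableDelta, String registryRefreshSingleVipAddress,
                       boolean forceFullRegistryFetch, Integer registeredAppCount,
                       long applicationsVersion) {
        // null stands for "no local Applications yet"
        boolean noLocalCopy = registeredAppCount == null || registeredAppCount == 0;
        boolean singleVip = registryRefreshSingleVipAddress != null
                && !registryRefreshSingleVipAddress.isEmpty();
        if (disableDelta || singleVip || forceFullRegistryFetch || noLocalCopy
                || applicationsVersion == -1) {
            return Mode.FULL;  // getAndStoreFullRegistry(): apps/
        }
        return Mode.DELTA;     // getAndUpdateDelta(...): apps/delta
    }
}
```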
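Client Storing Service Instance Info
The previous step fetches the service information through a remote call, so storing it is really a question of how the response is processed.
I didn't want to dig into this part of the code.
Data Synchronization Between Nodes
The registration section above briefly touched on replication between nodes:
PeerAwareInstanceRegistryImpl#replicateToPeers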
Integration with Ribbon
Ribbon obtains its server list in com.netflix.loadbalancer.DynamicServerListLoadBalancer#updateListOfServers, which calls ServerList.getUpdatedListOfServers(). When the servers come from the configuration file, the implementation is ConfigurationBasedServerList. With Eureka, the implementation is org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList.
DomainExtractingServerList#getUpdatedListOfServers
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
// this.list = com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList
List<DiscoveryEnabledServer> servers = setZones(
this.list.getUpdatedListOfServers());
return servers;
}
DiscoveryEnabledNIWSServerList#getUpdatedListOfServers
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
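Reduced to its filtering logic, obtainServersViaDiscovery does three things:
- walks the comma-separated VIP addresses;
- keeps only UP instances;
- optionally stops at the first VIP address that yields any servers.

The hypothetical ServerListSketch below shows that logic. A map stands in for eurekaClient.getInstancesByVipAddress, and plain "host:port" strings stand in for DiscoveryEnabledServer. The port-override branch is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the filtering in obtainServersViaDiscovery.
class ServerListSketch {
    static final class Instance {
        final String host;
        final int port;
        final boolean up;

        Instance(String host, int port, boolean up) {
            this.host = host;
            this.port = port;
            this.up = up;
        }
    }

    // byVip stands in for eurekaClient.getInstancesByVipAddress(...)
    static List<String> obtain(String vipAddresses, Map<String, List<Instance>> byVip,
                               boolean prioritizeVipAddressBasedServers) {
        List<String> servers = new ArrayList<>();
        if (vipAddresses == null) {
            return servers;
        }
        for (String vipAddress : vipAddresses.split(",")) {
            for (Instance ii : byVip.getOrDefault(vipAddress, Collections.emptyList())) {
                if (ii.up) { // only UP instances become Ribbon servers
                    servers.add(ii.host + ":" + ii.port);
                }
            }
            // Stop at the first VIP address that produced servers, if asked to
            if (!servers.isEmpty() && prioritizeVipAddressBasedServers) {
                break;
            }
        }
        return servers;
    }
}
```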
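Eureka Server Notes
Setting up the server is simple. Apart from adding the dependency and configuring the yml file, it is just a Spring Boot project with @EnableEurekaServer on the main class, so the starting point is this annotation.
@EnableEurekaServer
This annotation is cleverly designed. As its Javadoc says, its purpose is to activate the Eureka Server configuration, but how does it do that? The key is
@Import(EurekaServerMarkerConfiguration.class)
The imported configuration is extremely simple: it only declares a Marker bean. So how can a trivial marker activate the configuration?

Under Spring's own SPI-like mechanism, classes listed in spring.factories are picked up when Spring starts. Here that class is EurekaServerAutoConfiguration. However, EurekaServerAutoConfiguration is conditional on a Marker bean being present in the context, and that is how @EnableEurekaServer does what its Javadoc promises.

Why this design? My guess is that it prevents a project that pulls in the dependency by mistake from loading lots of unnecessary classes.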
/**
* Annotation to activate Eureka Server related configuration.
* {@link EurekaServerAutoConfiguration}
*/
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
/**
* Responsible for adding in a marker bean to activate
* {@link EurekaServerAutoConfiguration}.
*/
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
@Bean
public Marker eurekaServerMarkerBean() {
return new Marker();
}
class Marker {}
}
// The key is this condition
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
}
Dashboard Entry Point: EurekaController
This is the dashboard controller provided by Spring Cloud. It is defined the Spring Web MVC way, so it is fairly easy to follow.
http://localhost:8761
Related endpoints