Preface
This article examines the instanceEnabledOnit property of Spring Cloud Eureka.
EurekaInstanceConfigBean
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaInstanceConfigBean.java
@ConfigurationProperties("eureka.instance")
public class EurekaInstanceConfigBean implements CloudEurekaInstanceConfig, EnvironmentAware {
//......
/**
* Indicates whether the instance should be enabled for taking traffic as soon as it
* is registered with eureka. Sometimes the application might need to do some
* pre-processing before it is ready to take traffic.
*/
private boolean instanceEnabledOnit;
//......
}
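This property decides whether the application instance may start taking requests as soon as it is registered.
Its configuration metadata is defined as follows: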
spring-cloud-netflix-eureka-client-2.0.0.RC1.jar!/META-INF/spring-configuration-metadata.json
{
"sourceType": "org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean",
"defaultValue": false,
"name": "eureka.instance.instance-enabled-onit",
"description": "Indicates whether the instance should be enabled for taking traffic as soon as it\n is registered with eureka. Sometimes the application might need to do some\n pre-processing before it is ready to take traffic.",
"type": "java.lang.Boolean"
}
PropertiesInstanceConfig
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/PropertiesInstanceConfig.java
static final String TRAFFIC_ENABLED_ON_INIT_KEY = "traffic.enabled";
@Override
public boolean isInstanceEnabledOnit() {
return configInstance.getBooleanProperty(namespace + TRAFFIC_ENABLED_ON_INIT_KEY,
super.isInstanceEnabledOnit()).get();
}
When the property namespace + traffic.enabled is not set, the lookup falls back to super.isInstanceEnabledOnit().
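The fallback described above can be sketched with plain JDK classes. This is a simplified model, not the library code: java.util.Properties stands in for configInstance, the "eureka." namespace in the usage is an assumption, and SUPER_DEFAULT is a hypothetical stand-in for whatever super.isInstanceEnabledOnit() returns (assumed false here).

```java
import java.util.Properties;

class TrafficEnabledLookup {
    // Same key as the constant quoted from PropertiesInstanceConfig.
    static final String TRAFFIC_ENABLED_ON_INIT_KEY = "traffic.enabled";

    // Hypothetical stand-in for super.isInstanceEnabledOnit(); assumed to be false.
    static final boolean SUPER_DEFAULT = false;

    // Returns the configured value, or the superclass default when the key is absent.
    static boolean isInstanceEnabledOnInit(Properties props, String namespace) {
        String raw = props.getProperty(namespace + TRAFFIC_ENABLED_ON_INIT_KEY);
        return raw == null ? SUPER_DEFAULT : Boolean.parseBoolean(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Key absent: the default wins.
        System.out.println(isInstanceEnabledOnInit(props, "eureka."));
        // Key present: the configured value wins.
        props.setProperty("eureka.traffic.enabled", "true");
        System.out.println(isInstanceEnabledOnInit(props, "eureka."));
    }
}
```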
Changing to STARTING
InstanceInfoFactory
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/InstanceInfoFactory.java
public class InstanceInfoFactory {
private static final Log log = LogFactory.getLog(InstanceInfoFactory.class);
public InstanceInfo create(EurekaInstanceConfig config) {
LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
.setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
// Builder the instance information to be registered with eureka
// server
InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();
String namespace = config.getNamespace();
if (!namespace.endsWith(".")) {
namespace = namespace + ".";
}
builder.setNamespace(namespace).setAppName(config.getAppname())
.setInstanceId(config.getInstanceId())
.setAppGroupName(config.getAppGroupName())
.setDataCenterInfo(config.getDataCenterInfo())
.setIPAddr(config.getIpAddress()).setHostName(config.getHostName(false))
.setPort(config.getNonSecurePort())
.enablePort(InstanceInfo.PortType.UNSECURE,
config.isNonSecurePortEnabled())
.setSecurePort(config.getSecurePort())
.enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
.setVIPAddress(config.getVirtualHostName())
.setSecureVIPAddress(config.getSecureVirtualHostName())
.setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
.setStatusPageUrl(config.getStatusPageUrlPath(),
config.getStatusPageUrl())
.setHealthCheckUrls(config.getHealthCheckUrlPath(),
config.getHealthCheckUrl(), config.getSecureHealthCheckUrl())
.setASGName(config.getASGName());
// Start off with the STARTING state to avoid traffic
if (!config.isInstanceEnabledOnit()) {
InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING;
if (log.isInfoEnabled()) {
log.info("Setting initial instance status as: " + initialStatus);
}
builder.setStatus(initialStatus);
}
else {
if (log.isInfoEnabled()) {
log.info("Setting initial instance status as: "
+ InstanceInfo.InstanceStatus.UP
+ ". This may be too early for the instance to advertise itself as available. "
+ "You would instead want to control this via a healthcheck handler.");
}
}
// Add any user-specific metadata information
for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
String key = mapEntry.getKey();
String value = mapEntry.getValue();
// only add the metadata if the value is present
if (value != null && !value.isEmpty()) {
builder.add(key, value);
}
}
InstanceInfo instanceInfo = builder.build();
instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
return instanceInfo;
}
}
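As shown here, the factory checks isInstanceEnabledOnit: if it is false, the initial InstanceStatus is set to InstanceInfo.InstanceStatus.STARTING. A newly created InstanceInfo already has its status initialized to InstanceStatus.UP, so when the property is true the code only logs a message and leaves the status untouched.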
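The decision above boils down to a single branch. The following is a minimal sketch under stated assumptions: the Status enum and the initialStatus method are illustrative names introduced here, not part of Eureka, and UP is modelled as the value a fresh instance starts with.

```java
class InitialStatusSketch {
    // Illustrative stand-in for InstanceInfo.InstanceStatus.
    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    // Models the branch in InstanceInfoFactory.create: a fresh instance starts
    // as UP and is only downgraded to STARTING when traffic on init is disabled.
    static Status initialStatus(boolean instanceEnabledOnInit) {
        Status status = Status.UP;
        if (!instanceEnabledOnInit) {
            status = Status.STARTING;
        }
        return status;
    }

    public static void main(String[] args) {
        System.out.println(initialStatus(false)); // prints STARTING
        System.out.println(initialStatus(true));  // prints UP
    }
}
```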
EurekaRegistration
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/serviceregistry/EurekaRegistration.java
public EurekaRegistration build() {
Assert.notNull(instanceConfig, "instanceConfig may not be null");
if (this.applicationInfoManager == null) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(this.instanceConfig);
this.applicationInfoManager = new ApplicationInfoManager(this.instanceConfig, instanceInfo);
}
if (this.eurekaClient == null) {
Assert.notNull(this.clientConfig, "if eurekaClient is null, EurekaClientConfig may not be null");
Assert.notNull(this.publisher, "if eurekaClient is null, ApplicationEventPublisher may not be null");
this.eurekaClient = new CloudEurekaClient(this.applicationInfoManager, this.clientConfig, this.publisher);
}
return new EurekaRegistration(instanceConfig, eurekaClient, applicationInfoManager, healthCheckHandler);
}
When the EurekaRegistration is built, it calls new InstanceInfoFactory().create(this.instanceConfig) to create the InstanceInfo.
DiscoveryClient.initScheduledTasks
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
The DiscoveryClient constructor calls initScheduledTasks, which in turn calls instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()). By default the first run is delayed by 40 seconds.
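The delayed first run relies on a one-shot schedule on a ScheduledExecutorService. A minimal sketch of that mechanism follows; the class and method names are illustrative, the AtomicBoolean stands in for the registration call, and a short delay replaces the 40-second default so the example finishes quickly.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class DelayedFirstRegistration {
    // Schedules a one-shot task after the given delay, waits for it,
    // and reports whether the (simulated) registration ran.
    static boolean runAfterDelay(long delay, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean registered = new AtomicBoolean(false);
        try {
            ScheduledFuture<?> first =
                    scheduler.schedule(() -> registered.set(true), delay, unit);
            first.get(); // blocks until the delayed task has completed
            return registered.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAfterDelay(50, TimeUnit.MILLISECONDS));
    }
}
```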
InstanceInfoReplicator
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/InstanceInfoReplicator.java
class InstanceInfoReplicator implements Runnable {
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to stopped scheduler");
return false;
}
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
}
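start marks the instance info as dirty, so the first execution of run triggers register, a remote call that registers the instance with the server. Note that although the parameter is named initialDelayMs, the delay is scheduled in TimeUnit.SECONDS.
DiscoveryClient has a statusChangeListener that calls onDemandUpdate here; as long as the rate limit is not exceeded, onDemandUpdate executes this same run method.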
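The dirty-flag behaviour can be sketched without any scheduler. This is a simplified model with illustrative names: the counter stands in for discoveryClient.register(), and the rate limiter, the cancel-and-reschedule step, and the timestamp comparison done by unsetIsDirty are deliberately left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

class DirtyFlagReplicator {
    private Long dirtyTimestamp; // null means "clean"
    private final AtomicInteger registerCalls = new AtomicInteger();

    synchronized void setIsDirty() {
        dirtyTimestamp = System.currentTimeMillis();
    }

    // Models start(): mark dirty so the first run registers.
    void start() {
        setIsDirty();
    }

    // Models run(): register only when the info is dirty, then clear the flag.
    synchronized void run() {
        if (dirtyTimestamp != null) {
            registerCalls.incrementAndGet(); // stands in for discoveryClient.register()
            dirtyTimestamp = null;
        }
    }

    // Models onDemandUpdate() with the rate limiter omitted: it simply runs.
    void onDemandUpdate() {
        run();
    }

    int registerCalls() {
        return registerCalls.get();
    }

    public static void main(String[] args) {
        DirtyFlagReplicator replicator = new DirtyFlagReplicator();
        replicator.start();
        replicator.run();            // dirty: registers
        replicator.run();            // clean: nothing to do
        replicator.setIsDirty();     // e.g. after a status change
        replicator.onDemandUpdate(); // dirty again: registers
        System.out.println(replicator.registerCalls()); // prints 2
    }
}
```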
Changing to UP
EurekaAutoServiceRegistration
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/serviceregistry/EurekaAutoServiceRegistration.java
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {
//......
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
this.serviceRegistry.register(this.registration);
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
}
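On startup this performs the automatic registration by calling serviceRegistry.register(this.registration), implemented in EurekaServiceRegistry as follows: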
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getInstanceConfig().getAppname()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->
reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
This changes the status to reg.getInstanceConfig().getInitialStatus(), which defaults to UP:
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaInstanceConfigBean.java
/**
* Initial status to register with rmeote Eureka server.
*/
private InstanceStatus initialStatus = InstanceStatus.UP;
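The status change and its listener notification can be sketched as follows. This is a simplified model, not the ApplicationInfoManager code: all names are illustrative, and it assumes listeners are notified only when the status actually changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class StatusChangeSketch {
    // Illustrative stand-in for InstanceInfo.InstanceStatus.
    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    private Status current;
    // Each listener receives (previousStatus, newStatus).
    private final List<BiConsumer<Status, Status>> listeners = new ArrayList<>();

    StatusChangeSketch(Status initial) {
        this.current = initial;
    }

    void registerListener(BiConsumer<Status, Status> listener) {
        listeners.add(listener);
    }

    // Assumption of this model: no event is emitted when the status is unchanged.
    synchronized void setInstanceStatus(Status next) {
        Status previous = current;
        if (previous == next) {
            return;
        }
        current = next;
        for (BiConsumer<Status, Status> listener : listeners) {
            listener.accept(previous, next);
        }
    }

    // Counts how many notifications a sequence of status updates produces.
    static int countNotifications(Status initial, Status... updates) {
        StatusChangeSketch manager = new StatusChangeSketch(initial);
        int[] count = {0};
        manager.registerListener((previous, next) -> count[0]++);
        for (Status update : updates) {
            manager.setInstanceStatus(update);
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println(countNotifications(Status.STARTING, Status.UP)); // prints 1
        System.out.println(countNotifications(Status.UP, Status.UP));       // prints 0
    }
}
```

Under that assumption, an instance that starts as STARTING produces one event when it is set to UP, while an instance that already starts as UP produces none.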
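Summary
At application startup, the eureka.instance.instance-enabled-onit setting (default false) determines whether the instance's status at initial registration with the eureka server is UP or STARTING. By default the instance is initialized as STARTING, and the subsequent automatic registration changes it to UP, which means it can start taking requests.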
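- DiscoveryClient initializes a set of scheduled tasks, including the run method of InstanceInfoReplicator, which by default is delayed by 40 seconds. On start it first calls instanceInfo.setIsDirty(), so its first execution triggers discoveryClient.register().
- EurekaAutoServiceRegistration calls serviceRegistry.register(this.registration) on startup, which changes the status and publishes a StatusChangeEvent. DiscoveryClient has a statusChangeListener that calls onDemandUpdate on InstanceInfoReplicator, and as long as the rate limit is not exceeded, onDemandUpdate executes the run method of InstanceInfoReplicator.
Either way, whether through the delayed task of InstanceInfoReplicator or through the StatusChangeEvent caused by EurekaAutoServiceRegistration, it is the run method of InstanceInfoReplicator that ends up triggering discoveryClient.register() and registering with the remote eureka server.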