Eureka源码剖析之一:初始化-启动

Eureka启动的过程有client端和server端, Eureka client端入口是DiscoveryClient类, Eureka server端入口是EurekaBootStrap类, 接下来我们就从源码看下它们做了什么吧!

〓Eureka Client端启动

1)看下DiscoveryClient类图: 

由此看出DiscoveryClient实现了EurekaClient、LookupService接口,并且定义了内部类:DiscoverClientOptionalArgs,可选参数类,源码里实现为空,是默认实现,具体的需要去查看AbstractDiscoveryClientOptionalArgs这个抽象类;EurekaTransport类,封装了Client请求的类;CacheFreshThread,刷新缓存线程,提供定时拉取服务列表等;HeartbeatThread,心跳线程,提供定时向服务端续约服务等。

// DiscoveryClient类是一个单例,实现了EurekaClient接口

@Singleton

publicclassDiscoveryClientimplementsEurekaClient{

// DiscoveryClient类的构造函数

@Inject// 构造器注入

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,

Provider backupRegistryProvider) {

if(args !=null) {

this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;

this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;

this.eventListeners.addAll(args.getEventListeners());

this.preRegistrationHandler = args.preRegistrationHandler;

}else{

this.healthCheckCallbackProvider =null;

this.healthCheckHandlerProvider =null;

this.preRegistrationHandler =null;

}

this.applicationInfoManager = applicationInfoManager;

InstanceInfo myInfo = applicationInfoManager.getInfo();

clientConfig = config;

staticClientConfig = clientConfig;

transportConfig = config.getTransportConfig();

instanceInfo = myInfo;

if(myInfo !=null) {

appPathIdentifier = instanceInfo.getAppName() +"/"+ instanceInfo.getId();

}else{

logger.warn("Setting instanceInfo to a passed in null value");

}

this.backupRegistryProvider = backupRegistryProvider;

this.urlRandomizer =newEndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);

localRegionApps.set(newApplications());

// 拉取服务计数器:单调地增加生成计数器,以确保陈旧的线程不会将注册表重置为旧版本。

fetchRegistryGeneration =newAtomicLong(0);

remoteRegionsToFetch =newAtomicReference(clientConfig.fetchRegistryForRemoteRegions());

remoteRegionsRef =newAtomicReference<>(remoteRegionsToFetch.get() ==null?null: remoteRegionsToFetch.get().split(","));

// 过时注册统计

if(config.shouldFetchRegistry()) {

this.registryStalenessMonitor =newThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX +"lastUpdateSec_",newlong[]{15L,30L,60L,120L,240L,480L});

}else{

this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;

}

// 过时心跳统计

if(config.shouldRegisterWithEureka()) {

this.heartbeatStalenessMonitor =newThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX +"lastHeartbeatSec_",newlong[]{15L,30L,60L,120L,240L,480L});

}else{

this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;

}

logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

// 既不需要注册到Eureka也不拉取注册服务

if(!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {

logger.info("Client configured to neither register nor query for data.");

scheduler =null;

heartbeatExecutor =null;

cacheRefreshExecutor =null;

eurekaTransport =null;

instanceRegionChecker =newInstanceRegionChecker(newPropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()

// to work with DI'd DiscoveryClient

DiscoveryManager.getInstance().setDiscoveryClient(this);

DiscoveryManager.getInstance().setEurekaClientConfig(config);

initTimestampMs = System.currentTimeMillis();

logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",

initTimestampMs,this.getApplications().size());

return;// no need to setup up an network tasks and we are done

}

try{

// 定义了2个线程大小的定时线程池:一个是刷新缓存CacheFreshThread,一个是心跳线程HeartbeatThread

scheduler = Executors.newScheduledThreadPool(2,

newThreadFactoryBuilder()

.setNameFormat("DiscoveryClient-%d")

.setDaemon(true)

.build());

// 心跳线程池

heartbeatExecutor =newThreadPoolExecutor(

1, clientConfig.getHeartbeatExecutorThreadPoolSize(),0, TimeUnit.SECONDS,

newSynchronousQueue(),

newThreadFactoryBuilder()

.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")

.setDaemon(true)

.build()

);// use direct handoff

// 刷新缓存线程池

cacheRefreshExecutor =newThreadPoolExecutor(

1, clientConfig.getCacheRefreshExecutorThreadPoolSize(),0, TimeUnit.SECONDS,

newSynchronousQueue(),

newThreadFactoryBuilder()

.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")

.setDaemon(true)

.build()

);// use direct handoff

// 实例化EurekaTransport

eurekaTransport =newEurekaTransport();

scheduleServerEndpointTask(eurekaTransport, args);

AzToRegionMapper azToRegionMapper;

if(clientConfig.shouldUseDnsForFetchingServiceUrls()) {

azToRegionMapper =newDNSBasedAzToRegionMapper(clientConfig);

}else{

azToRegionMapper =newPropertyBasedAzToRegionMapper(clientConfig);

}

if(null!= remoteRegionsToFetch.get()) {

azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));

}

instanceRegionChecker =newInstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());

}catch(Throwable e) {

thrownewRuntimeException("Failed to initialize DiscoveryClient!", e);

}

// 在启动时需要拉取注册服务列表,增量拉取之后如果失败就会从备份里面再次拉取

if(clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {

fetchRegistryFromBackup();

}

// call and execute the pre registration handler before all background tasks (inc registration) is started

if(this.preRegistrationHandler !=null) {

this.preRegistrationHandler.beforeRegistration();

}

// 初始化定时任务

initScheduledTasks();

try{

// 监控DiscoverClient

Monitors.registerObject(this);

}catch(Throwable e) {

logger.warn("Cannot register timers", e);

}

// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()

// to work with DI'd DiscoveryClient

DiscoveryManager.getInstance().setDiscoveryClient(this);

DiscoveryManager.getInstance().setEurekaClientConfig(config);

initTimestampMs = System.currentTimeMillis();

logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",

initTimestampMs,this.getApplications().size());

}

/**

* Initializes all scheduled tasks.

*/

privatevoidinitScheduledTasks(){

// 需要拉取注册服务,则定时拉取刷新缓存CacheRefreshThread,可以看#Eureka服务拉取 一篇

if(clientConfig.shouldFetchRegistry()) {

// registry cache refresh timer

// 默认30秒,定时拉取服务

intregistryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();

intexpBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();

scheduler.schedule(

newTimedSupervisorTask(

"cacheRefresh",

scheduler,

cacheRefreshExecutor,

registryFetchIntervalSeconds,

TimeUnit.SECONDS,

expBackOffBound,

newCacheRefreshThread()

),

registryFetchIntervalSeconds, TimeUnit.SECONDS);

}

// 需要注册到Eureka,则定时心跳请求服务端保持客户端存活,即服务续约。可以看#Eureka服务续约 一篇

if(clientConfig.shouldRegisterWithEureka()) {

// 默认30秒,定时进行服务续约

intrenewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();

intexpBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();

logger.info("Starting heartbeat executor: "+"renew interval is: "+ renewalIntervalInSecs);

// Heartbeat timer

scheduler.schedule(

newTimedSupervisorTask(

"heartbeat",

scheduler,

heartbeatExecutor,

renewalIntervalInSecs,

TimeUnit.SECONDS,

expBackOffBound,

newHeartbeatThread()

),

renewalIntervalInSecs, TimeUnit.SECONDS);

// InstanceInfo replicator 当前实例节点复制器实例化

instanceInfoReplicator =newInstanceInfoReplicator(

this,

instanceInfo,

// 实例信息复制间隔,默认30秒

clientConfig.getInstanceInfoReplicationIntervalSeconds(),

2);// burstSize

// 状态变化监听器

statusChangeListener =newApplicationInfoManager.StatusChangeListener() {

@Override

publicStringgetId()

{

return"statusChangeListener";

}

@Override

publicvoidnotify(StatusChangeEvent statusChangeEvent)

{

if(InstanceStatus.DOWN == statusChangeEvent.getStatus() ||

InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {

// log at warn level if DOWN was involved

logger.warn("Saw local status change event {}", statusChangeEvent);

}else{

logger.info("Saw local status change event {}", statusChangeEvent);

}

instanceInfoReplicator.onDemandUpdate();

}

};

if(clientConfig.shouldOnDemandUpdateStatusChange()) {

applicationInfoManager.registerStatusChangeListener(statusChangeListener);

}

// 实例节点复制器启动

instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

}else{

logger.info("Not registering with Eureka server per configuration");

}

}

}

classInstanceInfoReplicatorimplementsRunnable{

// 启动,延迟40秒启动

publicvoidstart(intinitialDelayMs){

// CAS保证启动一次

if(started.compareAndSet(false,true)) {

instanceInfo.setIsDirty();// for initial register

// 定时任务定时调用实例信息复制器(线程),逻辑看run方法

Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);

scheduledPeriodicRef.set(next);

}

}

publicvoidrun(){

try{

// 刷新实例信息

discoveryClient.refreshInstanceInfo();

// 获取实例信息的脏时间戳,如果存在则进行服务注册,服务注册可以看 #Eureka服务注册 一篇

Long dirtyTimestamp = instanceInfo.isDirtyWithTime();

if(dirtyTimestamp !=null) {

//Client进行注册操作

discoveryClient.register();

instanceInfo.unsetIsDirty(dirtyTimestamp);

}

}catch(Throwable t) {

logger.warn("There was a problem with the instance info replicator", t);

}finally{

// 定时每30秒进行刷新或注册请求

Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);

scheduledPeriodicRef.set(next);

}

}

}

Eureka Client启动时会开启三个定时任务

①刷新缓存定时服务,即定时拉取服务列表,默认每30秒进行定时拉取服务列表;同时②开启心跳线程定时服务,即定时向服务端进行服务续约,默认每30秒进行定时续约。③启动实例信息复制器进行刷新服务实例信息或服务注册请求。

〓Eureka Server端启动

EurekaBootStrap是server端启动入口,

PeerAwareInstanceRegistryImpl是真正的核心类,我们看下其类图:

// 继承Servlet上下文监听器,说明Eureka Server是基于Servlet

publicclassEurekaBootStrapimplementsServletContextListener{

...

@Override

publicvoidcontextInitialized(ServletContextEvent event){

try{

// 初始化环境

initEurekaEnvironment();

// 初始化Euraka Server Context

initEurekaServerContext();

ServletContext sc = event.getServletContext();

sc.setAttribute(EurekaServerContext.class.getName(), serverContext);

}catch(Throwable e) {

logger.error("Cannot bootstrap eureka server :", e);

thrownewRuntimeException("Cannot bootstrap eureka server :", e);

}

}

/**

* init hook for server context. Override for custom logic.

*/

protectedvoidinitEurekaServerContext()throwsException{

// Eureka Server读取配置

EurekaServerConfig eurekaServerConfig =newDefaultEurekaServerConfig();

// For backward compatibility

JsonXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

XmlXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

logger.info("Initializing the eureka client...");

logger.info(eurekaServerConfig.getJsonCodecName());

// Server默认加载编码JSON、XML

ServerCodecs serverCodecs =newDefaultServerCodecs(eurekaServerConfig);

ApplicationInfoManager applicationInfoManager =null;

// EurekaClient初始化

if(eurekaClient ==null) {

EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())

?newCloudInstanceConfig()

:newMyDataCenterInstanceConfig();

applicationInfoManager =newApplicationInfoManager(

instanceConfig,newEurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

EurekaClientConfig eurekaClientConfig =newDefaultEurekaClientConfig();

eurekaClient =newDiscoveryClient(applicationInfoManager, eurekaClientConfig);

}else{

applicationInfoManager = eurekaClient.getApplicationInfoManager();

}

// 如果是使用AWS平台,这里不涉及

PeerAwareInstanceRegistry registry;

if(isAws(applicationInfoManager.getInfo())) {

registry =newAwsInstanceRegistry(

eurekaServerConfig,

eurekaClient.getEurekaClientConfig(),

serverCodecs,

eurekaClient

);

awsBinder =newAwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);

awsBinder.start();

}else{

// 初始化集群实例,父类AbstractInstanceRegistry 

registry =newPeerAwareInstanceRegistryImpl(

eurekaServerConfig,

eurekaClient.getEurekaClientConfig(),

serverCodecs,

eurekaClient

);

}

// 初始化集群节点实例

PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(

registry,

eurekaServerConfig,

eurekaClient.getEurekaClientConfig(),

serverCodecs,

applicationInfoManager

);

// 初始化EurekaSeverContext实例

serverContext =newDefaultEurekaServerContext(

eurekaServerConfig,

serverCodecs,

registry,

peerEurekaNodes,

applicationInfoManager

);

// 使用非注入的方式Holder保持EurekaServerContext实例

EurekaServerContextHolder.initialize(serverContext);

// EurekaServerContext初始化

serverContext.initialize();

logger.info("Initialized server context");

// 复制注册列表到集群上的其它节点

intregistryCount = registry.syncUp();

registry.openForTraffic(applicationInfoManager, registryCount);

// Register all monitoring statistics.

EurekaMonitors.registerAllStats();

}

// 初始化获得集群节点

protectedPeerEurekaNodesgetPeerEurekaNodes(PeerAwareInstanceRegistry registry, EurekaServerConfig eurekaServerConfig, EurekaClientConfig eurekaClientConfig, ServerCodecs serverCodecs, ApplicationInfoManager applicationInfoManager){

PeerEurekaNodes peerEurekaNodes =newPeerEurekaNodes(

registry,

eurekaServerConfig,

eurekaClientConfig,

serverCodecs,

applicationInfoManager

);

returnpeerEurekaNodes;

}

...

}

@Singleton

publicclassPeerAwareInstanceRegistryImplextendsAbstractInstanceRegistryimplementsPeerAwareInstanceRegistry{

/**

* 从eureka节点peer封装注册信息,进行节点信息的注册同步,如果操作失败则会进入重试

* Populates the registry information from a peer eureka node. This

* operation fails over to other nodes until the list is exhausted if the

* communication fails.

*/

@Override

publicintsyncUp(){

// Copy entire entry from neighboring DS node

intcount =0;

// 默认最大5次进行同步注册

for(inti =0; ((i < serverConfig.getRegistrySyncRetries()) && (count ==0)); i++) {

if(i >0) {

try{

// 重试等待30秒

Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());

}catch(InterruptedException e) {

logger.warn("Interrupted during registry transfer..");

break;

}

}

Applications apps = eurekaClient.getApplications();

for(Application app : apps.getRegisteredApplications()) {

for(InstanceInfo instance : app.getInstances()) {

try{

if(isRegisterable(instance)) {

// 节点注册

register(instance, instance.getLeaseInfo().getDurationInSecs(),true);

count++;

}

}catch(Throwable t) {

logger.error("During DS init copy", t);

}

}

}

}

returncount;

}

@Override

publicvoidopenForTraffic(ApplicationInfoManager applicationInfoManager,intcount){

// Renewals happen every 30 seconds and for a minute it should be a factor of 2.

// 每分钟服务续约的数量:每个节点服务续约每30秒一次,那么多个节点需要count*2次

this.expectedNumberOfRenewsPerMin = count *2;

// 服务续约最小百分阈值默认为0.85。即最小阈值为最小预约数乘以0.85

this.numberOfRenewsPerMinThreshold =

(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());

logger.info("Got "+ count +" instances from neighboring DS node");

logger.info("Renew threshold is: "+ numberOfRenewsPerMinThreshold);

this.startupTime = System.currentTimeMillis();

if(count >0) {

this.peerInstancesTransferEmptyOnStartup =false;

}

DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();

booleanisAws = Name.Amazon == selfName;

if(isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {

logger.info("Priming AWS connections for all replicas..");

primeAwsReplicas(applicationInfoManager);

}

logger.info("Changing status to UP");

applicationInfoManager.setInstanceStatus(InstanceStatus.UP);

// 调用父类的定时启动剔除任务定时启动剔除任务,一旦达到剔除条件则会调用服务下线接口,可以看 #Eureka服务下线 一篇

super.postInit();

}

}

// PeerAwareInstanceRegistryImpl父类

publicabstractclassAbstractInstanceRegistryimplementsInstanceRegistry{

...

/**

* Create a new, empty instance registry.

*/

protectedAbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs){

this.serverConfig = serverConfig;

this.clientConfig = clientConfig;

this.serverCodecs = serverCodecs;

// 近期下线队列recentCanceledQueue

this.recentCanceledQueue =newCircularQueue>(1000);

// 近期注册队列recentRegisteredQueue

this.recentRegisteredQueue =newCircularQueue>(1000);

// 近期1分钟的续约计量统计任务

this.renewsLastMin =newMeasuredRate(1000*60*1);

// 定时任务:定期清除近期变更队列

this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),

serverConfig.getDeltaRetentionTimerIntervalInMs(),

serverConfig.getDeltaRetentionTimerIntervalInMs());

}

privateTimerTaskgetDeltaRetentionTask(){

returnnewTimerTask() {

@Override

publicvoidrun(){

Iterator it = recentlyChangedQueue.iterator();

while(it.hasNext()) {

if(it.next().getLastUpdateTime() <

System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {

it.remove();

}else{

break;

}

}

}

};

}

...

// 定时启动剔除任务

protectedvoidpostInit(){

renewsLastMin.start();

if(evictionTaskRef.get() !=null) {

evictionTaskRef.get().cancel();

}

evictionTaskRef.set(newEvictionTask());

evictionTimer.schedule(evictionTaskRef.get(),

serverConfig.getEvictionIntervalTimerInMs(),

serverConfig.getEvictionIntervalTimerInMs());

}

/**

* Registers a new instance with a given duration.

* 给定一个租期时间注册一个新的实例

*@seecom.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)

*/

publicvoidregister(InstanceInfo registrant,intleaseDuration,booleanisReplication){

try{

read.lock();

// 通过appname获取实例注册数据

Map> gMap = registry.get(registrant.getAppName());

// 注册计数器+1

REGISTER.increment(isReplication);

// 如果实例对应数据不存在,则进行初始化

if(gMap ==null) {

finalConcurrentHashMap> gNewMap =newConcurrentHashMap>();

gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);

if(gMap ==null) {

gMap = gNewMap;

}

}

// 通过实例id获得实例租约信息,如果租约信息存在,那么会比较LastDirtyTimestamp,如果租约信息大于传进来的实例的LastDirtyTimestamp,那么则直接将使用缓存中的注册实例

Lease existingLease = gMap.get(registrant.getId());

// Retain the last dirty timestamp without overwriting it, if there is already a lease

if(existingLease !=null&& (existingLease.getHolder() !=null)) {

Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();

Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();

logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted

// InstanceInfo instead of the server local copy.

if(existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {

logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"+

" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");

registrant = existingLease.getHolder();

}

}else{

// 租约信息不存在则作为一个新的注册

synchronized(lock) {

if(this.expectedNumberOfRenewsPerMin >0) {

// Since the client wants to cancel it, reduce the threshold

// (1

// for 30 seconds, 2 for a minute)

this.expectedNumberOfRenewsPerMin =this.expectedNumberOfRenewsPerMin +2;

this.numberOfRenewsPerMinThreshold =

(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());

}

}

logger.debug("No previous lease information found; it is new registration");

}

// 根据注册者和租约时间,实例化租约实例化信息

Lease lease =newLease(registrant, leaseDuration);

if(existingLease !=null) {

lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());

}

gMap.put(registrant.getId(), lease);

// 将注册者添加到最新注册队列

synchronized(recentRegisteredQueue) {

recentRegisteredQueue.add(newPair(

System.currentTimeMillis(),

registrant.getAppName() +"("+ registrant.getId() +")"));

}

// This is where the initial state transfer of overridden status happens

// 注册者重写状态不为UNKNOWN,并且重写状态Map不包含实例,则将其put到Map中

if(!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {

logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "

+"overrides", registrant.getOverriddenStatus(), registrant.getId());

if(!overriddenInstanceStatusMap.containsKey(registrant.getId())) {

logger.info("Not found overridden id {} and hence adding it", registrant.getId());

overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());

}

}

InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());

if(overriddenStatusFromMap !=null) {

logger.info("Storing overridden status {} from map", overriddenStatusFromMap);

registrant.setOverriddenStatus(overriddenStatusFromMap);

}

// Set the status based on the overridden status rules

InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);

registrant.setStatusWithoutDirty(overriddenInstanceStatus);

// If the lease is registered with UP status, set lease service up timestamp

// 如果租约被注册后为UP状态,那么标记服务启动时间戳,而且只是首次才会进行设置

if(InstanceStatus.UP.equals(registrant.getStatus())) {

lease.serviceUp();

}

registrant.setActionType(ActionType.ADDED);

// 将租约存放到近期变更队列

recentlyChangedQueue.add(newRecentlyChangedItem(lease));

registrant.setLastUpdatedTimestamp();

// 使对应实例缓存失效

invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());

logger.info("Registered instance {}/{} with status {} (replication={})",

registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);

}finally{

read.unlock();

}

}

}

@Singleton

publicclassDefaultEurekaServerContextimplementsEurekaServerContext{

...

@PostConstruct

@Override

publicvoidinitialize()throwsException{

logger.info("Initializing ...");

peerEurekaNodes.start();

registry.init(peerEurekaNodes);

logger.info("Initialized");

}

...

}

// 集群节点:定时更新集群节点

@Singleton

publicclassPeerEurekaNodes{

...

publicvoidstart(){

taskExecutor = Executors.newSingleThreadScheduledExecutor(

newThreadFactory() {

@Override

publicThreadnewThread(Runnable r){

Thread thread =newThread(r,"Eureka-PeerNodesUpdater");

thread.setDaemon(true);

returnthread;

}

}

);

try{

// 预先更新集群节点

updatePeerEurekaNodes(resolvePeerUrls());

Runnable peersUpdateTask =newRunnable() {

@Override

publicvoidrun(){

try{

updatePeerEurekaNodes(resolvePeerUrls());

}catch(Throwable e) {

logger.error("Cannot update the replica Nodes", e);

}

}

};

taskExecutor.scheduleWithFixedDelay(

peersUpdateTask,

serverConfig.getPeerEurekaNodesUpdateIntervalMs(),

serverConfig.getPeerEurekaNodesUpdateIntervalMs(),

TimeUnit.MILLISECONDS

);

}catch(Exception e) {

thrownewIllegalStateException(e);

}

for(PeerEurekaNode node : peerEurekaNodes) {

logger.info("Replica node URL:  "+ node.getServiceUrl());

}

}

/**

* Given new set of replica URLs, destroy {@linkPeerEurekaNode}s no longer available, and

* create new ones. 新增集群备份节点,移除不再可用的节点,并且创建新的node节点

*

*@paramnewPeerUrls peer node URLs; this collection should have local node's URL filtered out

*/

protectedvoidupdatePeerEurekaNodes(List<String> newPeerUrls){

// 参数newPeerUrls为空则不处理

if(newPeerUrls.isEmpty()) {

logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");

return;

}

// 即将关闭的节点集合

Set toShutdown =newHashSet<>(peerEurekaNodeUrls);

// 因为新增的节点可能在即将关闭的节点集合内,所以先从中移除。

toShutdown.removeAll(newPeerUrls);

// 即将新增的节点集合

Set toAdd =newHashSet<>(newPeerUrls);

// 从新增节点集合中移除本地节点集合

toAdd.removeAll(peerEurekaNodeUrls);

// 如果即将关闭的节点集合为空并且即将新增的节点也为空,则说明没有变化,不需要处理,立即返回。

if(toShutdown.isEmpty() && toAdd.isEmpty()) {// No change

return;

}

// 移除不再可能的节点,当前包含全部已有节点

List newNodeList =newArrayList<>(peerEurekaNodes);

// 即将关闭的节点集合不为空,则将当前节点集合中移除对应节点

if(!toShutdown.isEmpty()) {

logger.info("Removing no longer available peer nodes {}", toShutdown);

inti =0;

while(i < newNodeList.size()) {

PeerEurekaNode eurekaNode = newNodeList.get(i);

if(toShutdown.contains(eurekaNode.getServiceUrl())) {

// 移除节点,节点进行关闭shutdown操作

newNodeList.remove(i);

eurekaNode.shutDown();

}else{

i++;

}

}

}

// 新增节点

if(!toAdd.isEmpty()) {

logger.info("Adding new peer nodes {}", toAdd);

for(String peerUrl : toAdd) {

newNodeList.add(createPeerEurekaNode(peerUrl));

}

}

// 重新赋值

this.peerEurekaNodes = newNodeList;

this.peerEurekaNodeUrls =newHashSet<>(newPeerUrls);

}

// 实时获取排除自身节点所剩下集群的节点地址

protectedListresolvePeerUrls(){

InstanceInfo myInfo = applicationInfoManager.getInfo();

String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);

List replicaUrls = EndpointUtils

.getDiscoveryServiceUrls(clientConfig, zone,newEndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

intidx =0;

while(idx < replicaUrls.size()) {

if(isThisMyUrl(replicaUrls.get(idx))) {

replicaUrls.remove(idx);

}else{

idx++;

}

}

returnreplicaUrls;

}

...

}

1)Eureka Server端启动入口类是继承ServletContextListener的EurekaBoostrap,首先会初始化eureka环境和初始化上下文,初始化上下文时会进行EurekaServer配置的初始化、JSON、XML编码器转化注册、初始化集群注册实例、和集群节点实例等。

2)调用EurekaServerContext上下文进行初始化:启动默认每10分钟定时更新集群节点数据、响应缓存初始化、启动默认每15分钟定时任务更新服务续约最小期望数量和最小续约阈值。

3)集群节点的同步注册:支持失败最大5次重试,进行集群节点间的相互注册。初始化服务续约最小期望数量和最小续约阈值。

4)服务端注册表维护了近期下线环形队列recentCanceledQueue、近期注册环形队列recentRegisteredQueue、近期变化队列recentlyChangeQueue。

5)启动默认每1分钟定时任务EvictionTask从最新变化队列中清除过期项(内存记录节点数据)。

 总 结 

〓Eureka client

启动核心类DiscoveryClient,启动时会开启三个定时任务:

①刷新缓存定时服务,即定时拉取服务列表,默认每30秒进行定时拉取服务列表;同时②开启心跳线程定时服务,即定时向服务端进行服务续约,默认每30秒进行定时续约。③启动实例信息复制器进行刷新服务实例信息或服务注册请求。

〓Eureka Server

启动入口类是EurekaBootStrap,核心类是PeerAwareInstanceRegistryImpl,

初始化:首先会初始化eureka环境和初始化上下文,初始化上下文时会进行EurekaServer配置的初始化、JSON、XML编码器转化注册、初始化集群注册实例、和集群节点实例等。

更新节点信息定时任务、定时更新续约数据:调用EurekaServerContext上下文进行初始化:启动默认每10分钟定时更新集群节点数据、响应缓存initializedResponseCache初始化、启动默认每15分钟定时任务更新服务续约最小期望数量和最小续约阈值。

集群同步:集群节点的同步注册:支持失败最大5次重试,进行集群节点间的相互注册。

初始化服务续约最小期望数量和最小续约阈值。

本地缓存数据:服务端注册表维护了近期下线环形队列recentCanceledQueue、近期注册环形队列recentRegisteredQueue、近期变化队列recentlyChangeQueue。

剔除任务:启动默认每1分钟定时任务EvictionTask从最新变化队列中清除过期项(内存记录节点数据)。

这里只是针对Eureka启动初始化做了简要剖析,更多详细的篇章请看后面的分析。

PS:若哪里写的有误或者不明白的,请多多指教!


-关注搬运工来架构,与优秀的你一同进步-

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容