Eureka启动的过程有client端和server端, Eureka client端入口是DiscoveryClient类, Eureka server端入口是EurekaBootStrap类, 接下来我们就从源码看下它们做了什么吧!
〓Eureka Client端启动
1)看下DiscoveryClient类图:
由此看出DiscoveryClient实现了EurekaClient、LookupService接口,并且定义了内部类:DiscoverClientOptionalArgs,可选参数类,源码里实现为空,是默认实现,具体的需要去查看AbstractDiscoveryClientOptionalArgs这个抽象类;EurekaTransport类,封装了Client请求的类;CacheFreshThread,刷新缓存线程,提供定时拉取服务列表等;HeartbeatThread,心跳线程,提供定时向服务端续约服务等。
// DiscoveryClient类是一个单例,实现了EurekaClient接口
@Singleton
publicclassDiscoveryClientimplementsEurekaClient{
// DiscoveryClient类的构造函数
@Inject// 构造器注入
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider backupRegistryProvider) {
if(args !=null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
}else{
this.healthCheckCallbackProvider =null;
this.healthCheckHandlerProvider =null;
this.preRegistrationHandler =null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if(myInfo !=null) {
appPathIdentifier = instanceInfo.getAppName() +"/"+ instanceInfo.getId();
}else{
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer =newEndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(newApplications());
// 拉取服务计数器:单调地增加生成计数器,以确保陈旧的线程不会将注册表重置为旧版本。
fetchRegistryGeneration =newAtomicLong(0);
remoteRegionsToFetch =newAtomicReference(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef =newAtomicReference<>(remoteRegionsToFetch.get() ==null?null: remoteRegionsToFetch.get().split(","));
// 过时注册统计
if(config.shouldFetchRegistry()) {
this.registryStalenessMonitor =newThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX +"lastUpdateSec_",newlong[]{15L,30L,60L,120L,240L,480L});
}else{
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// 过时心跳统计
if(config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor =newThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX +"lastHeartbeatSec_",newlong[]{15L,30L,60L,120L,240L,480L});
}else{
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
// 既不需要注册到Eureka也不拉取注册服务
if(!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler =null;
heartbeatExecutor =null;
cacheRefreshExecutor =null;
eurekaTransport =null;
instanceRegionChecker =newInstanceRegionChecker(newPropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs,this.getApplications().size());
return;// no need to setup up an network tasks and we are done
}
try{
// 定义了2个线程大小的定时线程池:一个是刷新缓存CacheFreshThread,一个是心跳线程HeartbeatThread
scheduler = Executors.newScheduledThreadPool(2,
newThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 心跳线程池
heartbeatExecutor =newThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(),0, TimeUnit.SECONDS,
newSynchronousQueue(),
newThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
);// use direct handoff
// 刷新缓存线程池
cacheRefreshExecutor =newThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(),0, TimeUnit.SECONDS,
newSynchronousQueue(),
newThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
);// use direct handoff
// 实例化EurekaTransport
eurekaTransport =newEurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if(clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper =newDNSBasedAzToRegionMapper(clientConfig);
}else{
azToRegionMapper =newPropertyBasedAzToRegionMapper(clientConfig);
}
if(null!= remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker =newInstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
}catch(Throwable e) {
thrownewRuntimeException("Failed to initialize DiscoveryClient!", e);
}
// 在启动时需要拉取注册服务列表,增量拉取之后如果失败就会从备份里面再次拉取
if(clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if(this.preRegistrationHandler !=null) {
this.preRegistrationHandler.beforeRegistration();
}
// 初始化定时任务
initScheduledTasks();
try{
// 监控DiscoverClient
Monitors.registerObject(this);
}catch(Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs,this.getApplications().size());
}
/**
* Initializes all scheduled tasks.
*/
privatevoidinitScheduledTasks(){
// 需要拉取注册服务,则定时拉取刷新缓存CacheRefreshThread,可以看#Eureka服务拉取 一篇
if(clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// 默认30秒,定时拉取服务
intregistryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
intexpBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
newTimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
newCacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 需要注册到Eureka,则定时心跳请求服务端保持客户端存活,即服务续约。可以看#Eureka服务续约 一篇
if(clientConfig.shouldRegisterWithEureka()) {
// 默认30秒,定时进行服务续约
intrenewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
intexpBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: "+"renew interval is: "+ renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
newTimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
newHeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator 当前实例节点复制器实例化
instanceInfoReplicator =newInstanceInfoReplicator(
this,
instanceInfo,
// 实例信息复制间隔,默认30秒
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2);// burstSize
// 状态变化监听器
statusChangeListener =newApplicationInfoManager.StatusChangeListener() {
@Override
publicStringgetId()
{
return"statusChangeListener";
}
@Override
publicvoidnotify(StatusChangeEvent statusChangeEvent)
{
if(InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
}else{
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if(clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 实例节点复制器启动
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}else{
logger.info("Not registering with Eureka server per configuration");
}
}
}
classInstanceInfoReplicatorimplementsRunnable{
// 启动,延迟40秒启动
publicvoidstart(intinitialDelayMs){
// CAS保证启动一次
if(started.compareAndSet(false,true)) {
instanceInfo.setIsDirty();// for initial register
// 定时任务定时调用实例信息复制器(线程),逻辑看run方法
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
publicvoidrun(){
try{
// 刷新实例信息
discoveryClient.refreshInstanceInfo();
// 获取实例信息的脏时间戳,如果存在则进行服务注册,服务注册可以看 #Eureka服务注册 一篇
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if(dirtyTimestamp !=null) {
//Client进行注册操作
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
}catch(Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
}finally{
// 定时每30秒进行刷新或注册请求
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
Eureka Client启动时会开启三个定时任务:
①刷新缓存定时服务,即定时拉取服务列表,默认每30秒进行定时拉取服务列表;同时②开启心跳线程定时服务,即定时向服务端进行服务续约,默认每30秒进行定时续约。③启动实例信息复制器进行刷新服务实例信息或服务注册请求。
〓Eureka Server端启动
EurekaBootStrap是server端启动入口,
PeerAwareInstanceRegistryImpl是真正的核心类,我们看下其类图:
// 继承Servlet上下文监听器,说明Eureka Server是基于Servlet
publicclassEurekaBootStrapimplementsServletContextListener{
...
@Override
publicvoidcontextInitialized(ServletContextEvent event){
try{
// 初始化环境
initEurekaEnvironment();
// 初始化Euraka Server Context
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
}catch(Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
thrownewRuntimeException("Cannot bootstrap eureka server :", e);
}
}
/**
* init hook for server context. Override for custom logic.
*/
protectedvoidinitEurekaServerContext()throwsException{
// Eureka Server读取配置
EurekaServerConfig eurekaServerConfig =newDefaultEurekaServerConfig();
// For backward compatibility
JsonXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(newV1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
logger.info("Initializing the eureka client...");
logger.info(eurekaServerConfig.getJsonCodecName());
// Server默认加载编码JSON、XML
ServerCodecs serverCodecs =newDefaultServerCodecs(eurekaServerConfig);
ApplicationInfoManager applicationInfoManager =null;
// EurekaClient初始化
if(eurekaClient ==null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
?newCloudInstanceConfig()
:newMyDataCenterInstanceConfig();
applicationInfoManager =newApplicationInfoManager(
instanceConfig,newEurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
EurekaClientConfig eurekaClientConfig =newDefaultEurekaClientConfig();
eurekaClient =newDiscoveryClient(applicationInfoManager, eurekaClientConfig);
}else{
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
// 如果是使用AWS平台,这里不涉及
PeerAwareInstanceRegistry registry;
if(isAws(applicationInfoManager.getInfo())) {
registry =newAwsInstanceRegistry(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
awsBinder =newAwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
awsBinder.start();
}else{
// 初始化集群实例,父类AbstractInstanceRegistry
registry =newPeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
// 初始化集群节点实例
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
// 初始化EurekaSeverContext实例
serverContext =newDefaultEurekaServerContext(
eurekaServerConfig,
serverCodecs,
registry,
peerEurekaNodes,
applicationInfoManager
);
// 使用非注入的方式Holder保持EurekaServerContext实例
EurekaServerContextHolder.initialize(serverContext);
// EurekaServerContext初始化
serverContext.initialize();
logger.info("Initialized server context");
// 复制注册列表到集群上的其它节点
intregistryCount = registry.syncUp();
registry.openForTraffic(applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
// 初始化获得集群节点
protectedPeerEurekaNodesgetPeerEurekaNodes(PeerAwareInstanceRegistry registry, EurekaServerConfig eurekaServerConfig, EurekaClientConfig eurekaClientConfig, ServerCodecs serverCodecs, ApplicationInfoManager applicationInfoManager){
PeerEurekaNodes peerEurekaNodes =newPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClientConfig,
serverCodecs,
applicationInfoManager
);
returnpeerEurekaNodes;
}
...
}
@Singleton
publicclassPeerAwareInstanceRegistryImplextendsAbstractInstanceRegistryimplementsPeerAwareInstanceRegistry{
/**
* 从eureka节点peer封装注册信息,进行节点信息的注册同步,如果操作失败则会进入重试
* Populates the registry information from a peer eureka node. This
* operation fails over to other nodes until the list is exhausted if the
* communication fails.
*/
@Override
publicintsyncUp(){
// Copy entire entry from neighboring DS node
intcount =0;
// 默认最大5次进行同步注册
for(inti =0; ((i < serverConfig.getRegistrySyncRetries()) && (count ==0)); i++) {
if(i >0) {
try{
// 重试等待30秒
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
}catch(InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for(Application app : apps.getRegisteredApplications()) {
for(InstanceInfo instance : app.getInstances()) {
try{
if(isRegisterable(instance)) {
// 节点注册
register(instance, instance.getLeaseInfo().getDurationInSecs(),true);
count++;
}
}catch(Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
returncount;
}
@Override
publicvoidopenForTraffic(ApplicationInfoManager applicationInfoManager,intcount){
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
// 每分钟服务续约的数量:每个节点服务续约每30秒一次,那么多个节点需要count*2次
this.expectedNumberOfRenewsPerMin = count *2;
// 服务续约最小百分阈值默认为0.85。即最小阈值为最小预约数乘以0.85
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
logger.info("Got "+ count +" instances from neighboring DS node");
logger.info("Renew threshold is: "+ numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if(count >0) {
this.peerInstancesTransferEmptyOnStartup =false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
booleanisAws = Name.Amazon == selfName;
if(isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// 调用父类的定时启动剔除任务定时启动剔除任务,一旦达到剔除条件则会调用服务下线接口,可以看 #Eureka服务下线 一篇
super.postInit();
}
}
// PeerAwareInstanceRegistryImpl父类
publicabstractclassAbstractInstanceRegistryimplementsInstanceRegistry{
...
/**
* Create a new, empty instance registry.
*/
protectedAbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs){
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
// 近期下线队列recentCanceledQueue
this.recentCanceledQueue =newCircularQueue>(1000);
// 近期注册队列recentRegisteredQueue
this.recentRegisteredQueue =newCircularQueue>(1000);
// 近期1分钟的续约计量统计任务
this.renewsLastMin =newMeasuredRate(1000*60*1);
// 定时任务:定期清除近期变更队列
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
privateTimerTaskgetDeltaRetentionTask(){
returnnewTimerTask() {
@Override
publicvoidrun(){
Iterator it = recentlyChangedQueue.iterator();
while(it.hasNext()) {
if(it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
}else{
break;
}
}
}
};
}
...
// 定时启动剔除任务
protectedvoidpostInit(){
renewsLastMin.start();
if(evictionTaskRef.get() !=null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(newEvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
/**
* Registers a new instance with a given duration.
* 给定一个租期时间注册一个新的实例
*@seecom.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
publicvoidregister(InstanceInfo registrant,intleaseDuration,booleanisReplication){
try{
read.lock();
// 通过appname获取实例注册数据
Map> gMap = registry.get(registrant.getAppName());
// 注册计数器+1
REGISTER.increment(isReplication);
// 如果实例对应数据不存在,则进行初始化
if(gMap ==null) {
finalConcurrentHashMap> gNewMap =newConcurrentHashMap>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if(gMap ==null) {
gMap = gNewMap;
}
}
// 通过实例id获得实例租约信息,如果租约信息存在,那么会比较LastDirtyTimestamp,如果租约信息大于传进来的实例的LastDirtyTimestamp,那么则直接将使用缓存中的注册实例
Lease existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if(existingLease !=null&& (existingLease.getHolder() !=null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if(existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"+
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
}else{
// 租约信息不存在则作为一个新的注册
synchronized(lock) {
if(this.expectedNumberOfRenewsPerMin >0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin =this.expectedNumberOfRenewsPerMin +2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 根据注册者和租约时间,实例化租约实例化信息
Lease lease =newLease(registrant, leaseDuration);
if(existingLease !=null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
// 将注册者添加到最新注册队列
synchronized(recentRegisteredQueue) {
recentRegisteredQueue.add(newPair(
System.currentTimeMillis(),
registrant.getAppName() +"("+ registrant.getId() +")"));
}
// This is where the initial state transfer of overridden status happens
// 注册者重写状态不为UNKNOWN,并且重写状态Map不包含实例,则将其put到Map中
if(!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+"overrides", registrant.getOverriddenStatus(), registrant.getId());
if(!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if(overriddenStatusFromMap !=null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
// 如果租约被注册后为UP状态,那么标记服务启动时间戳,而且只是首次才会进行设置
if(InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
// 将租约存放到近期变更队列
recentlyChangedQueue.add(newRecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 使对应实例缓存失效
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
}finally{
read.unlock();
}
}
}
@Singleton
publicclassDefaultEurekaServerContextimplementsEurekaServerContext{
...
@PostConstruct
@Override
publicvoidinitialize()throwsException{
logger.info("Initializing ...");
peerEurekaNodes.start();
registry.init(peerEurekaNodes);
logger.info("Initialized");
}
...
}
// 集群节点:定时更新集群节点
@Singleton
publicclassPeerEurekaNodes{
...
publicvoidstart(){
taskExecutor = Executors.newSingleThreadScheduledExecutor(
newThreadFactory() {
@Override
publicThreadnewThread(Runnable r){
Thread thread =newThread(r,"Eureka-PeerNodesUpdater");
thread.setDaemon(true);
returnthread;
}
}
);
try{
// 预先更新集群节点
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask =newRunnable() {
@Override
publicvoidrun(){
try{
updatePeerEurekaNodes(resolvePeerUrls());
}catch(Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
}catch(Exception e) {
thrownewIllegalStateException(e);
}
for(PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: "+ node.getServiceUrl());
}
}
/**
* Given new set of replica URLs, destroy {@linkPeerEurekaNode}s no longer available, and
* create new ones. 新增集群备份节点,移除不再可用的节点,并且创建新的node节点
*
*@paramnewPeerUrls peer node URLs; this collection should have local node's URL filtered out
*/
protectedvoidupdatePeerEurekaNodes(List<String> newPeerUrls){
// 参数newPeerUrls为空则不处理
if(newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
// 即将关闭的节点集合
Set toShutdown =newHashSet<>(peerEurekaNodeUrls);
// 因为新增的节点可能在即将关闭的节点集合内,所以先从中移除。
toShutdown.removeAll(newPeerUrls);
// 即将新增的节点集合
Set toAdd =newHashSet<>(newPeerUrls);
// 从新增节点集合中移除本地节点集合
toAdd.removeAll(peerEurekaNodeUrls);
// 如果即将关闭的节点集合为空并且即将新增的节点也为空,则说明没有变化,不需要处理,立即返回。
if(toShutdown.isEmpty() && toAdd.isEmpty()) {// No change
return;
}
// 移除不再可能的节点,当前包含全部已有节点
List newNodeList =newArrayList<>(peerEurekaNodes);
// 即将关闭的节点集合不为空,则将当前节点集合中移除对应节点
if(!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
inti =0;
while(i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if(toShutdown.contains(eurekaNode.getServiceUrl())) {
// 移除节点,节点进行关闭shutdown操作
newNodeList.remove(i);
eurekaNode.shutDown();
}else{
i++;
}
}
}
// 新增节点
if(!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for(String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
// 重新赋值
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls =newHashSet<>(newPeerUrls);
}
// 实时获取排除自身节点所剩下集群的节点地址
protectedListresolvePeerUrls(){
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
List replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone,newEndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
intidx =0;
while(idx < replicaUrls.size()) {
if(isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
}else{
idx++;
}
}
returnreplicaUrls;
}
...
}
1)Eureka Server端启动入口类是继承ServletContextListener的EurekaBoostrap,首先会初始化eureka环境和初始化上下文,初始化上下文时会进行EurekaServer配置的初始化、JSON、XML编码器转化注册、初始化集群注册实例、和集群节点实例等。
2)调用EurekaServerContext上下文进行初始化:启动默认每10分钟定时更新集群节点数据、响应缓存初始化、启动默认每15分钟定时任务更新服务续约最小期望数量和最小续约阈值。
3)集群节点的同步注册:支持失败最大5次重试,进行集群节点间的相互注册。初始化服务续约最小期望数量和最小续约阈值。
4)服务端注册表维护了近期下线环形队列recentCanceledQueue、近期注册环形队列recentRegisteredQueue、近期变化队列recentlyChangeQueue。
5)启动默认每1分钟定时任务EvictionTask从最新变化队列中清除过期项(内存记录节点数据)。
总 结
〓Eureka client
启动核心类DiscoveryClient,启动时会开启三个定时任务:
①刷新缓存定时服务,即定时拉取服务列表,默认每30秒进行定时拉取服务列表;同时②开启心跳线程定时服务,即定时向服务端进行服务续约,默认每30秒进行定时续约。③启动实例信息复制器进行刷新服务实例信息或服务注册请求。
〓Eureka Server
启动入口类是EurekaBootStrap,核心类是PeerAwareInstanceRegistryImpl,
初始化:首先会初始化eureka环境和初始化上下文,初始化上下文时会进行EurekaServer配置的初始化、JSON、XML编码器转化注册、初始化集群注册实例、和集群节点实例等。
更新节点信息定时任务、定时更新续约数据:调用EurekaServerContext上下文进行初始化:启动默认每10分钟定时更新集群节点数据、响应缓存initializedResponseCache初始化、启动默认每15分钟定时任务更新服务续约最小期望数量和最小续约阈值。
集群同步:集群节点的同步注册:支持失败最大5次重试,进行集群节点间的相互注册。
初始化服务续约最小期望数量和最小续约阈值。
本地缓存数据:服务端注册表维护了近期下线环形队列recentCanceledQueue、近期注册环形队列recentRegisteredQueue、近期变化队列recentlyChangeQueue。
剔除任务:启动默认每1分钟定时任务EvictionTask从最新变化队列中清除过期项(内存记录节点数据)。
这里只是针对Eureka启动初始化做了简要剖析,更多详细的篇章请看后面的分析。
PS:若哪里写的有误或者不明白的,请多多指教!
-关注搬运工来架构,与优秀的你一同进步-