总览
Eureka 分为 Server 和 Client。
而Eureka Server既作为server接受client的注册,又作为client向集群中的其他server实例注册自己。所以后文以Eureka Server为线索进行源码分析,既覆盖了server部分,也覆盖了client部分。
配置相关
通过org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
类进行自动配置。
注意该类的一个注解,
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
即,如果没有配置eureka.client.enabled
,那么默认为true,开启eureka的client功能。
此类也会实例化org.springframework.cloud.netflix.eureka.EurekaClientConfigBean
和org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean
两个配置实例。
分别对应了eureka.client
和eureka.instance
的相关配置。eureka.client
内的配置是作为client的一些行为相关的配置,如server地址,是否向server注册本client等。eureka.instance
内的配置是作为instance的一些相关配置,如本实例的instanceId,hostname等。
向其他server发送请求
- 程序启动后,实例化bean的过程中
// Instantiate all remaining (non-lazy-init) singletons.
beanFactory.preInstantiateSingletons();
这其中会实例化com.netflix.discovery.DiscoveryClient
。
DiscoveryClient
的构造函数执行过程中会fetch registry。即
if (clientConfig.shouldFetchRegistry()) {
boolean primaryFetchRegistryResult = fetchRegistry(false);
url为http://{host}:{port}/eureka/apps/
。
然后initScheduledTasks();
这些定时任务包括了
- if shouldFetchRegistry ,fetch registry (默认 30秒后执行一次,然后每隔30秒执行一次)
fetchRegistry(boolean forceFullRegistryFetch)
方法内根据applications.getRegisteredApplications().size() == 0
来抉择使用哪个url来fetch,http://{host}:{port}/eureka/apps/
(全量更新)
或是http://{host}:{port}/eureka/apps/delta
(增量更新) - if shouldRegisterWithEureka
- heartbeat (默认30秒后执行一次,然后每隔30秒执行一次)
url为http://{host}:{port}/eureka/apps/UNKNOWN/{instanceId}?status=UP&lastDirtyTimestamp=1610865104367
- InstanceInfoReplicator(默认40秒后执行一次)
com.netflix.discovery.InstanceInfoReplicator - 注册状态变化监听器
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
监听器现在不会被调用,接到事件时,会调用instanceInfoReplicator.onDemandUpdate();
(被调用的过程详见2)
- heartbeat (默认30秒后执行一次,然后每隔30秒执行一次)
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);
// Last step: publish corresponding event.
finishRefresh();
1中的实例化bean之后,会执行finishRefresh()
。该方法会调用所有实现了Lifecycle
接口的bean的Lifecycle.start()
方法。然后依次调用
org.springframework.cloud.netflix.eureka.serviceregistry.EurekaAutoServiceRegistration#start
org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry#register
最后com.netflix.appinfo.ApplicationInfoManager#setInstanceStatusr
中调用监听器。
1中注册的响应函数被调用com.netflix.discovery.InstanceInfoReplicator#onDemandUpdate
然后在新的线程中立即执行一次InstanceInfoReplicator.this.run();
,然后每隔30秒周期执行一次本函数
函数中会执行discoveryClient.register();
发送http请求,url为http://{host}:{port}/eureka/apps/UNKNOWN
,向server注册本实例信息。
注意
discoveryClient.register() 中的http请求发出后,eureka注册中心中就会注册上本服务,流量就会进入到服务器中
而springboot服务需要执行完TomcatWebServer.start(),才会暴露http端口号,才能实际接收http请求。见SpringBoot中Tomcat的源码分析一文
虽然二者是在不同的线程中执行的。但某些版本的springboot中,这两个函数触发顺序反了。有可能会先执行discoveryClient.register(),再执行TomcatWebServer.start()。即先注册至服务中,而此时http端口还不可用,最终导致服务的启动不平滑
- 发送请求的http client默认是
com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient
继承了com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient
抽象类
实现了com.netflix.discovery.shared.transport.EurekaHttpClient
接口
接受请求
- 启动后
EurekaController
负责接收Eureka网页请求 - restfule接口
https://github.com/Netflix/eureka/wiki/Eureka-REST-operations 文档中记录了restful接口实例。不过实际代码中使用的url与文档不同,url中不包含v2
,虽然不包括v2
,但默认版本是v2,所以逻辑一致。
2.1 eureka-core中的com.netflix.eureka.resources包内的*Resource
类中包含了实例间通信的restful接口,功能类似controller。
2.2 spring-cloud-netflix-eureka-server-3.0.0.jar 包中的org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#jerseyApplication
方法实例化了javax.ws.rs.core.Application
这个Bean。实例化的过程中会扫描2.1
中提到的部分*Resource
类。这些类会在后续的构造出url及其对对应的
2.3 jersey-server.jar
中的com.sun.jersey.server.impl.application.WebApplicationImpl#_initiate
方法末尾处
RulesMap<UriRule> rootRules = new RootResourceUriRules(this,
resourceConfig, wadlFactory, injectableFactory).getRules();
这是根据2.2
中扫描出的*Resource
类构造出RulesMap
,是一种规则映射,可以将restful请求映射到对应的处理method。
2.4 jersey-servlet.jar
中的com.sun.jersey.spi.container.servlet.WebComponent#service
方法中的语句_application.handleRequest(cRequest, w);
,将http request与2.3
中的ruleMap进行匹配,找到对应的处理method。
jersey-server.jar
中的com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider.ResponseOutInvoker#_dispatch
方法中通过反射调用具体的处理method。
InstanceRegistry
org.springframework.cloud.netflix.eureka.server.InstanceRegistry
是核心类。
该类有一个缓存属性,是继承于父类的属性
com.netflix.eureka.registry.AbstractInstanceRegistry#responseCache
,它默认使用的是com.netflix.eureka.registry.ResponseCacheImpl
实现。
ResponseCacheImpl
中有两个属性
ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>()
和LoadingCache<Key, Value> readWriteCacheMap
。其中readWriteCacheMap
是一个基于com.google.common.cache.LoadingCache
构建的缓存。
com.netflix.eureka.registry.ResponseCacheImpl#generatePayload
方法用于生成readWriteCacheMap
缓存的值。缓存key为com.netflix.eureka.registry.Key
。generatePayload
方法内部根据不同的Key执行不同的加载逻辑。
Eureka-CacheFillTimer
线程每隔30s执行一次com.netflix.eureka.registry.ResponseCacheImpl#getCacheUpdateTask
,将ResponseCacheImpl
内的readWriteCacheMap
中的缓存信息更新至readOnlyCacheMap
。
/eureka/apps/
拉取实例信息列表
http method: GET
- server 调用方
调用链为依次为
getApplicationsInternal:189, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
getApplications:167, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
.....
getApplications:134, EurekaHttpClientDecorator (com.netflix.discovery.shared.transport.decorator)
getAndStoreFullRegistry:1101, DiscoveryClient (com.netflix.discovery)
fetchRegistry:1014, DiscoveryClient (com.netflix.discovery)
<init>:441, DiscoveryClient (com.netflix.discovery)
server 调用方仅仅请求/eureka/apps/
url,没有添加任何参数。
服务端返回的response是一个com.netflix.discovery.shared.Applications
,然后将该对象处理一下之后,存储在com.netflix.discovery.DiscoveryClient#localRegionApps
中。
- server 接收方
handle method:com.netflix.eureka.resources.ApplicationsResource#getContainers
该方法使用(entityType:Application, entityName:ALL_APPS,等)构造com.netflix.eureka.registry.Key,通过这个key在responseCache中查找缓存值。缓存的相关信息见上面的InstanceRegistry一节。
如果没有缓存,使用com.netflix.eureka.registry.AbstractInstanceRegistry#getApplications
生成返回值并缓存。此方法调用com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationsFromMultipleRegions
,返回com.netflix.eureka.registry.AbstractInstanceRegistry#registry
中的全部已注册实例信息。
Eureka Server启动后发出的第一个请求就是这个请求,但是这时候收到的response是空列表,因为还没有任何实例在server中注册。
/eureka/apps/{appName}
向服务器注册实例信息
http method: POST
- server 调用方
调用链为依次为
register:48, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
......
register:56, EurekaHttpClientDecorator (com.netflix.discovery.shared.transport.decorator)
register:876, DiscoveryClient (com.netflix.discovery)
run:121, InstanceInfoReplicator (com.netflix.discovery)
run:101, InstanceInfoReplicator$1 (com.netflix.discovery)
server 调用方使用com.netflix.appinfo.InstanceInfo#getAppName
作为url参数中的{appName},默认为UNKNOWN。POST body为InstanceInfo。
InstanceInfo
实例化于org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.RefreshableEurekaClientConfiguration#eurekaApplicationInfoManager
,字段信息来源于org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean
配置类。
而这个配置类又实例化于org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration#eurekaInstanceConfigBean
。
EurekaInstanceConfigBean
的初始化过程会赋予一些一些关键字段默认值。
- appName 默认为 UNKNOWN
- hostInfo 默认来源于
org.springframework.cloud.commons.util.InetUtils#findFirstNonLoopbackHostInfo
,其中使用java.net.NetworkInterface#getNetworkInterfaces
方法获取所有网卡信息。 - ipAddress 默认为hostInfo.getIpAddress()
- hostname 默认为hostInfo.getHostname()
- instanceId 默认为{hostname}:{port}
之后这些字段会被人工配置值覆盖掉。
- server 接收方
handle method
首先根据路径/eureka/apps/{appName}
匹配至方法com.netflix.eureka.resources.ApplicationsResource#getApplicationResource
。
该方法实例化了一个com.netflix.eureka.resources.ApplicationResource
对象并返回,然后继续匹配,匹配至该对象的方法com.netflix.eureka.resources.ApplicationResource#addInstance
。
注册方法
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register()
注册到 <appName, <instanceId, Lease<InstanceInfo>>> 嵌套的map中。
注册成功后responseCache.invalidate()
重置缓存,此缓存曾在上面的 InstanceRegistry 一节中提到。
然后执行replicateToPeers()
,将信息同步至集群,使用类似下面的请求来同步信息。
http://server-peer0:8080/eureka/peerreplication/batch/
/eureka/apps/{appName}/{instanceId}?status={status}&lastDirtyTimestamp={lastDirtyTimestamp}
心跳信息
http method: PUT
- server 调用方
sendHeartBeat:103, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
.....
sendHeartBeat:89, EurekaHttpClientDecorator (com.netflix.discovery.shared.transport.decorator)
renew:893, DiscoveryClient (com.netflix.discovery)
run:1457, DiscoveryClient$HeartbeatThread (com.netflix.discovery)
该请求没有http body
- server 接收方
依次匹配
com.netflix.eureka.resources.ApplicationsResource#getApplicationResource
com.netflix.eureka.resources.ApplicationResource#getInstanceInfo
com.netflix.eureka.resources.InstanceResource#renewLease
刷新存活时间,同步至集群
/eureka/apps/delta
增量更新
http method: GET
- server 调用方
getApplicationsInternal:189, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
getDelta:172, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
......
getDelta:149, EurekaHttpClientDecorator (com.netflix.discovery.shared.transport.decorator)
getAndUpdateDelta:1135, DiscoveryClient (com.netflix.discovery)
fetchRegistry:1016, DiscoveryClient (com.netflix.discovery)
refreshRegistry:1531, DiscoveryClient (com.netflix.discovery)
run:1498, DiscoveryClient$CacheRefreshThread (com.netflix.discovery)
此请求的返回值和/eureka/apps/{appName}请求一样,是一个com.netflix.discovery.shared.Applications
。然后通过com.netflix.discovery.DiscoveryClient#updateDelta
方法将增量的变化信息存储至com.netflix.discovery.DiscoveryClient#localRegionApps
中。
- server 接收方
com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential
该方法使用(entityType:Application, entityName:ALL_APPS_DELTA,等)构造com.netflix.eureka.registry.Key,通过这个key在responseCache中查找缓存值。缓存的相关信息见上面的InstanceRegistry一节。
如果没有缓存,使用com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationDeltass
生成返回值并缓存。
Client
通过上文对 Server 的流程可知,Eureka Client 节点的主要逻辑均在于com.netflix.discovery.DiscoveryClient
类,集群内的节点信息存储于localRegionApps
属性中,使用各种get方法获取。