Eureka 源码分析

总览

Eureka 分为 Server 和 Client。
而Eureka Server既作为server接受client的注册,又作为client向集群中的其他server实例注册自己。所以后文以Eureka Server为线索进行源码分析,既覆盖了server部分,也覆盖了client部分。

配置相关

通过org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration类进行自动配置。
注意该类的一个注解,
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
即,如果没有配置eureka.client.enabled,那么默认为true,开启eureka的client功能。
此类也会实例化org.springframework.cloud.netflix.eureka.EurekaClientConfigBeanorg.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean两个配置实例。
分别对应了eureka.clienteureka.instance的相关配置。eureka.client内的配置是作为client的一些行为相关的配置,如server地址,是否向server注册本client等。eureka.instance内的配置是作为instance的一些相关配置,如本实例的instanceId,hostname等。

向其他server发送请求

  1. 程序启动后,实例化bean的过程中
// Instantiate all remaining (non-lazy-init) singletons.
beanFactory.preInstantiateSingletons();

这其中会实例化com.netflix.discovery.DiscoveryClient
DiscoveryClient的构造函数执行过程中会fetch registry。即

if (clientConfig.shouldFetchRegistry()) {
    boolean primaryFetchRegistryResult = fetchRegistry(false);

url为http://{host}:{port}/eureka/apps/
然后initScheduledTasks();
这些定时任务包括了

  • if shouldFetchRegistry ,fetch registry (默认 30秒后执行一次,然后每隔30秒执行一次)
    fetchRegistry(boolean forceFullRegistryFetch)方法内根据applications.getRegisteredApplications().size() == 0来抉择使用哪个url来fetch,http://{host}:{port}/eureka/apps/(全量更新)
    或是http://{host}:{port}/eureka/apps/delta(增量更新)
  • if shouldRegisterWithEureka
    • heartbeat (默认30秒后执行一次,然后每隔30秒执行一次)
      url为http://{host}:{port}/eureka/apps/UNKNOWN/{instanceId}?status=UP&lastDirtyTimestamp=1610865104367
    • InstanceInfoReplicator(默认40秒后执行一次)
      com.netflix.discovery.InstanceInfoReplicator
    • 注册状态变化监听器
      applicationInfoManager.registerStatusChangeListener(statusChangeListener);
      监听器现在不会被调用,接到事件时,会调用instanceInfoReplicator.onDemandUpdate();
      (被调用的过程详见2)
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);

// Last step: publish corresponding event.
finishRefresh();

1中的实例化bean之后,会执行finishRefresh()。该方法会调用所有实现了Lifecycle接口的bean的Lifecycle.start()方法。然后依次调用
org.springframework.cloud.netflix.eureka.serviceregistry.EurekaAutoServiceRegistration#start
org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry#register
最后com.netflix.appinfo.ApplicationInfoManager#setInstanceStatusr中调用监听器。
1中注册的响应函数被调用com.netflix.discovery.InstanceInfoReplicator#onDemandUpdate
然后在新的线程中立即执行一次InstanceInfoReplicator.this.run();,然后每隔30秒周期执行一次本函数
函数中会执行discoveryClient.register();
发送http请求,url为http://{host}:{port}/eureka/apps/UNKNOWN,向server注册本实例信息。

注意
discoveryClient.register() 中的http请求发出后,eureka注册中心中就会注册上本服务,流量就会进入到服务器中
而springboot服务需要执行完TomcatWebServer.start(),才会暴露http端口号,才能实际接收http请求。见SpringBoot中Tomcat的源码分析一文
虽然二者是在不同的线程中执行的。但某些版本的springboot中,这两个函数触发顺序反了。有可能会先执行discoveryClient.register(),再执行TomcatWebServer.start()。即先注册至服务中,而此时http端口还不可用,最终导致服务的启动不平滑

  1. 发送请求的http client默认是com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient
    继承了com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient抽象类
    实现了 com.netflix.discovery.shared.transport.EurekaHttpClient接口

接受请求

  1. 启动后EurekaController负责接收Eureka网页请求
  2. restfule接口
    https://github.com/Netflix/eureka/wiki/Eureka-REST-operations 文档中记录了restful接口实例。不过实际代码中使用的url与文档不同,url中不包含v2,虽然不包括v2,但默认版本是v2,所以逻辑一致。

2.1 eureka-core中的com.netflix.eureka.resources包内的*Resource类中包含了实例间通信的restful接口,功能类似controller。
2.2 spring-cloud-netflix-eureka-server-3.0.0.jar 包中的org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration#jerseyApplication方法实例化了javax.ws.rs.core.Application这个Bean。实例化的过程中会扫描2.1中提到的部分*Resource类。这些类会在后续的构造出url及其对对应的

2.3 jersey-server.jar中的com.sun.jersey.server.impl.application.WebApplicationImpl#_initiate方法末尾处

RulesMap<UriRule> rootRules = new RootResourceUriRules(this,
        resourceConfig, wadlFactory, injectableFactory).getRules();

这是根据2.2中扫描出的*Resource类构造出RulesMap,是一种规则映射,可以将restful请求映射到对应的处理method。

2.4 jersey-servlet.jar中的com.sun.jersey.spi.container.servlet.WebComponent#service方法中的语句_application.handleRequest(cRequest, w);,将http request与2.3中的ruleMap进行匹配,找到对应的处理method。
jersey-server.jar中的com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider.ResponseOutInvoker#_dispatch方法中通过反射调用具体的处理method。

InstanceRegistry

org.springframework.cloud.netflix.eureka.server.InstanceRegistry是核心类。
该类有一个缓存属性,是继承于父类的属性
com.netflix.eureka.registry.AbstractInstanceRegistry#responseCache,它默认使用的是com.netflix.eureka.registry.ResponseCacheImpl实现。
ResponseCacheImpl中有两个属性
ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>()LoadingCache<Key, Value> readWriteCacheMap。其中readWriteCacheMap是一个基于com.google.common.cache.LoadingCache构建的缓存。

com.netflix.eureka.registry.ResponseCacheImpl#generatePayload方法用于生成readWriteCacheMap缓存的值。缓存key为com.netflix.eureka.registry.KeygeneratePayload方法内部根据不同的Key执行不同的加载逻辑。

Eureka-CacheFillTimer线程每隔30s执行一次com.netflix.eureka.registry.ResponseCacheImpl#getCacheUpdateTask,将ResponseCacheImpl内的readWriteCacheMap中的缓存信息更新至readOnlyCacheMap

/eureka/apps/

拉取实例信息列表
http method: GET

  1. server 调用方
    调用链为依次为
getApplicationsInternal:189, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
getApplications:167, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
.....
getApplications:134, EurekaHttpClientDecorator (com.netflix.discovery.shared.transport.decorator)
getAndStoreFullRegistry:1101, DiscoveryClient (com.netflix.discovery)
fetchRegistry:1014, DiscoveryClient (com.netflix.discovery)
<init>:441, DiscoveryClient (com.netflix.discovery)

server 调用方仅仅请求/eureka/apps/url,没有添加任何参数。
服务端返回的response是一个com.netflix.discovery.shared.Applications,然后将该对象处理一下之后,存储在com.netflix.discovery.DiscoveryClient#localRegionApps中。

  1. server 接收方
    handle method: com.netflix.eureka.resources.ApplicationsResource#getContainers
    该方法使用(entityType:Application, entityName:ALL_APPS,等)构造com.netflix.eureka.registry.Key,通过这个key在responseCache中查找缓存值。缓存的相关信息见上面的InstanceRegistry一节。
    如果没有缓存,使用com.netflix.eureka.registry.AbstractInstanceRegistry#getApplications生成返回值并缓存。此方法调用com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationsFromMultipleRegions,返回com.netflix.eureka.registry.AbstractInstanceRegistry#registry中的全部已注册实例信息。
    Eureka Server启动后发出的第一个请求就是这个请求,但是这时候收到的response是空列表,因为还没有任何实例在server中注册。

/eureka/apps/{appName}

向服务器注册实例信息
http method: POST

  1. server 调用方
    调用链为依次为
register:48, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
......
register:56, EurekaHttpClientDecorator (com.netflix.discovery.shared.transport.decorator)
register:876, DiscoveryClient (com.netflix.discovery)
run:121, InstanceInfoReplicator (com.netflix.discovery)
run:101, InstanceInfoReplicator$1 (com.netflix.discovery)

server 调用方使用com.netflix.appinfo.InstanceInfo#getAppName作为url参数中的{appName},默认为UNKNOWN。POST body为InstanceInfo。

InstanceInfo实例化于org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.RefreshableEurekaClientConfiguration#eurekaApplicationInfoManager,字段信息来源于org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean配置类。
而这个配置类又实例化于org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration#eurekaInstanceConfigBean
EurekaInstanceConfigBean的初始化过程会赋予一些一些关键字段默认值。

  • appName 默认为 UNKNOWN
  • hostInfo 默认来源于org.springframework.cloud.commons.util.InetUtils#findFirstNonLoopbackHostInfo,其中使用java.net.NetworkInterface#getNetworkInterfaces方法获取所有网卡信息。
  • ipAddress 默认为hostInfo.getIpAddress()
  • hostname 默认为hostInfo.getHostname()
  • instanceId 默认为{hostname}:{port}
    之后这些字段会被人工配置值覆盖掉。
  1. server 接收方
    handle method
    首先根据路径/eureka/apps/{appName}匹配至方法com.netflix.eureka.resources.ApplicationsResource#getApplicationResource
    该方法实例化了一个com.netflix.eureka.resources.ApplicationResource对象并返回,然后继续匹配,匹配至该对象的方法com.netflix.eureka.resources.ApplicationResource#addInstance
    注册方法
    org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register()
    注册到 <appName, <instanceId, Lease<InstanceInfo>>> 嵌套的map中。
    注册成功后responseCache.invalidate()重置缓存,此缓存曾在上面的 InstanceRegistry 一节中提到。
    然后执行replicateToPeers(),将信息同步至集群,使用类似下面的请求来同步信息。
    http://server-peer0:8080/eureka/peerreplication/batch/

/eureka/apps/{appName}/{instanceId}?status={status}&lastDirtyTimestamp={lastDirtyTimestamp}

心跳信息
http method: PUT

  1. server 调用方
sendHeartBeat:103, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
.....
sendHeartBeat:89, EurekaHttpClientDecorator (com.netflix.discovery.shared.transport.decorator)
renew:893, DiscoveryClient (com.netflix.discovery)
run:1457, DiscoveryClient$HeartbeatThread (com.netflix.discovery)

该请求没有http body

  1. server 接收方
    依次匹配
    com.netflix.eureka.resources.ApplicationsResource#getApplicationResource
    com.netflix.eureka.resources.ApplicationResource#getInstanceInfo
    com.netflix.eureka.resources.InstanceResource#renewLease
    刷新存活时间,同步至集群

/eureka/apps/delta

增量更新
http method: GET

  1. server 调用方
getApplicationsInternal:189, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
getDelta:172, AbstractJerseyEurekaHttpClient (com.netflix.discovery.shared.transport.jersey)
......
getDelta:149, EurekaHttpClientDecorator (com.netflix.discovery.shared.transport.decorator)
getAndUpdateDelta:1135, DiscoveryClient (com.netflix.discovery)
fetchRegistry:1016, DiscoveryClient (com.netflix.discovery)
refreshRegistry:1531, DiscoveryClient (com.netflix.discovery)
run:1498, DiscoveryClient$CacheRefreshThread (com.netflix.discovery)

此请求的返回值和/eureka/apps/{appName}请求一样,是一个com.netflix.discovery.shared.Applications。然后通过com.netflix.discovery.DiscoveryClient#updateDelta方法将增量的变化信息存储至com.netflix.discovery.DiscoveryClient#localRegionApps中。

  1. server 接收方
    com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential
    该方法使用(entityType:Application, entityName:ALL_APPS_DELTA,等)构造com.netflix.eureka.registry.Key,通过这个key在responseCache中查找缓存值。缓存的相关信息见上面的InstanceRegistry一节。
    如果没有缓存,使用com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationDeltass生成返回值并缓存。

Client

通过上文对 Server 的流程可知,Eureka Client 节点的主要逻辑均在于com.netflix.discovery.DiscoveryClient类,集群内的节点信息存储于localRegionApps属性中,使用各种get方法获取。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352