SpringCloud源码解析 -- Eureka原理探究

本文通过阅读Eureka源码,分享Eureka的实现原理。
本文主要梳理Eureka整体设计及实现,并不一一列举Eureka源码细节。

源码分析基于Spring Cloud Hoxton,Eureka版本为1.9

Eureka分为Eureka Client,Eureka Server,多个Eureka Server节点组成一个Eureka集群,服务通过Eureka Client注册到Eureka Server。


CAP理论指出,一个分布式系统不可能同时满足C(一致性)、A(可用性)和P(分区容错性)。
由于分布式系统中必须保证分区容错性,因此我们只能在A和C之间进行权衡。
Zookeeper保证的是CP, 而Eureka则是保证AP。
为什么呢?
在注册中心这种场景中,可用性比一致性更重要。
作为注册中心,其实数据是不经常变更的,只有服务发布,机器上下线,服务扩缩容时才变更。
因此Eureka选择AP,即使出问题了,也返回旧数据,保证服务能(最大程度)正常调用, 避免出现因为注册中心的问题导致服务不可用这种得不偿失的情况。
所以,Eureka各个节点都是平等的(去中心化的架构,无master/slave区分),挂掉的节点不会影响正常节点的工作,剩余的节点依然可以提供注册和查询服务。

Eureka Client

Eureka 1.9只要引入spring-cloud-starter-netflix-eureka-client依赖,即使不使用@EnableDiscoveryClient或@EnableEurekaClient注解,服务也会注册到Eureka集群。

client主要逻辑在com.netflix.discovery.DiscoveryClient实现,EurekaClientAutoConfiguration中构建了其子类CloudEurekaClient。

定时任务

DiscoveryClient#initScheduledTasks方法设置定时任务,主要有CacheRefreshThread,HeartbeatThread,以及InstanceInfoReplicator。

同步

服务注册信息缓存在DiscoveryClient#localRegionApps变量中,CacheRefreshThread负责定时从Eureka Server读取最新的服务注册信息,更新到本地缓存。
CacheRefreshThread -> DiscoveryClient#refreshRegistry -> DiscoveryClient#fetchRegistry
当存在多个Eureka Server节点时,Client会与eureka.client.serviceUrl.defaultZone配置的第一个Server节点同步数据,当第一个Server节点同步失败,才会同步第二个节点,以此类推。

从DiscoveryClient#fetchRegistry可以看到,同步数据有两个方法
(1)全量同步
由DiscoveryClient#getAndStoreFullRegistry方法实现,通过Http Get调用Server接口apps/
获取Server节点中所有服务注册信息替换DiscoveryClient#localRegionApps

注意:Client请求Server端的服务,都是通过EurekaHttpClient接口发起,该接口实现类EurekaHttpClientDecorator通过RequestExecutor接口将请求委托给其他EurekaHttpClient实现类,并提供execute方法给子类实现扩展处理(该扩展处理可以针对每一个EurekaHttpClient方法,类似AOP)。子类RetryableEurekaHttpClient#execute中,会获取eureka.client.service-url.defaultZone中配置的地址,通过TransportClientFactory#newClient,构造一个RestTemplateTransportClientFactory,再真正发起请求。

(2)增量同步
由DiscoveryClient#getAndUpdateDelta方法实现,通过Http Get调用Server接口apps/delta,获取最新ADDED、MODIFIED,DELETED操作,更新本地缓存。
如果获取最新操作失败,则会发起全量同步。

配置:
eureka.client.fetch-registry,是否定时同步信息,默认true
eureka.client.registry-fetch-interval-seconds,间隔多少秒同步一次服务注册信息,默认30

心跳

HeartbeatThread -> DiscoveryClient#renew -> EurekaHttpClient#sendHeartBeat
通过Http Put调用Server接口apps/{appName}/{instanceId}
appName是服务的spring.application.name,instanceId是服务IP加服务端口。

注意:如果Server返回NOT_FOUND状态,则重新注册。

配置:
eureka.client.register-with-eureka,当前应用是否注册到Eureka集群,默认true
eureka.instance.lease-renewal-interval-in-seconds,间隔多少秒发送一次心跳,默认30

注册

DiscoveryClient#构造函数 -> DiscoveryClient#register
通过Http Post调用Server接口apps/{appName},发送当前应用的注册信息到Server。
配置:
eureka.client.register-with-eureka,当前应用是否注册到Eureka集群,默认true
eureka.client.should-enforce-registration-at-init,是否在初始化时注册,默认false

InstanceInfoReplicator

InstanceInfoReplicator任务会去监测应用自身的IP信息以及配置信息是否发生改变,如果发生改变,则会重新发起注册。
配置:
eureka.client.initial-instance-info-replication-interval-seconds,间隔多少秒检查一次自身信息,默认40

下线

EurekaClientAutoConfiguration配置了CloudEurekaClient的销毁方法

@Bean(destroyMethod = "shutdown")

DiscoveryClient#shutdown方法完成下线的处理工作,包括取消定时任务,调用unregister方法(通过Http Delete调用Server接口apps/{appName}/{id}),取消监控任务等

Eureka Server

@EnableEurekaServer引入EurekaServerMarkerConfiguration,EurekaServerMarkerConfiguration构建EurekaServerMarkerConfiguration.Marker。
EurekaServerAutoConfiguration会在Spring上下文中存在EurekaServerMarkerConfiguration.Marker时生效,构造Server端组件类。

Eureka Server也要使用DiscoveryClient,拉取其他Server节点的服务注册信息或者将自身注册到Eureka集群中。

启动同步

Server启动时,需要从相邻Server节点获取服务注册信息,同步到自身内存。

Server的服务注册信息存放在AbstractInstanceRegistry#registry变量中,类型为ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>。
外层Map Key为appName,外层Map Key为instanceId,Lease代表Client与Server之间维持的一个契约。InstanceInfo保存具体的服务注册信息,如instanceId,appName,ipAddr,port等。

EurekaServerBootstrap是Server端的启动引导类,EurekaServerInitializerConfiguration实现了Lifecycle接口,start方法调用eurekaServerBootstrap.contextInitialized完成Server端初始化。
eurekaServerBootstrap.contextInitialized -> EurekaServerBootstrap#initEurekaServerContext -> PeerAwareInstanceRegistryImpl#syncUp -> AbstractInstanceRegistry#register
PeerAwareInstanceRegistryImpl#syncUp调用DiscoveryClient#getApplications方法,获取相邻server节点的所有服务注册信息,再调用AbstractInstanceRegistry#register方法,注册到AbstractInstanceRegistry#registry变量中。

AbstractInstanceRegistry#register

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        ...
        // #1
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());   
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            ...
            // #2
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {  
                registrant = existingLease.getHolder();
            }
        } else {
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    // #3
                    updateRenewsPerMinThreshold();  
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // #4
        gMap.put(registrant.getId(), lease);    
        ...
        registrant.setActionType(ActionType.ADDED);
        // #5
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));   
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); 
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}

#1 通过appName,instanceId查询已有的Lease
#2 如果该服务已存在Lease,并且LastDirtyTimestamp的值更大,使用已存在的Lease。
#3 更新numberOfRenewsPerMinThreshold,该值用于自我保护模式。
#4 构建一个新的Lease,添加到AbstractInstanceRegistry#registry缓存中。
#5 添加recentlyChangedQueue,apps/delta接口从中获取最新变更操作。

提供服务

Server通过ApplicationsResource/ApplicationResource/InstanceResource对外提供Http服务。

AbstractInstanceRegistry负责实现cancle,register,renew,statusUpdate,deleteStatusOverride等操作的业务逻辑。
PeerAwareInstanceRegistryImpl通过replicateToPeers方法将操作同步到其他节点,以保证集群节点数据同步。
PeerAwareInstanceRegistryImpl#replicateToPeers方法最后一个参数isReplication,决定是否需要进行同步。
如果Server节点接收到其他Server节点发送的同步操作,是不需要再继续向其他Server同步的,否则会引起循环更新。
该参数通过Http Requst的Header参数x-netflix-discovery-replication决定(只有Client发送的请求该参数才为true)。

数据一致

PeerAwareInstanceRegistryImpl#replicateToPeers方法通过PeerEurekaNodes#getPeerEurekaNodes获取其他server节点地址,
PeerEurekaNodes#peerEurekaNodes变量维护了所有的Server节点信息。

PeerEurekaNodes通过peersUpdateTask任务定时从DNS或配置文件获取最新的Server节点地址列表,并更新PeerEurekaNodes#peerEurekaNodes。
配置:
eureka.server.peer-eureka-nodes-update-interval-ms,间隔多少分钟拉取一次Server节点地址列表,默认10

PeerEurekaNode管理具体一个Server节点,并负责向该Server节点同步register,cancel,heartbeat等操作。
PeerEurekaNode通过定时任务的方式同步这些操作。它维护了两个TaskDispatcher,批处理调度器batchingDispatcher和非批处理调度器nonBatchingDispatcher。
PeerEurekaNode#构造方法调用TaskDispatchers#createBatchingTaskDispatcher构造TaskDispatcher

public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                         int maxBufferSize,
                                                                         int workloadSize,
                                                                         int workerCount,
                                                                         long maxBatchingDelay,
                                                                         long congestionRetryDelayMs,
                                                                         long networkFailureRetryMs,
                                                                         TaskProcessor<T> taskProcessor) {
    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
    );
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    return new TaskDispatcher<ID, T>() {
        public void process(ID id, T task, long expiryTime) {
            acceptorExecutor.process(id, task, expiryTime);
        }

        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}

TaskDispatcher负责任务分发,过期任务会被抛弃,如果两个任务有相同id,则前一个任务则会被删除。
AcceptorExecutor负责整合任务,将任务放入批次中。
TaskExecutors将整合好的任务(批次)分给TaskProcessor处理,实际处理任务的是ReplicationTaskProcessor。
ReplicationTaskProcessor可以重复执行失败的任务,ReplicationTaskProcessor#process(List<ReplicationTask> tasks)处理批次任务,将tasks合并到一个请求,发送到下游Server接口peerreplication/batch/
任务类为ReplicationTask,它提供了handleFailure方法,当下游Server接口返回statusCode不在[200,300)区间,则调用该方法。

从TaskExecutors#BatchWorkerRunnable的run方法可以看到,
调用下游Server接口时,如果下游返回503状态或发生IO异常,会通过taskDispatcher.reprocess重新执行任务,以保证最终一致性。
如果发生其他异常,只打印日志,不重复执行任务。

配置:
eureka.server.max-elements-in-peer-replication-pool,等待执行任务最大数量,默认为10000

需要注意一下PeerEurekaNode#heartbeat方法,心跳任务实现了handleFailure方法

public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
    super.handleFailure(statusCode, responseEntity);
    if (statusCode == 404) {
        logger.warn("{}: missing entry.", getTaskName());
        if (info != null) {
            logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                    getTaskName(), info.getId(), info.getStatus());
            register(info);
        }
    } 
    ...
}

如果下游server节点没有找到服务注册信息,就返回404状态,这时需要重新注册该服务。这点很重要,它可以保证不同Server节点保持数据一致。

假设有一个client,注册到Eureka集群server1,server2,server3。下面来分析两个场景
场景1. client启动时,server1接收带client的注册信息,但同步给server2前宕机了,怎么办?
这时,client定时发起心跳,但它与server1心跳操作失败,只能向server2发起心跳,server2返回404(NOT_FOUND状态),client重新注册。

场景2. server3与其他机器server1,server2之间出现了网络分区,这时client注册到eureka集群。然后网络恢复了,server3怎么同步数据呢?
当server1向server3同步心跳时,server3返回404,于是server1重新向server3注册client信息,数据最终保持一致。

主动失效

AbstractInstanceRegistry#deltaRetentionTimer任务会定时移除recentlyChangedQueue中过期的增量操作信息
配置:
eureka.server.delta-retention-timer-interval-in-ms,间隔多少秒清理一次过期的增量操作信息,默认30
eureka.server.retention-time-in-m-s-in-delta-queue,增量操作保留多少分钟,默认3

AbstractInstanceRegistry#evictionTimer任务会定时剔除AbstractInstanceRegistry#registry中已经过期的(太久没收到心跳)服务注册信息。
计算服务失效时间时还要加上补偿时间,即计算本次任务执行的时间和上次任务执行的时间差,若超过eviction-interval-timer-in-ms配置值则加上超出时间差作为补偿时间。
每次剔除服务的数量都有一个上限,为注册服务数量*renewal-percent-threshold,Eureka会随机剔除过期的服务。
配置:
eureka.server.eviction-interval-timer-in-ms,间隔多少秒清理一次过期的服务,默认60
eureka.instance.lease-expiration-duration-in-seconds,间隔多少秒没收到心跳则判定服务过期,默认90
eureka.server.renewal-percent-threshold,自我保护阀值因子,默认0.85

自我保护机制

PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask,定时更新numberOfRenewsPerMinThreshold,该值用于判定是否进入自我保护模式,在自我保护模式下,AbstractInstanceRegistry#evictionTimer任务直接返回,不剔除过期服务。

numberOfRenewsPerMinThreshold计算在PeerAwareInstanceRegistryImpl#updateRenewsPerMinThreshold

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

expectedNumberOfClientsSendingRenews -> 已注册服务总数
60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds() -> expected-client-renewal-interval-seconds配置了Client间隔多少秒发一次心跳,这里计算一个Client每分钟发送心跳数量。
RenewalPercentThreshold 自我保护阀值因子。
可以看到,numberOfRenewsPerMinThreshold表示一分钟内Server接收心跳最低次数,实际数量少于该值则进入自我保护模式。
此时Eureka认为客户端与注册中心出现了网络故障(比如网络故障或频繁的启动关闭客户端),不再剔除任何服务,它要等待网络故障恢复后,再退出自我保护模式。这样可以最大程度保证服务间正常调用。

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled方法判定当前是否处于自我保护模式。该方法比较renewsLastMin中的值是否大于numberOfRenewsPerMinThreshold,AbstractInstanceRegistry#renewsLastMin统计一分钟内心跳次数。
配置:
eureka.server.enable-self-preservation,是否启用自我保护机制,默认为true
eureka.server.expected-client-renewal-interval-seconds,Client间隔多少秒发送一次心跳
eureka.server.renewal-percent-threshold,自我保护阀值因子,默认0.85

状态更新

InstanceInfo维护了状态变量status和覆盖状态变量overriddenStatus。
status是Eureka Client本身发布的状态。
overriddenstatus是手动或通过工具强制执行的状态。
Server端提供服务apps/{appName}/{instanceId}/status,可以变更服务实例status以及overriddenStatus,从而主动变更服务状态。
注意,并不会修改Client端的服务状态,而是修改Server段服务注册信息中保存的服务状态。
而Server处理Client注册或心跳时,会使用overriddenstatus覆盖status。
Eureka Client在获取到注册信息时,会调用DiscoveryClient#shuffleInstances方法,过滤掉非InstanceStatus.UP状态的服务实例,从而避免调动该实例,以达到服务实例的暂停服务,而无需关闭服务实例。

InstanceInfo还维护了lastDirtyTimestamp变量,代表服务注册信息最后更新时间。
从InstanceResource可以看到,更新状态statusUpdate或者删除状态deleteStatusUpdate时都可以提供lastDirtyTimestamp,
而处理心跳的renewLease方法,必须有lastDirtyTimestamp参数,validateDirtyTimestamp方法负责检验lastDirtyTimestamp参数

  1. 当lastDirtyTimestamp参数等于当前注册信息中的lastDirtyTimestamp,返回处理成功。
  2. 当lastDirtyTimestamp参数大于当前注册信息中的lastDirtyTimestamp,返回NOT_FOUND状态,表示Client的信息已经过期,需要重新注册。
  3. 当lastDirtyTimestamp参数小于当前注册信息中的lastDirtyTimestamp,返回CONFLICT(409)状态,表示数据冲突,并返回当前节点中该服务的注册信息。
    这时如果心跳是Client发起的,Client会忽略409的返回状态(DiscoveryClient#renew),但如果是其他Server节点同步过来的,发送心跳的Server节点会使用返回的服务注册信息更新本节点的注册信息(PeerEurekaNode#heartbeat)。

配置:
eureka.client.filter-only-up-instances,获取实例时是否只保留UP状态的实例,默认为true
eureka.server.sync-when-timestamp-differs,当时间戳不一致时,是否进行同步数据,默认为true

文本关于Eureka的分享就到这里,我们可以Eureka设计和实现都比较简单,但是非常实用。
我在深入阅读Eureka源码前犹豫了一段时间(毕竟Eureka 2.0 开源流产),不过经过一段时间深入学习,收获不少,希望这篇文章也可以给对Eureka感兴趣的同学提供一个深入学习思路。

如果您觉得本文不错,欢迎关注我的微信公众号,您的关注是我坚持的动力!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351