玩转Eureka

Register - 注册

Eureka Instance注册的REST入口在com.netflix.eureka.resources.ApplicationResource#addInstance

/**    * Registers information about a particular instance for an    * {@linkcom.netflix.discovery.shared.Application}.    *    *@paraminfo    *            {@linkInstanceInfo} information of the instance.    *@paramisReplication    *            a header parameter containing information whether this is    *            replicated from other nodes.    */@POST@Consumes({"application/json","application/xml"})publicResponseaddInstance(InstanceInfo info,

                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION)String isReplication){        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);// *********** 字段校验 ************// validate that the instanceinfo contains all the necessary required fieldsif(isBlank(info.getId())) {returnResponse.status(400).entity("Missing instanceId").build();        }elseif(isBlank(info.getHostName())) {returnResponse.status(400).entity("Missing hostname").build();        }elseif(isBlank(info.getAppName())) {returnResponse.status(400).entity("Missing appName").build();        }elseif(!appName.equals(info.getAppName())) {returnResponse.status(400).entity("Mismatched appName, expecting "+ appName +" but was "+ info.getAppName()).build();        }elseif(info.getDataCenterInfo() ==null) {returnResponse.status(400).entity("Missing dataCenterInfo").build();        }elseif(info.getDataCenterInfo().getName() ==null) {returnResponse.status(400).entity("Missing dataCenterInfo Name").build();        }// handle cases where clients may be registering with bad DataCenterInfo with missing dataDataCenterInfo dataCenterInfo = info.getDataCenterInfo();// 仅当DataCenterInfo为AmazonInfo实例的时候,其父类有可能是UniqueIdentifierif(dataCenterInfoinstanceofUniqueIdentifier) {// ......}// *********** 字段校验 END ************registry.register(info,"true".equals(isReplication));// (1)returnResponse.status(204).build();// 204 to be backwards compatible}

真正的注册操作在(1)处,需要注意的是isReplication变量取决于HTTP头x-netflix-discovery-replication的值。继续追踪(1)的调用栈,发现执行注册操作的方法是是com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register

注意该方法的javadoc,他告诉了我们一个比较重要的讯息:将InstanceInfo实例信息注册到Eureka并且复制该信息到其他peer。如果当前收到的注册信息是来自其他peer的复制事件,那么将不会将这个注册信息继续复制到其他peer,这个标志位就是上面所述的isReplication。

/**    * Registers the information about the {@linkInstanceInfo} and replicates    * this information to all peer eureka nodes. If this is replication event    * from other replica nodes then it is not replicated.    *    *@paraminfo    *            the {@linkInstanceInfo} to be registered and replicated.    *@paramisReplication    *            true if this is a replication event from other replica nodes,    *            false otherwise.    */@Overridepublicvoidregister(finalInstanceInfo info,finalbooleanisReplication){// 默认租约有效时长为90sintleaseDuration = Lease.DEFAULT_DURATION_IN_SECS;// 注册信息里包含则依照注册信息的租约时长if(info.getLeaseInfo() !=null&& info.getLeaseInfo().getDurationInSecs() >0) {            leaseDuration = info.getLeaseInfo().getDurationInSecs();        }// super为AbstractInstanceRegistrysuper.register(info, leaseDuration, isReplication);// 复制到其他peerreplicateToPeers(Action.Register, info.getAppName(), info.getId(), info,null, isReplication);    }

我们看到是先获取到租约的有效时长,然后才是真真正正地委托给super执行注册操作super.register(...)并将注册信息复制到其他peer。register方法非常长,我们重点观察一下他的注册表的结构:

privatefinalConcurrentHashMap>> registry

该注册表是一个以app name为key(在Spring Cloud里就是spring.application.name),嵌套Map为value的ConcurrentHashMap结构。其嵌套Map是以Instance ID为key,Lease对象为value的键值结构。这个registry注册表在Eureka Server或SpringBoot Admin的监控面板上以Eureka Service这个角色出现。

/**    * Registers a new instance with a given duration.    *    *@seecom.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)    */publicvoidregister(InstanceInfo registrant,intleaseDuration,booleanisReplication){try{            read.lock();// 可以看出registry是一个以info的app name为key的Map结构, 也就是以spring.application.name的大写串为keyMap> gMap = registry.get(registrant.getAppName());            REGISTER.increment(isReplication);if(gMap ==null) {finalConcurrentHashMap> gNewMap =newConcurrentHashMap>();                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if(gMap ==null) {                    gMap = gNewMap;                }            }// registry的value的Map结构是以info的id为key,这里的id就是Eureka文档上的Instance ID,给你个例子你就想起是什么东西了:10.8.88.233:config-server:10888Lease existingLease = gMap.get(registrant.getId());// .......Lease lease =newLease(registrant, leaseDuration);if(existingLease !=null) {                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());            }            gMap.put(registrant.getId(), lease);// .......}finally{            read.unlock();        }    }

上面是register(...)中关于registry的大致操作,其中有相当一部分的操作被略去了,如果感兴趣的话可以细致地研究一下。

Renew and Cancel Lease - 续约与取消租约

续约的REST入口在com.netflix.eureka.resources.InstanceResource#renewLease

而取消租约的REST入口在com.netflix.eureka.resources.InstanceResource#cancelLease

两者的基本思想相似,经由InstanceRegistry->AbstractInstanceRegistry->PeerAwareInstanceRegistryImpl,其中PeerAwareInstanceRegistryImpl装饰了添加复制信息到其他节点的功能。其中register、renew、cancel、statusUpdate和deleteStatusOverride都会将其信息复制到其他节点。

Fetch Registry - 获取注册信息

获取所有Eureka Instance的注册信息,com.netflix.eureka.resources.ApplicationsResource#getContainers,其注册信息由ResponseCacheImpl缓存,缓存的过期时间在其构造函数中由EurekaServerConfig.getResponseCacheUpdateIntervalMs()所控制,默认缓存时间为30s。而差量注册信息在Server端会保存得更为长一些(大约3分钟),因此获取的差量可能会重复返回相同的实例。Eureka Client会自动处理这些重复信息。

Evcition

Eureke Server定期进行失效节点的清理,执行该任务的定时器被定义在com.netflix.eureka.registry.AbstractInstanceRegistry#evictionTimer,真正的任务是由他的内部类AbstractInstanceRegistry#EvictionTask所执行,默认为每60s执行一次清理任务,其执行间隔由EurekaServerConfig#getEvictionIntervalTimerInMs[eureka.server.eviction-interval-timer-in-ms]所决定。

回顾一下上面刚说完的注册流程,在PeerAwareInstanceRegistryImpl#register里面特别指出了默认的租约时长为90s[eureka.Instance.lease-expiration-duration-in-seconds],即如果90s后都没有收到特定的Eureka Instance的Heartbeats,则会认为这个Instance已经失效(Instance在正常情况下默认每隔30s发送一个Heartbeats[eureka.Instance.lease-renewal-interval-in-seconds],对以上两个默认值有疑问的可以翻阅LeaseInfo),EvictionTask则会把这个Instance纳入清理的范围。我们看看EvictionTask的清理代码是怎么写的。

publicvoidevict(longadditionalLeaseMs){        logger.debug("Running the evict task");if(!isLeaseExpirationEnabled()) {            logger.debug("DS: lease expiration is currently disabled.");return;        }// We collect first all expired items, to evict them in random order. For large eviction sets,// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,// the impact should be evenly distributed across all applications.// (2) 下面的for循环就是把registry中所有的Lease提取到局部变量expiredLeasesList> expiredLeases =newArrayList<>();for(Entry>> groupEntry : registry.entrySet()) {            Map> leaseMap = groupEntry.getValue();if(leaseMap !=null) {for(Entry> leaseEntry : leaseMap.entrySet()) {                    Lease lease = leaseEntry.getValue();if(lease.isExpired(additionalLeaseMs) && lease.getHolder() !=null) {                        expiredLeases.add(lease);                    }                }            }        }// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for// triggering self-preservation. Without that we would wipe out full registry.intregistrySize = (int) getLocalRegistrySize();intregistrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());// (3)intevictionLimit = registrySize - registrySizeThreshold;inttoEvict = Math.min(expiredLeases.size(), evictionLimit);if(toEvict >0) {            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);            Random random =newRandom(System.currentTimeMillis());for(inti =0; i < toEvict; i++) {// Pick a random item (Knuth shuffle algorithm)intnext = i + random.nextInt(expiredLeases.size() - i);                Collections.swap(expiredLeases, i, next);                Lease lease = expiredLeases.get(i);                String appName = lease.getHolder().getAppName();                String id = lease.getHolder().getId();                EXPIRED.increment();                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);                internalCancel(appName, id,false);            }        }    }

在(2)中把本地的registry中的租约信息全部提取出来,并在(3)通过serverConfig.getRenewalPercentThreshold()[eureka.server.renewal-percent-threshold,默认85%]计算出一个最大可剔除的阈值evictionLimit。

新增Peer Node时的初始化

在有多个Eureka Server的情况下,每个Eureka Server之间是如何发现对方的呢?

通过调试之后,我们根据调用链从下往上追溯,其初始入口为org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized

publicvoidcontextInitialized(ServletContext context){try{            initEurekaEnvironment();            initEurekaServerContext();// (4)context.setAttribute(EurekaServerContext.class.getName(),this.serverContext);        }catch(Throwable e) {            log.error("Cannot bootstrap eureka server :", e);thrownewRuntimeException("Cannot bootstrap eureka server :", e);        }    }

由下个入口(4)最终可以定位到方法com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp,从对应的javadoc上我们可以知道该方法从peer eureka节点往自己填充注册表信息。 如果操作失败则此同步操作将failover到其他节点,直到遍历完列表(service urls)为止。该方法与普通的Eureka Client注册到Eureka Server不同的一点是,其标志位isReplication为true,如果不记得这是什么作用的话可以翻阅到上面的Register - 注册小节。

Peer Node信息的定时更新

首先我们看Eureka Server的上下文实体中的方法com.netflix.eureka.DefaultEurekaServerContext#initialize

@PostConstruct@Overridepublicvoidinitialize()throwsException{        logger.info("Initializing ...");        peerEurekaNodes.start();// (5)registry.init(peerEurekaNodes);        logger.info("Initialized");    }

该方法明确指出这是一个Spring Bean,在构建Bean完成后执行此方法,继续追踪(5)。

publicvoidstart(){        taskExecutor = Executors.newSingleThreadScheduledExecutor(newThreadFactory() {@OverridepublicThreadnewThread(Runnable r){                        Thread thread =newThread(r,"Eureka-PeerNodesUpdater");                        thread.setDaemon(true);returnthread;                    }                }        );try{            updatePeerEurekaNodes(resolvePeerUrls());// (6)Runnable peersUpdateTask =newRunnable() {// (7)@Overridepublicvoidrun(){try{                        updatePeerEurekaNodes(resolvePeerUrls());// (6)}catch(Throwable e) {                        logger.error("Cannot update the replica Nodes", e);                    }                }            };            taskExecutor.scheduleWithFixedDelay(                    peersUpdateTask,// (7)serverConfig.getPeerEurekaNodesUpdateIntervalMs(),// (8)serverConfig.getPeerEurekaNodesUpdateIntervalMs(),                    TimeUnit.MILLISECONDS            );        }catch(Exception e) {thrownewIllegalStateException(e);        }for(PeerEurekaNode node : peerEurekaNodes) {            logger.info("Replica node URL:  "+ node.getServiceUrl());        }    }

上面这段代码很清晰地告诉我们在启动Eureka Server的时候就会调用updatePeerEurekaNodes(...)更新peer的状态,并封装为一个Runnable进行周期性更新。这个定时时间由serverConfig.getPeerEurekaNodesUpdateIntervalMs()[eureka.server.peer-eureka-nodes-update-interval-ms]所控制,默认值为600s,即10min。一直经由EndpointUtils#getDiscoveryServiceUrls、EndpointUtils#getServiceUrlsFromConfig至EurekaClientConfigBean#getEurekaServerServiceUrls获得对应zone的service urls,如有需要可以覆盖上述getEurekaServerServiceUrls方法以动态获取service urls,而不是选择Spring Cloud默认从properties文件读取。

Self Preservation - 自我保护

当新增Eureka Server时,他会先尝试从其他Peer上获取所有Eureka Instance的注册信息。如果在获取时出现问题,该Eureka Server会在放弃之前尝试在其他Peer上获取注册信息。如果这个Eureka Server成功获取到所有Instance的注册信息,那么他就会根据所获取到的注册信息设置应该接收到的续约阈值。如果在任何时候续约的阈值低于所设定的值(在15分钟[eureka.server.renewal-threshold-update-interval-ms]内低于85%[eureka.server.renewal-percent-threshold]),则该Eureka Server会出于保护当前注册列表的目的而停止将任何Instance进行过期处理。

在Netflix中上述保护措施被成为自我保护模式,主要是用于Eureka Server与Eureka Client存在网络分区情况下的场景。在这种情况下,Eureka Server尝试保护其已有的实例信息,但如果出现大规模的网络分区时,相应的Eureka Client会获取到大量无法响应的服务。所以,Eureka Client必须确保对于一些不存在或者无法响应的Eureka Instance具备更加弹性的应对策略,例如快速超时并尝试其他实例。

在网络分区出现时可能会发生以下几种情况:

Peer之间的心跳可能会失败,某Eureka Server检测到这种情况并为了保护当前的注册列表而进入了自我保护模式。新的注册可能发生在某些孤立的Eureka Server上,某些Eureka Client可能会拥有新的注册列表,而另外一些则可能没有(不同的实例视图)。

当网络恢复到稳定状态后,Eureka Server会进行自我修复。当Peer能正常通信之后注册信息会被重新同步。

最重要的一点是,在网络中断期间Eureka Server应该更距弹性,但在这段期间Eureka Client可能会有不同的实例视图。

作者:Chrisdon

链接:https://www.jianshu.com/p/4e43acbad7ae

来源:简书

简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容