上一篇文章中,我们讨论了 Client,关于 Client 的注册,心跳(续约),远程调用等,我们都对源码做了分析,今天就对 Server 的源码做一个分析。
0. Server 有哪些功能
- 提供服务注册功能。
- 消费者可以获取服务列表。
- 服务可以续约。
- Server 集群之间的数据共享。
1. 提供服务注册功能
我们使用 EurekaServer 的时候,需要在启动类上加入 @EnableEurekaServer
注解,这个注解肯定就是我们研究的入口。
该注解注释说道:
Annotation to activate Eureka Server related configuration {@link EurekaServerAutoConfiguration}
通过EurekaServerAutoConfiguration
激活 Eureka 相关的配置。
进入该类查看,该类可以说是一个 config
配置类,不是能很直观的找到入口。
我们可以通过日志找到入口,怎么找呢?当 Client 启动的时候,就会尝试向注册中心注册,而注册中心在注册成功之后,就会打印日志。通过日志,我们搜索到注册事件发生在 ApplicationResource
的 addInstance
方法中。而 Client
的请求 requestUri
则是 POST http://localhost:8761/eureka/apps/SERVICE-HI HTTP/1.1
(我的测试 demo)。
这个ApplicationResource
类的注释说这个类是用于 处理特定请求相关的资源
。该类主要的方法就是addInstance
方法。
关键代码如下:
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
// validate that the instanceinfo contains all the necessary required fields
// 校验数据是否正确。。。。
// handle cases where clients may be registering with bad DataCenterInfo with missing data
// 处理客户端数据丢失的情况。。。。
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
参数的 InstanceInfo
对象就是服务实例的具体信息,比如 ip
, 实例名称,端口,更新时间,心跳 url
,控制页面的 url 等。isReplication
参数作用则是判断这是否是集群之间的复制。防止重复注册。
最后调用register.register
方法并返回 204.
通过跟踪代码,发现register
的关键代码在PeerAwareInstanceRegistryImpl
类的 register
方法中。该代码注释写道:注册这个节点信息,并传播给所有其他同伴节点。但如果这是一个复制事件的话,则不会进行传播。
代码如下:
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
代码中,可以看到默认的续约时间是 90 秒,然后调用父类的register
方法,最后尝试复制到其他节点(需要判断isReplication
字段)。
抽象父类 AbstractInstanceRegistry
的 register
方法很长,简单说说逻辑。
这个类将所有的实例保存在一个双重的map
中(ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
),也就是每个App
对应一个 Map
,Map
中存储着多个实例。
我们回到replicateToPeers
方法中,如果不是复制事件,EurekaServer
会将该实例传播到自己的所有同伴中去,通常这些同伴就是配置文件中写入的。反之,如果是复制事件,则直接 return
,也就是说,Eureka
的复制事件是不会有蝴蝶效应的,这有助于控制事件的传播范围。
而这些节点也是需要定时更新的。默认 10 分钟更新一次。通过调用 PeerEurekaNodes
的 updatePeerEurekaNodes
方法进行更新。
到这里,关于 EurekaServer
服务注册的功能就基本差不多了。
2. 消费者可以获取服务列表
从前面的注册过程中,我们知道,Client
请求的路径是 /apps
,我们使用 IDEA 的 Java Enterprise
功能,查看该路径下的方法:
根据上图中的箭头,便可找到这个对应URL
的方法 ApplicationsResource#getContainers()
,当然你也可以使用全局搜索。
该方法会从缓存中取出所有的实例,返回给客户端。
这个缓存的实现是一个 ResponseCacheImpl
类,内部使用 Map
保存实例,而不是像 ZK 使用树形结构保存。该类在构造的时候,就会创建一个定时任务,任务内容则是执行 getCacheUpdateTask
方法,用于更新缓存,默认 30 秒更新一次。
3. 服务可以续约
续约其实就是心跳,客户端需要向Server
发送心跳证明自己没有发生故障。而客户端执行此任务的就是DiscoveryClient
的 内部类HeartbeatThread
,从名字上就可以看出,这是一个心跳任务,内部 run
方法执行的就是外部类的 renew
方法。
我们层层追踪,在 JerseyApplicationClient
的 sendHeartBeat
方法中,看到心跳的路径是 http://localhost:8761/eureka/apps/SERVICE-FEIGN/localhost:service-feign:8765?status=UP&lastDirtyTimestamp=1522984522100
,同时,这个请求也是个 PUT 请求。那么在 Server 端接收的方法是什么呢?什么方法参数对应这个呢?用 Java Enterprise 没有搜到,然后通过全局搜索:
InstanceResource
类的 renewLease
方法,该方法注释如下:A put request for renewing lease from a client instance。
该方法有几个参数,其中的isReplication
参数需要注意,其实和前面的 isReplication
一样,不论是注册还是更新,都会在集群之间尽心复制,而复制的手段就是调用相同的接口,通过isReplication
参数进去区分。如果是复制的话,就不再继续进行传播了。
最后返回一个 Response.ok().build()
ResponseImpl
对象.
4. Server 集群之间的数据共享
我相信通过前面代码的阅读,大家应该对 EurekaServer 集群之间的复制都有了一点印象了,那我们再 继续看看这一块的逻辑。
当有注册事件或者心跳事件时,都会对集群中的同伴节点进行传播。
注册事件的主要方法是 PeerAwareInstanceRegistryImpl 类的 register 方法,注册成功之后会执行 replicateToPeers 方法,这个就是 EurekaServer 之间的数据共享了。其中 isReplication 决定是否复制到别的节点。如果是别的 Server 的复制请求的话,则停止复制的蔓延。代码如下:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
// 复制
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
而续约事件在 PeerAwareInstanceRegistryImpl 类的 renew 方法中也存在调用共享数据的行为。代码如下:
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
// 复制
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
同时,集群中的节点也是会定时更新的,PeerEurekaNodes 的 start 方法会创建一个定时任务,默认的时间是 10 分钟,代码如下:
public void start() {
....
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: " + node.getServiceUrl());
}
}
总结
关于 EurekaServer 的主要 4 个功能就大致这么多,总结下。
Server 给 Client 提供注册功能,所有实例保存在一个双重 Map 中,注册后,Server 会将数据复制到配置的同伴节点。
Client 从 Server 获取服务列表的是通过 Http 请求 Server 的 ApplicationsResource#getContainers() 方法获取所有的服务列表,而这个服务列表的更新时间是 30 秒,因此会有一些延迟。
服务的续约其实就是心跳,每个 RPC 服务都是需要心跳的。Client 的心跳实现是 HeartbeatThread ,会定时的调用 renew 方法,这个方法会调用 Server 的 InstanceResource#renewLease 方法,同时,这个行为也会复制到配置文件中的其他 Server 节点。
Server 为了高可用,可以做成集群,那么,也就是说,需要进行数据的备份,ZK 始终使用的是一个节点,为了保证 CAP 的 CP,而 Eureka 的设计理念和 ZK 不同,Eureka 认为一致性在注册中心这种服务上优先级不是最高的。于是 Eureka 选择了 AP,保证高可用和容错。在每一次注册事件和续约事件都会进行集群之间的复制,复制过程就是和普通的 Server 调用系统,只不过加入了一个区别参数 isReplication,当时复制事件的时候,就不能继续复制到其他的节点了,这是 Eureka 的设计。
好,能力不高,水平有限,就到这里。good luck!!!!