1.客户端向nacos服务端注册
1.1 客户端注册的地方
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
NacosServiceRegistryAutoConfiguration注册了三个Bean:
- NacosServiceRegistry,register()方法,deregister()方法,根据Registration获取实例Instance的方法
- NacosRegistration,加了@PostConstruct注解的init()设置port、心跳间隔、心跳超时(BeanPostProcessor#postProcessBeforeInitialization()初始化前会调用@PostConstruct注解的init()方法进行设置)。可以设置和获取各种属性:serviceId、host、port、uri等等
- NacosAutoServiceRegistration,包含上面两个Bean,是个ApplicationListener
NacosAutoServiceRegistration是核心,其余两个Bean是为了拆分职能,为该类服务。
NacosAutoServiceRegistration继承自ApplicationListener<WebServerInitializedEvent>,监听的是WebServerInitializedEvent事件。
这个调用时机点:
- refresh()的finishRefresh()
- DefaultLifecycleProcessor#onRefresh
- WebServerStartStopLifecycle#start发布ServletWebServerInitializedEvent事件
然后就调用至NacosAutoServiceRegistration中:
- AbstractAutoServiceRegistration#onApplicationEvent
- bind(event)
- start()
发布InstancePreRegisteredEvent事件;
register();
发布InstanceRegisteredEvent事件;
设置running为true。 - 重点看register
NacosAutoServiceRegistration#register
AbstractAutoServiceRegistration#register - this.serviceRegistry.register(getRegistration());这里的serviceRegistry就是NacosServiceRegistry,getRegistration()返回的就是NacosRegistration。
- 最终来到NacosServiceRegistry#register(Registration)
获取serviceId、group,以及将registration转换成Instance;
namingService.registerInstance(serviceId, group, instance);注册实例; - NacosNamingService#registerInstance()
现在来重点看一下NacosNamingService#registerInstance:
- 如果是临时实例,则beatReactor.addBeatInfo(groupedServiceName, beatInfo);这里添加了一个延时任务BeatTask,心跳的默认间隔是多少?在BeatReactor#buildBeatInfo有指定,是5s。
- serverProxy.registerService(groupedServiceName, groupName, instance);实际就是NamingProxy#registerService。请求路径:/nacos/v1/ns/instance,POST请求
1.2 服务端服务注册的地方
/nacos/v1/ns/instance POST
InstanceController#register
- 从请求中提取出Instance
- serviceManager.registerInstance(namespaceId, serviceName, instance);
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
ServiceManager#registerInstance
- createEmptyService:创建空的Service,将Service放入Map<String, Map<String, Service>> serviceMap中;初始化:延时任务ClientBeatCheckTask;监听服务数据的变化consistencyService.listen()
- addInstance()加锁操作
addIpAddresses()将注册实例加入到对应服务Service中;
consistencyService.put(key, instances)参见下面的DelegateConsistencyServiceImpl#put
DelegateConsistencyServiceImpl#put
- mapConsistencyService(key).put(key, value)
- DistroConsistencyServiceImpl#put
onPut(key, value);将注册实例更新到内存注册表,并发布服务变化事件,通过udp方式将服务变动通知给订阅的客户端。
distroProtocol.sync()同步实例信息到nacos.server集群其他节点。
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
1.3 (服务注册表发生变化时)服务端即时主动推送(UDP)
ServiceManager#registerInstance
-> ServiceManager#addInstance
-> DistroConsistencyServiceImpl#put
-> DistroConsistencyServiceImpl#onPut
-> notifier.addTask(key, DataOperation.CHANGE);放到阻塞队列tasks中
-> Notifier#run从tasks中取出并进行处理handle(pair)
-> Service#onChange
-> Service#updateIPs
将注册实例更新到cluster的ephemeralInstances中。
发布服务变更事件getPushService().serviceChanged(this)。
-> PushService#serviceChanged 事件ServiceChangeEvent
-> PushService#onApplicationEvent处理事件ServiceChangeEvent
clientMap.get(namespaceId, serviceName)应该就是订阅该服务的客户端,然后通过udp方式将服务变动通知给订阅的客户端。
问题1.服务是在哪里监听的?
- ServiceManager#registerInstance
- -> ServiceManager#createEmptyService
- -> ServiceManager#createServiceIfAbsent
- -> ServiceManager#putServiceAndInit
consistencyService.listen(
KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true),
service);
consistencyService.listen(
KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false),
service);
问题2.服务是在哪里进行订阅的?
InstanceController#list
-> InstanceController#doSrvIpxt
-> PushService#addClient(xxx)
-> PushService#addClient(PushService.PushClient)
问题3.梳理一下监听订阅机制?
2.客户端拉取服务实例
2.1 客户端
Ribbon
-> ZoneAwareLoadBalancer构造方法
-> 父类DynamicServerListLoadBalancer()构造方法
-> DynamicServerListLoadBalancer#restOfInit
enableAndInitLearnNewServersFeature();
updateListOfServers();
-> NacosServerList#getUpdatedListOfServers
-> NacosNamingService#selectInstances()
-> HostReactor#getServiceInfo
-> HostReactor#updateServiceNow
-> NamingProxy#queryList
/nacos/v1/ns/instance/list,GET操作
2.2 服务端
InstanceController#list
-> InstanceController#doSrvIpxt
1)pushService.addClient() 通过UDP方式推送变更到订阅的客户端
2)service.srvIPs()获取所有持久化实例和临时实例
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
3.心跳机制
3.1 客户端BeatReactor
NacosAutoServiceRegistration父类AbstractAutoServiceRegistration#onApplicationEvent
-> bind()
-> start()
-> NacosServiceRegistry#register
-> NacosNamingService#registerInstance
-> 临时实例 BeatReactor#addBeatInfo
-> executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
-> BeatReactor.BeatTask#run
1)serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);发送心跳
2)executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);递归调用
/nacos/v1/ns/instance/beat,PUT方法
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
InstanceController#beat
1)如果实例不存在,则重新注册(网络不通后者重启导致实例下线后);
2)service.processClientBeat(clientBeat);调用至ClientBeatProcessor#run,找到发送心跳的实例,然后设置实例的lastBeat属性:instance.setLastBeat(System.currentTimeMillis());也有可能更改实例的healthy属性(如果原来为不健康更改为健康,也要推送服务变更事件udp)
3.2 服务端 HealthCheckReactor
InstanceController#register
-> ServiceManager#registerInstance
-> ServiceManager#createEmptyService
-> ServiceManager#putServiceAndInit
-> Service#init
-> HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
-> ClientBeatCheckTask#run
1)getDistroMapper().responsible(service.getName()) 判断本服务器是否负责service的心跳检查工作;
2)如果某个实例超过15s没有收到心跳,则将它的healthy属性设置为false;并推送服务变更getPushService().serviceChanged(service);
3)如果某个实例超过30s没有收到心跳,则直接剔除该实例,deleteIp(instance);会发起一个http调用。
public boolean responsible(String serviceName) {
final List<String> servers = healthyList;
if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
return true;
}
if (CollectionUtils.isEmpty(servers)) {
// means distro config is not ready yet
return false;
}
int index = servers.indexOf(EnvUtil.getLocalAddress());
int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
if (lastIndex < 0 || index < 0) {
return true;
}
int target = distroHash(serviceName) % servers.size();
return target >= index && target <= lastIndex;
}
4.AP集群架构
4.1 数据同步机制
服务端注册、以及下线超时实例都会走数据同步机制。
InstanceController#register
-> ServiceManager#registerInstance
-> addInstance()
-> DelegateConsistencyServiceImpl#put
-> DistroConsistencyServiceImpl#put
-> DistroProtocol#sync()
ClientBeatCheckTask#run
-> deleteIp(instance)
-> 服务端InstanceController#deregister
-> ServiceManager#removeInstance()
-> DelegateConsistencyServiceImpl#put
-> DistroConsistencyServiceImpl#put
-> DistroProtocol#sync()
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
DistroProtocol#sync()
-> 会给除了自己以外的所有服务器同步
-> distroTaskEngineHolder.getDelayTaskExecuteEngine(). addTask(distroKeyWithTarget, distroDelayTask);核心就是tasks.put(key, newTask); 这里任务是DistroDelayTask
----异步处理----
-> NacosDelayTaskExecuteEngine.ProcessRunnable#run
-> processTasks()从tasks中获取任务并进行处理
-> DistroDelayTaskProcessor#process
-> distroTaskEngineHolder.getExecuteWorkersManager(). addTask(distroKey, syncChangeTask);
-> 放到阻塞队列中queue.put(task);
----异步处理----
-> TaskExecuteWorker.InnerWorker#run从queue.take()获取任务并执行task.run(),任务类型是DistroSyncChangeTask
-> DistroSyncChangeTask#run
1)distroComponentHolder.findTransportAgent(type). syncData(distroData, getDistroKey().getTargetServer());最终调用NamingProxy#syncData
2)handleFailedTask();如果同步不成功,则重试
同步方法NamingProxy#syncData:
-> /nacos/v1/ns/distro/datum,PUT方法
-> 其他server处理方法DistroController#onSyncDatum
-> DistroProtocol#onReceive
-> DistroConsistencyServiceImpl#processData()
-> DistroConsistencyServiceImpl#onPut 将同步过来的实例更新到内存注册表(这里怎么避免再次对同步过来的数据进行UDP推送?)
4.2 心跳只会在一台机器上检查
HealthCheckReactor
ClientBeatCheckTask
如果某台机器挂了,其集群机器总数会变化,服务名hash取模就会跟着变化
客户端会连接到集群中所有机器码?
4.3 集群状态信息同步
4.3.1 ServiceReporter
健康状态通过ServiceReporter
-> ServiceManager#init
-> GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
-> ServiceManager.ServiceReporter#run,计算所有namespaceId中所有serviceName的校验和,然后发送给其他server
-> synchronizer.send(server.getAddress(), msg);
-> ServiceStatusSynchronizer#send
/nacos/v1/ns/service/status
其他server处理ServiceController#serviceStatus,如果发现校验和不一样,会向源Server发起请求,更新服务实例的健康状况。
4.3.2 ServerStatusReporter
ServerListManager#init
-> GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
4.3.3 ServerInfoUpdater
ServerListManager#init
-> GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());
4.4 机器启动时从某一台机器同步数据
5.CP架构
5.1 CAP与BASE
C代表Consistency,一致性,是指所有节点在同一时刻的数据是相同的,即更新操作执行结束并响应用户完成后,所有节点存储的数据应该是一样的(强调的是各个节点间的数据一致)。
A代表Availability,可用性,是指系统提供的服务一直处于可用状态,对于用户的请求可即时响应。任何客户端的请求,不管访问哪个节点,都能得到响应数据,但不保证是同一份最新数据,即不保证每个节点给你的数据都是最新的(强调服务可用,但不保证数据的一致)。
P代表Partition Tolerance,分区容错性,是指在分布式系统遇到网络分区的情况下,仍然可以响应用户的请求。网络分区是指因为网络故障导致网络不连通,不同节点分布在不同的子网中(强调不管内部出现什么样的数据同步问题,会一直提供服务)。
只要有网络交互就一定会有延迟和数据丢失,而这种状况必须接受,还必须保证系统不能挂掉。所以P是必须要保证的。
- 当选择一致性C时,如果因为网络分区发生了消息丢失、延迟过高,部分节点无法保证特定信息最新的,这时系统将返回写失败错误,也就是集群拒绝新数据写入。
- 当选择可用性A时,系统始终可用(也即处理客户端的请求),如果发生了分区,一些节点将无法返回最新的特定信息。
BASE可以看出AP的延伸,是对互联网大规模分布式系统的实践总结,强调可用性。
- 基本可用Basically Available,当分布式系统在出现不可预知的故障时,允许损失部分功能的可用性,保障核心功能的可用性。四板斧:流量削峰(错峰处理)、延迟响应(排队等待处理)、体验降级(小图片代替原始图片)、过载保护。
- 最终一致性Eventually consistent,系统中所有的数据副本在经过一段时间的同步后,最终能够达到一个一致的状态。也即在数据的一致性上,存在一个短暂的延迟。
- 软状态Soft State:不同节点间,数据副本存在短暂的不一致,是一种过渡状态。
最终一致性,以什么数据为准?
- 以最新写入的数据为准
- 以第一次写入的数据为准
最终一致性具体的实现方式?
- 读时修复:在读取数据时,检测到数据不一致,进行修复。
- 写时修复:在写入数据时,检测到数据不一致,进行修复。比如不同节点写失败,就将数据缓存下来,然后定时重传,修复数据的不一致性。
- 异步修复:最常用的方式,通过定时对账检测副本数据的一致性,并修复。
在实现最终一致性的时候,推荐同时实现自定义写一致性级别(All、Quorum、One、Any),让用户可以自主选择相应的一致性级别。
5.2 Nacos的CP架构
raft协议:
- Leader选举(ZAB所有节点都需要发起投票,然后PK,raft先发投票的一般不出意外都会成为leader)
1)选举超时时间(150-300ms)
2)超时则变成候选人,发起投票,先投给自己,然后将请求发给其他人
3)其他人收到请求,如果还未超时,则投票给该人并且重置自己的超时时间
4)候选人收到大部分选票,则成为Leader
5)Leader同步Append Entries信息给followers - 日志复制(数据同步):两阶段提交
DelegateConsistencyServiceImpl#put
RaftConsistencyServiceImpl#put
注册信息写文件和内存
配置信息写数据库
这里raft协议并没有两阶段提交,写完日志直接提交了。会发生问题,比如Leader成功提交了但是其他节点抛异常失败了。
心跳同步的数据
从节点:0表示本地有,1表示主节点有
6.Nacos2.X源码剖析
6.1 注册
6.1.1 客户端
跟1.4源码一样,NacosServiceRegistryAutoConfiguration注册了三个Bean:
- NacosServiceRegistry,register()方法,deregister()方法,根据Registration获取实例Instance的方法
- NacosRegistration,加了@PostConstruct注解的init()设置port、心跳间隔、心跳超时(BeanPostProcessor#postProcessBeforeInitialization()初始化前会调用@PostConstruct注解的init()方法进行设置)。可以设置和获取各种属性:serviceId、host、port、uri等等
- NacosAutoServiceRegistration,包含上面两个Bean,是个ApplicationListener
NacosAutoServiceRegistration是核心,其余两个Bean是为了拆分职能,为该类服务。
实例注册的地方:
- WebServerStartStopLifecycle#start发布ServletWebServerInitializedEvent事件
- NacosAutoServiceRegistration监听WebServerInitializedEvent事件
- AbstractAutoServiceRegistration#onApplicationEvent
- bind(event);
- start()
发布InstancePreRegisteredEvent事件;
register();
发布InstanceRegisteredEvent事件;
设置running为true。 - 重点看register
NacosAutoServiceRegistration#register
AbstractAutoServiceRegistration#register - this.serviceRegistry.register(getRegistration());这里的serviceRegistry就是NacosServiceRegistry,getRegistration()返回的就是NacosRegistration。
- 最终来到NacosServiceRegistry#register(Registration)
获取serviceId、group,以及将registration转换成Instance;
namingService.registerInstance(serviceId, group, instance);注册实例; - NacosNamingService#registerInstance()
现在来重点看一下NacosNamingService#registerInstance:
- clientProxy.registerService(serviceName, groupName, instance); 这里clientProxy是NamingClientProxyDelegate
- getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);如果实例是临时的,则使用grpcClientProxy,否则使用httpClientProxy
- NamingGrpcClientProxy#registerService
1)redoService.cacheInstanceForRedo(serviceName, groupName, instance);
2)NamingGrpcClientProxy#doRegisterService
重点来看一下NamingGrpcClientProxy#doRegisterService
- 创建InstanceRequest,REGISTER_INSTANCE
- NamingGrpcClientProxy#requestToServer调用RpcClient#request()发起请求
- NamingGrpcRedoService#instanceRegistered,将registeredInstances中对应的服务设置为已注册
6.1.2 NamingGrpcRedoService
实例注册流程:
- 注册前,redoService.cacheInstanceForRedo(),先将实例放入到registeredInstances
- 注册成功后,redoService.instanceRegistered(),将对应实例设置为已注册:redoData.setRegistered(true);
NamingGrpcRedoService中有一个调度线程池,只有一个单独的线程,每隔3s运行一次RedoScheduledTask。
RedoScheduledTask#run
- 会根据实例的状态筛选出需要操作的实例进行处理。
public void run() {
if (!redoService.isConnected()) {
LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
return;
}
try {
redoForInstances();
redoForSubscribes();
} catch (Exception e) {
LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
}
}
6.1.3 服务端
服务端根据请求InstanceRequest找请求处理器InstanceRequestHandler
- InstanceRequestHandler#handle
- 根据请求类型REGISTER_INSTANCE,InstanceRequestHandler#registerInstance
- EphemeralClientOperationServiceImpl#registerInstance
1)getSingleton()获取单例Service;
2)Client client = clientManager.getClient(clientId);获取客户端在服务端对应的对象信息;
3)instanceInfo = getPublishInfo(instance);客户端Instance转换为服务端的Instance;
4)client.addServiceInstance(singleton, instanceInfo);
4-1)publishers.put(service, instancePublishInfo)将注册实例放到客户端对象的publishers这个map中
4-2)发布客户端变化事件ClientEvent.ClientChangedEvent,向Nacos其他服务器同步数据
5) client.setLastUpdatedTime();更新客户端lastUpdatedTime
6)发布客户端注册事件ClientOperationEvent.ClientRegisterServiceEvent
7)发布实例元数据变化事件MetadataEvent.InstanceMetadataEvent
InstanceRequestHandler#handle
registerInstance() 客户端注册信息都放在了对应的Client里面
NotifyCenter
- 事件1 ClientChangedEvent 集群同步
- 事件2 ClientRegisterServiceEvent -> 发布事件ServiceChangedEvent -> NamingSubscriberServiceV2Impl#onEvent
- 事件3 InstanceMetadataEvent
服务要么全是临时实例,要么是持久实例,不会一部分是临时一部分是持久。这个跟1.X不同。
注册表结构也改变了,1.X是一个大Map,2.X分成了多个数据结构。
gRPC底层类似于Netty,服务端会对每个客户端生成一个Client,也即类似于Netty的SocketChannel。
Nacos1.X一个大Map会产生很多冲突,使用写时复制。
Nacos2.X拆分为了很多Map,并发粒度很小。大大分散了冲突的压力。
NamingSubscriberServiceV2Impl#onEvent:
服务注册会推送到所有订阅该服务的客户端;
服务订阅只会推送到新增的订阅的客户端。
客户端在服务端有两个对象:
- Client
- Connection
6.1.4 事件处理机制
客户端变化事件ClientEvent.ClientChangedEvent处理流程:
NotifyCenter#publishEvent()
-> NotifyCenter#publishEvent()
-> EventPublisher publisher = INSTANCE.publisherMap.get(topic);然后publisher.publish(event);
-> DefaultPublisher#publish放到阻塞队列queue里面
---异步---
-> DefaultPublisher#run
-> openEventHandler();
-> 从阻塞队列queue中获取Event,并进行处理receiveEvent(event);
-> DefaultPublisher#notifySubscriber
-> DistroClientDataProcessor#onEvent
-> DistroClientDataProcessor#syncToAllServer
-> distroProtocol.sync(distroKey, DataOperation.CHANGE)
-> DistroProtocol#syncToTarget向不包括自己的其他所有Nacos Server同步数据
-> distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);核心就是tasks.put(key, newTask); 这里任务是DistroDelayTask
----异步处理----
-> NacosDelayTaskExecuteEngine.ProcessRunnable#run
-> processTasks()从tasks中获取任务并进行处理
-> DistroDelayTaskProcessor#process
-> distroTaskEngineHolder.getExecuteWorkersManager(). addTask(distroKey, syncChangeTask);
-> TaskExecuteWorker#process放到阻塞队列中queue.put(task);
----异步处理----
-> TaskExecuteWorker.InnerWorker#run从queue.take()获取任务并执行task.run(),任务类型是DistroSyncChangeTask
-> DistroSyncChangeTask#run
-> DistroSyncChangeTask#doExecuteWithCallback
-> getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer(), callback);
-> DistroClientTransportAgent#syncData()
-> clusterRpcClientProxy.asyncRequest(),请求类型是DistroDataRequest
服务端处理:
-> DistroDataRequestHandler#handle
-> DistroDataRequestHandler#handleSyncData
-> DistroProtocol#onReceive
1)dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
2)dataProcessor.processData(distroData);
-> DistroClientDataProcessor#processData
-> DistroClientDataProcessor#handlerClientSyncData
1)clientManager.syncClientConnected();同步客户端连接
2)upgradeClient(client, clientSyncData);更新内存注册表数据,发送服务注册事件ClientOperationEvent.ClientRegisterServiceEvent
发布客户端注册事件ClientOperationEvent.ClientRegisterServiceEvent
-> 前面都一样NotifyCenter -> DefaultPublisher -> ClientServiceIndexesManager
-> ClientServiceIndexesManager#onEvent
-> ClientServiceIndexesManager#handleClientOperation
-> ClientServiceIndexesManager#addPublisherIndexes
1)publisherIndexes.get(service).add(clientId);新增服务对应的客户端id
2)发布事件ServiceEvent.ServiceChangedEvent
-> 事件ServiceEvent.ServiceChangedEvent处理过程
-> 前面都一样NotifyCenter -> DefaultPublisher -> NamingSubscriberServiceV2Impl
-> NamingSubscriberServiceV2Impl#onEvent
-> delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
---异步处理:推送到当前服务的所有订阅者----
-> NacosDelayTaskExecuteEngine队列tasks -> PushDelayTaskExecuteEngine.PushDelayTaskProcessor队列queue -> PushExecuteTask#run
-> PushExecuteTask#run
-> delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
-> PushExecutorRpcImpl#doPushWithCallback
-> RpcPushService#pushWithCallback
-> connection.asyncRequest,请求是NotifySubscriberRequest
---客户端处理---
-> NamingPushRequestHandler#requestReply
-> ServiceInfoHolder#processServiceInfo 更新客户端服务实例本地缓存
6.2 服务发现
Ribbon
-> ZoneAwareLoadBalancer构造方法
-> 父类DynamicServerListLoadBalancer()构造方法
-> DynamicServerListLoadBalancer#restOfInit
enableAndInitLearnNewServersFeature();
updateListOfServers();
-> NacosServerList#getUpdatedListOfServers
-> NacosNamingService#selectInstances()
-> NamingClientProxyDelegate#subscribe 在下面通过grpc请求服务端获取到实例后,会更新本地缓存,然后发布InstancesChangeEvent事件,并且将ServiceInfo写入本地磁盘。
-> NamingGrpcClientProxy#subscribe
1)redoService.cacheSubscriberForRedo()
2)doSubscribe()
-> NamingGrpcClientProxy#doSubscribe发送SubscribeServiceRequest请求
服务端
SubscribeServiceRequestHandler
1)查询服务相关的实例
Map<Service, Set<clientId>> 拿到clientId,然后根据ClientId拿到Client然后再拿到其服务对应的实例。
2)订阅 并发布事件 ClientSubscribeServiceEvent -> 处理完发布事件ServiceSubscribedEvent
6.3 客户端定时拉取任务UpdateTask
-> NamingClientProxyDelegate创建ServiceInfoUpdateService
-> NamingClientProxyDelegate#subscribe
-> ServiceInfoUpdateService.scheduleUpdateIfAbsent
-> UpdateTask定时(每秒)拉取注册中心注册的服务namingClientProxy.queryInstancesOfService
6.4 心跳-健康检查
ConnectionManager#start,该方法有注解@PostConstruct。
- RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(Runnable, 1000L, 3000L, TimeUnit.MILLISECONDS); 定时调度3s一次(每次结束后延迟3s)。
- 超过20s没有给服务端发心跳的客户端,服务端会发起请求探活,如果失败或者超过1s未响应则剔除服务。ClientDetectionRequest 探活
在Nacos 1.x版本中,临时实例需要客户端(服务提供者)定时向Nacos发送心跳包来维持自己的健康状态。持久化实例并不基于客户端发送心跳包,而是服务端定时探测客户端进行健康检查(TCP端口探测、HTTP返回码探测)。
在Nacos 2.0版本之后持久化实例的监控检查并没有改变逻辑;但临时实例不再使用心跳包,而是通过判断gRPC长连接是否存活来判断临时实例是否健康。
7.gRPC
客户端
grpcClientProxy -> NamingGrpcClientProxy -> start()
心跳
3次重试连接服务端connectToServer(),如果失败,同步重连其他Server。
异步重连。
服务端
BaseRpcServer @PostConstruct
BaseGrpcServer
type -> RequestHandler type是类名,比如InstanceRequest
RequestHandlerRegistry-> ApplicationListener
注册Connection Client