Nacos源码剖析

1.客户端向nacos服务端注册

1.1 客户端注册的地方

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class,
        NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(registrationCustomizers.getIfAvailable(),
                nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }

}

NacosServiceRegistryAutoConfiguration注册了三个Bean:

  • NacosServiceRegistry,register()方法,deregister()方法,根据Registration获取实例Instance的方法
  • NacosRegistration,加了@PostConstruct注解的init()设置port、心跳间隔、心跳超时(BeanPostProcessor#postProcessBeforeInitialization()初始化前会调用@PostConstruct注解的init()方法进行设置)。可以设置和获取各种属性:serviceId、host、port、uri等等
  • NacosAutoServiceRegistration,包含上面两个Bean,是个ApplicationListener

NacosAutoServiceRegistration是核心,其余两个Bean是为了拆分职能,为该类服务。


NacosAutoServiceRegistration继承自ApplicationListener<WebServerInitializedEvent>,监听的是WebServerInitializedEvent事件。

这个调用时机点:

  • refresh()的finishRefresh()
  • DefaultLifecycleProcessor#onRefresh
  • WebServerStartStopLifecycle#start发布ServletWebServerInitializedEvent事件

然后就调用至NacosAutoServiceRegistration中:

  • AbstractAutoServiceRegistration#onApplicationEvent
  • bind(event)
  • start()
     发布InstancePreRegisteredEvent事件;
     register();
     发布InstanceRegisteredEvent事件;
     设置running为true。
  • 重点看register
    NacosAutoServiceRegistration#register
    AbstractAutoServiceRegistration#register
  • this.serviceRegistry.register(getRegistration());这里的serviceRegistry就是NacosServiceRegistry,getRegistration()返回的就是NacosRegistration。
  • 最终来到NacosServiceRegistry#register(Registration)
     获取serviceId、group,以及将registration转换成Instance;
     namingService.registerInstance(serviceId, group, instance);注册实例;
  • NacosNamingService#registerInstance()

现在来重点看一下NacosNamingService#registerInstance:

  • 如果是临时实例,则beatReactor.addBeatInfo(groupedServiceName, beatInfo);这里添加了一个延时任务BeatTask,心跳的默认间隔是多少?在BeatReactor#buildBeatInfo有指定,是5s。
  • serverProxy.registerService(groupedServiceName, groupName, instance);实际就是NamingProxy#registerService。请求路径:/nacos/v1/ns/instance,POST请求

1.2 服务端服务注册的地方

/nacos/v1/ns/instance POST


InstanceController#register

  • 从请求中提取出Instance
  • serviceManager.registerInstance(namespaceId, serviceName, instance);
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = parseInstance(request);
        
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

ServiceManager#registerInstance

  • createEmptyService:创建空的Service,将Service放入Map<String, Map<String, Service>> serviceMap中;初始化:延时任务ClientBeatCheckTask;监听服务数据的变化consistencyService.listen()
  • addInstance()加锁操作
     addIpAddresses()将注册实例加入到对应服务Service中;
     consistencyService.put(key, instances)参见下面的DelegateConsistencyServiceImpl#put

DelegateConsistencyServiceImpl#put

  • mapConsistencyService(key).put(key, value)
  • DistroConsistencyServiceImpl#put
     onPut(key, value);将注册实例更新到内存注册表,并发布服务变化事件,通过udp方式将服务变动通知给订阅的客户端。
     distroProtocol.sync()同步实例信息到nacos.server集群其他节点。
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        
        Service service = getService(namespaceId, serviceName);
        
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

1.3 (服务注册表发生变化时)服务端即时主动推送(UDP)

ServiceManager#registerInstance
-> ServiceManager#addInstance
-> DistroConsistencyServiceImpl#put
-> DistroConsistencyServiceImpl#onPut
-> notifier.addTask(key, DataOperation.CHANGE);放到阻塞队列tasks中
-> Notifier#run从tasks中取出并进行处理handle(pair)
-> Service#onChange
-> Service#updateIPs
 将注册实例更新到cluster的ephemeralInstances中。
 发布服务变更事件getPushService().serviceChanged(this)。
-> PushService#serviceChanged 事件ServiceChangeEvent
-> PushService#onApplicationEvent处理事件ServiceChangeEvent
 clientMap.get(namespaceId, serviceName)应该就是订阅该服务的客户端,然后通过udp方式将服务变动通知给订阅的客户端。

问题1.服务是在哪里监听的?

  • ServiceManager#registerInstance
  • -> ServiceManager#createEmptyService
  • -> ServiceManager#createServiceIfAbsent
  • -> ServiceManager#putServiceAndInit
consistencyService.listen(
  KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), 
  service);

consistencyService.listen(
  KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), 
  service);

问题2.服务是在哪里进行订阅的?
InstanceController#list
-> InstanceController#doSrvIpxt
-> PushService#addClient(xxx)
-> PushService#addClient(PushService.PushClient)

问题3.梳理一下监听订阅机制?

2.客户端拉取服务实例

2.1 客户端

Ribbon
-> ZoneAwareLoadBalancer构造方法
-> 父类DynamicServerListLoadBalancer()构造方法
-> DynamicServerListLoadBalancer#restOfInit
 enableAndInitLearnNewServersFeature();
 updateListOfServers();
-> NacosServerList#getUpdatedListOfServers
-> NacosNamingService#selectInstances()
-> HostReactor#getServiceInfo
-> HostReactor#updateServiceNow
-> NamingProxy#queryList

/nacos/v1/ns/instance/list,GET操作

2.2 服务端

InstanceController#list
-> InstanceController#doSrvIpxt
1)pushService.addClient() 通过UDP方式推送变更到订阅的客户端
2)service.srvIPs()获取所有持久化实例和临时实例

    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {

3.心跳机制

3.1 客户端BeatReactor

NacosAutoServiceRegistration父类AbstractAutoServiceRegistration#onApplicationEvent
-> bind()
-> start()
-> NacosServiceRegistry#register
-> NacosNamingService#registerInstance
-> 临时实例 BeatReactor#addBeatInfo
-> executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
-> BeatReactor.BeatTask#run
1)serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);发送心跳
2)executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);递归调用

/nacos/v1/ns/instance/beat,PUT方法

    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {

InstanceController#beat
1)如果实例不存在,则重新注册(网络不通后者重启导致实例下线后);
2)service.processClientBeat(clientBeat);调用至ClientBeatProcessor#run,找到发送心跳的实例,然后设置实例的lastBeat属性:instance.setLastBeat(System.currentTimeMillis());也有可能更改实例的healthy属性(如果原来为不健康更改为健康,也要推送服务变更事件udp)

3.2 服务端 HealthCheckReactor

InstanceController#register
-> ServiceManager#registerInstance
-> ServiceManager#createEmptyService
-> ServiceManager#putServiceAndInit
-> Service#init
-> HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
-> ClientBeatCheckTask#run
1)getDistroMapper().responsible(service.getName()) 判断本服务器是否负责service的心跳检查工作;
2)如果某个实例超过15s没有收到心跳,则将它的healthy属性设置为false;并推送服务变更getPushService().serviceChanged(service);
3)如果某个实例超过30s没有收到心跳,则直接剔除该实例,deleteIp(instance);会发起一个http调用。

    public boolean responsible(String serviceName) {
        final List<String> servers = healthyList;
        
        if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
            return true;
        }
        
        if (CollectionUtils.isEmpty(servers)) {
            // means distro config is not ready yet
            return false;
        }
        
        int index = servers.indexOf(EnvUtil.getLocalAddress());
        int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
        if (lastIndex < 0 || index < 0) {
            return true;
        }
        
        int target = distroHash(serviceName) % servers.size();
        return target >= index && target <= lastIndex;
    }

4.AP集群架构

4.1 数据同步机制

服务端注册、以及下线超时实例都会走数据同步机制。
InstanceController#register
-> ServiceManager#registerInstance
-> addInstance()
-> DelegateConsistencyServiceImpl#put
-> DistroConsistencyServiceImpl#put
-> DistroProtocol#sync()

ClientBeatCheckTask#run
-> deleteIp(instance)
-> 服务端InstanceController#deregister
-> ServiceManager#removeInstance()
-> DelegateConsistencyServiceImpl#put
-> DistroConsistencyServiceImpl#put
-> DistroProtocol#sync()

    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }

DistroProtocol#sync()
-> 会给除了自己以外的所有服务器同步
-> distroTaskEngineHolder.getDelayTaskExecuteEngine(). addTask(distroKeyWithTarget, distroDelayTask);核心就是tasks.put(key, newTask); 这里任务是DistroDelayTask
----异步处理----
-> NacosDelayTaskExecuteEngine.ProcessRunnable#run
-> processTasks()从tasks中获取任务并进行处理
-> DistroDelayTaskProcessor#process
-> distroTaskEngineHolder.getExecuteWorkersManager(). addTask(distroKey, syncChangeTask);
-> 放到阻塞队列中queue.put(task);
----异步处理----
-> TaskExecuteWorker.InnerWorker#run从queue.take()获取任务并执行task.run(),任务类型是DistroSyncChangeTask
-> DistroSyncChangeTask#run
1)distroComponentHolder.findTransportAgent(type). syncData(distroData, getDistroKey().getTargetServer());最终调用NamingProxy#syncData
2)handleFailedTask();如果同步不成功,则重试

同步方法NamingProxy#syncData:
-> /nacos/v1/ns/distro/datum,PUT方法
-> 其他server处理方法DistroController#onSyncDatum
-> DistroProtocol#onReceive
-> DistroConsistencyServiceImpl#processData()
-> DistroConsistencyServiceImpl#onPut 将同步过来的实例更新到内存注册表(这里怎么避免再次对同步过来的数据进行UDP推送?

4.2 心跳只会在一台机器上检查

HealthCheckReactor
ClientBeatCheckTask
如果某台机器挂了,其集群机器总数会变化,服务名hash取模就会跟着变化

客户端会连接到集群中所有机器码?

4.3 集群状态信息同步

4.3.1 ServiceReporter

健康状态通过ServiceReporter
-> ServiceManager#init
-> GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
-> ServiceManager.ServiceReporter#run,计算所有namespaceId中所有serviceName的校验和,然后发送给其他server
-> synchronizer.send(server.getAddress(), msg);
-> ServiceStatusSynchronizer#send

/nacos/v1/ns/service/status
其他server处理ServiceController#serviceStatus,如果发现校验和不一样,会向源Server发起请求,更新服务实例的健康状况。

4.3.2 ServerStatusReporter

ServerListManager#init
-> GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);

4.3.3 ServerInfoUpdater

ServerListManager#init
-> GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());

4.4 机器启动时从某一台机器同步数据

5.CP架构

5.1 CAP与BASE

C代表Consistency,一致性,是指所有节点在同一时刻的数据是相同的,即更新操作执行结束并响应用户完成后,所有节点存储的数据应该是一样的(强调的是各个节点间的数据一致)。

A代表Availability,可用性,是指系统提供的服务一直处于可用状态,对于用户的请求可即时响应。任何客户端的请求,不管访问哪个节点,都能得到响应数据,但不保证是同一份最新数据,即不保证每个节点给你的数据都是最新的(强调服务可用,但不保证数据的一致)。

P代表Partition Tolerance,分区容错性,是指在分布式系统遇到网络分区的情况下,仍然可以响应用户的请求。网络分区是指因为网络故障导致网络不连通,不同节点分布在不同的子网中(强调不管内部出现什么样的数据同步问题,会一直提供服务)。

只要有网络交互就一定会有延迟和数据丢失,而这种状况必须接受,还必须保证系统不能挂掉。所以P是必须要保证的。

  • 当选择一致性C时,如果因为网络分区发生了消息丢失、延迟过高,部分节点无法保证特定信息最新的,这时系统将返回写失败错误,也就是集群拒绝新数据写入。
  • 当选择可用性A时,系统始终可用(也即处理客户端的请求),如果发生了分区,一些节点将无法返回最新的特定信息。

BASE可以看出AP的延伸,是对互联网大规模分布式系统的实践总结,强调可用性。

  • 基本可用Basically Available,当分布式系统在出现不可预知的故障时,允许损失部分功能的可用性,保障核心功能的可用性。四板斧:流量削峰(错峰处理)、延迟响应(排队等待处理)、体验降级(小图片代替原始图片)、过载保护。
  • 最终一致性Eventually consistent,系统中所有的数据副本在经过一段时间的同步后,最终能够达到一个一致的状态。也即在数据的一致性上,存在一个短暂的延迟。
  • 软状态Soft State:不同节点间,数据副本存在短暂的不一致,是一种过渡状态。

最终一致性,以什么数据为准?

  • 以最新写入的数据为准
  • 以第一次写入的数据为准

最终一致性具体的实现方式?

  • 读时修复:在读取数据时,检测到数据不一致,进行修复。
  • 写时修复:在写入数据时,检测到数据不一致,进行修复。比如不同节点写失败,就将数据缓存下来,然后定时重传,修复数据的不一致性。
  • 异步修复:最常用的方式,通过定时对账检测副本数据的一致性,并修复。

在实现最终一致性的时候,推荐同时实现自定义写一致性级别(All、Quorum、One、Any),让用户可以自主选择相应的一致性级别。

5.2 Nacos的CP架构

raft协议:

  • Leader选举(ZAB所有节点都需要发起投票,然后PK,raft先发投票的一般不出意外都会成为leader)
    1)选举超时时间(150-300ms)
    2)超时则变成候选人,发起投票,先投给自己,然后将请求发给其他人
    3)其他人收到请求,如果还未超时,则投票给该人并且重置自己的超时时间
    4)候选人收到大部分选票,则成为Leader
    5)Leader同步Append Entries信息给followers
  • 日志复制(数据同步):两阶段提交

DelegateConsistencyServiceImpl#put
RaftConsistencyServiceImpl#put
注册信息写文件和内存
配置信息写数据库
这里raft协议并没有两阶段提交,写完日志直接提交了。会发生问题,比如Leader成功提交了但是其他节点抛异常失败了。

心跳同步的数据
从节点:0表示本地有,1表示主节点有

6.Nacos2.X源码剖析

6.1 注册

6.1.1 客户端

跟1.4源码一样,NacosServiceRegistryAutoConfiguration注册了三个Bean:

  • NacosServiceRegistry,register()方法,deregister()方法,根据Registration获取实例Instance的方法
  • NacosRegistration,加了@PostConstruct注解的init()设置port、心跳间隔、心跳超时(BeanPostProcessor#postProcessBeforeInitialization()初始化前会调用@PostConstruct注解的init()方法进行设置)。可以设置和获取各种属性:serviceId、host、port、uri等等
  • NacosAutoServiceRegistration,包含上面两个Bean,是个ApplicationListener
    NacosAutoServiceRegistration是核心,其余两个Bean是为了拆分职能,为该类服务。

实例注册的地方:

  • WebServerStartStopLifecycle#start发布ServletWebServerInitializedEvent事件
  • NacosAutoServiceRegistration监听WebServerInitializedEvent事件
  • AbstractAutoServiceRegistration#onApplicationEvent
  • bind(event);
  • start()
     发布InstancePreRegisteredEvent事件;
     register();
     发布InstanceRegisteredEvent事件;
     设置running为true。
  • 重点看register
    NacosAutoServiceRegistration#register
    AbstractAutoServiceRegistration#register
  • this.serviceRegistry.register(getRegistration());这里的serviceRegistry就是NacosServiceRegistry,getRegistration()返回的就是NacosRegistration。
  • 最终来到NacosServiceRegistry#register(Registration)
     获取serviceId、group,以及将registration转换成Instance;
     namingService.registerInstance(serviceId, group, instance);注册实例;
  • NacosNamingService#registerInstance()

现在来重点看一下NacosNamingService#registerInstance:

  • clientProxy.registerService(serviceName, groupName, instance); 这里clientProxy是NamingClientProxyDelegate
  • getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);如果实例是临时的,则使用grpcClientProxy,否则使用httpClientProxy
  • NamingGrpcClientProxy#registerService
    1)redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    2)NamingGrpcClientProxy#doRegisterService

重点来看一下NamingGrpcClientProxy#doRegisterService

  • 创建InstanceRequest,REGISTER_INSTANCE
  • NamingGrpcClientProxy#requestToServer调用RpcClient#request()发起请求
  • NamingGrpcRedoService#instanceRegistered,将registeredInstances中对应的服务设置为已注册

6.1.2 NamingGrpcRedoService

实例注册流程:

  • 注册前,redoService.cacheInstanceForRedo(),先将实例放入到registeredInstances
  • 注册成功后,redoService.instanceRegistered(),将对应实例设置为已注册:redoData.setRegistered(true);

NamingGrpcRedoService中有一个调度线程池,只有一个单独的线程,每隔3s运行一次RedoScheduledTask。

RedoScheduledTask#run

  • 会根据实例的状态筛选出需要操作的实例进行处理。
    public void run() {
        if (!redoService.isConnected()) {
            LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
            return;
        }
        try {
            redoForInstances();
            redoForSubscribes();
        } catch (Exception e) {
            LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
        }
    }

6.1.3 服务端

服务端根据请求InstanceRequest找请求处理器InstanceRequestHandler

  • InstanceRequestHandler#handle
  • 根据请求类型REGISTER_INSTANCE,InstanceRequestHandler#registerInstance
  • EphemeralClientOperationServiceImpl#registerInstance
     1)getSingleton()获取单例Service;
     2)Client client = clientManager.getClient(clientId);获取客户端在服务端对应的对象信息;
     3)instanceInfo = getPublishInfo(instance);客户端Instance转换为服务端的Instance;
     4)client.addServiceInstance(singleton, instanceInfo);
     4-1)publishers.put(service, instancePublishInfo)将注册实例放到客户端对象的publishers这个map中
     4-2)发布客户端变化事件ClientEvent.ClientChangedEvent,向Nacos其他服务器同步数据
     5) client.setLastUpdatedTime();更新客户端lastUpdatedTime
     6)发布客户端注册事件ClientOperationEvent.ClientRegisterServiceEvent
     7)发布实例元数据变化事件MetadataEvent.InstanceMetadataEvent

InstanceRequestHandler#handle
registerInstance() 客户端注册信息都放在了对应的Client里面
NotifyCenter

  • 事件1 ClientChangedEvent 集群同步
  • 事件2 ClientRegisterServiceEvent -> 发布事件ServiceChangedEvent -> NamingSubscriberServiceV2Impl#onEvent
  • 事件3 InstanceMetadataEvent

服务要么全是临时实例,要么是持久实例,不会一部分是临时一部分是持久。这个跟1.X不同。
注册表结构也改变了,1.X是一个大Map,2.X分成了多个数据结构。
gRPC底层类似于Netty,服务端会对每个客户端生成一个Client,也即类似于Netty的SocketChannel。

Nacos1.X一个大Map会产生很多冲突,使用写时复制。
Nacos2.X拆分为了很多Map,并发粒度很小。大大分散了冲突的压力。

NamingSubscriberServiceV2Impl#onEvent:
服务注册会推送到所有订阅该服务的客户端;
服务订阅只会推送到新增的订阅的客户端。

客户端在服务端有两个对象:

  • Client
  • Connection

6.1.4 事件处理机制

客户端变化事件ClientEvent.ClientChangedEvent处理流程
NotifyCenter#publishEvent()
-> NotifyCenter#publishEvent()
-> EventPublisher publisher = INSTANCE.publisherMap.get(topic);然后publisher.publish(event);
-> DefaultPublisher#publish放到阻塞队列queue里面
---异步---
-> DefaultPublisher#run
-> openEventHandler();
-> 从阻塞队列queue中获取Event,并进行处理receiveEvent(event);
-> DefaultPublisher#notifySubscriber
-> DistroClientDataProcessor#onEvent
-> DistroClientDataProcessor#syncToAllServer
-> distroProtocol.sync(distroKey, DataOperation.CHANGE)
-> DistroProtocol#syncToTarget向不包括自己的其他所有Nacos Server同步数据
-> distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);核心就是tasks.put(key, newTask); 这里任务是DistroDelayTask
----异步处理----
-> NacosDelayTaskExecuteEngine.ProcessRunnable#run
-> processTasks()从tasks中获取任务并进行处理
-> DistroDelayTaskProcessor#process
-> distroTaskEngineHolder.getExecuteWorkersManager(). addTask(distroKey, syncChangeTask);
-> TaskExecuteWorker#process放到阻塞队列中queue.put(task);
----异步处理----
-> TaskExecuteWorker.InnerWorker#run从queue.take()获取任务并执行task.run(),任务类型是DistroSyncChangeTask
-> DistroSyncChangeTask#run
-> DistroSyncChangeTask#doExecuteWithCallback
-> getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer(), callback);
-> DistroClientTransportAgent#syncData()
-> clusterRpcClientProxy.asyncRequest(),请求类型是DistroDataRequest

服务端处理:
-> DistroDataRequestHandler#handle
-> DistroDataRequestHandler#handleSyncData
-> DistroProtocol#onReceive
 1)dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
 2)dataProcessor.processData(distroData);
-> DistroClientDataProcessor#processData
-> DistroClientDataProcessor#handlerClientSyncData
 1)clientManager.syncClientConnected();同步客户端连接
 2)upgradeClient(client, clientSyncData);更新内存注册表数据,发送服务注册事件ClientOperationEvent.ClientRegisterServiceEvent

发布客户端注册事件ClientOperationEvent.ClientRegisterServiceEvent
-> 前面都一样NotifyCenter -> DefaultPublisher -> ClientServiceIndexesManager
-> ClientServiceIndexesManager#onEvent
-> ClientServiceIndexesManager#handleClientOperation
-> ClientServiceIndexesManager#addPublisherIndexes
 1)publisherIndexes.get(service).add(clientId);新增服务对应的客户端id
 2)发布事件ServiceEvent.ServiceChangedEvent
-> 事件ServiceEvent.ServiceChangedEvent处理过程
-> 前面都一样NotifyCenter -> DefaultPublisher -> NamingSubscriberServiceV2Impl
-> NamingSubscriberServiceV2Impl#onEvent
-> delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
---异步处理:推送到当前服务的所有订阅者----
-> NacosDelayTaskExecuteEngine队列tasks -> PushDelayTaskExecuteEngine.PushDelayTaskProcessor队列queue -> PushExecuteTask#run
-> PushExecuteTask#run
-> delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
-> PushExecutorRpcImpl#doPushWithCallback
-> RpcPushService#pushWithCallback
-> connection.asyncRequest,请求是NotifySubscriberRequest
---客户端处理---
-> NamingPushRequestHandler#requestReply
-> ServiceInfoHolder#processServiceInfo 更新客户端服务实例本地缓存

6.2 服务发现

Ribbon
-> ZoneAwareLoadBalancer构造方法
-> 父类DynamicServerListLoadBalancer()构造方法
-> DynamicServerListLoadBalancer#restOfInit
 enableAndInitLearnNewServersFeature();
 updateListOfServers();
-> NacosServerList#getUpdatedListOfServers
-> NacosNamingService#selectInstances()
-> NamingClientProxyDelegate#subscribe 在下面通过grpc请求服务端获取到实例后,会更新本地缓存,然后发布InstancesChangeEvent事件,并且将ServiceInfo写入本地磁盘。
-> NamingGrpcClientProxy#subscribe
 1)redoService.cacheSubscriberForRedo()
 2)doSubscribe()
-> NamingGrpcClientProxy#doSubscribe发送SubscribeServiceRequest请求

服务端
SubscribeServiceRequestHandler
1)查询服务相关的实例
Map<Service, Set<clientId>> 拿到clientId,然后根据ClientId拿到Client然后再拿到其服务对应的实例。
2)订阅 并发布事件 ClientSubscribeServiceEvent -> 处理完发布事件ServiceSubscribedEvent

6.3 客户端定时拉取任务UpdateTask

-> NamingClientProxyDelegate创建ServiceInfoUpdateService
-> NamingClientProxyDelegate#subscribe
-> ServiceInfoUpdateService.scheduleUpdateIfAbsent
-> UpdateTask定时(每秒)拉取注册中心注册的服务namingClientProxy.queryInstancesOfService

6.4 心跳-健康检查

ConnectionManager#start,该方法有注解@PostConstruct。

  • RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(Runnable, 1000L, 3000L, TimeUnit.MILLISECONDS); 定时调度3s一次(每次结束后延迟3s)。
  • 超过20s没有给服务端发心跳的客户端,服务端会发起请求探活,如果失败或者超过1s未响应则剔除服务。ClientDetectionRequest 探活

在Nacos 1.x版本中,临时实例需要客户端(服务提供者)定时向Nacos发送心跳包来维持自己的健康状态。持久化实例并不基于客户端发送心跳包,而是服务端定时探测客户端进行健康检查(TCP端口探测、HTTP返回码探测)。

在Nacos 2.0版本之后持久化实例的监控检查并没有改变逻辑;但临时实例不再使用心跳包,而是通过判断gRPC长连接是否存活来判断临时实例是否健康。

7.gRPC

客户端
grpcClientProxy -> NamingGrpcClientProxy -> start()
心跳
3次重试连接服务端connectToServer(),如果失败,同步重连其他Server。
异步重连。

服务端
BaseRpcServer @PostConstruct
BaseGrpcServer
type -> RequestHandler type是类名,比如InstanceRequest
RequestHandlerRegistry-> ApplicationListener
注册Connection Client

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,137评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,824评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,465评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,131评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,140评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,895评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,535评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,435评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,952评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,081评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,210评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,896评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,552评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,089评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,198评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,531评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,209评论 2 357

推荐阅读更多精彩内容