Nacos Server 源码分析

版本

Nacos 2.5.1

启动方式

console模块下的com.alibaba.nacos.Nacos,启动spring应用
使用Intellij IDEA启动时,可以使用VM options-Dnacos.standalone=true,启动单机模式

配置文件位于console/src/main/resources/application.properties
sql建表语句位于config/src/main/resources/META-INF/mysql-schema.sql

开发的集群模式启动
application.properties中配置集群列表"nacos.member.list=192.168.3.10:8648,192.168.3.10:8748,192.168.3.10:8848"
注意不要写127.0.0.1

启动命令VM options填写"-Dserver.port=8848
-Dnacos.home=E:\Gits\nacos\nacos-cluster-0"

nacos.home的相关代码位于com.alibaba.nacos.sys.env.EnvUtil#getNacosHome

默认端口号

端口 与主端口的偏移量 描述
8848 0 Nacos HTTP API 端口,用于Nacos AdminAPI及HTTP OpenAPI的访问
9848 1000 客户端gRPC请求服务端端口,用于客户端向服务端发起连接和请求
9849 1001 服务端gRPC请求服务端端口,用于服务间同步等
7848 -1000 Jraft请求服务端端口,用于处理服务端间的Raft相关请求

端口 与主端口的偏移量 描述

8080 独立配置 Nacos控制台端口,访问Nacos控制台及Nacos控制台的API

启动流程

com.alibaba.nacos.core.code.SpringApplicationRunListener调用StartingApplicationListener加了新的配置源EnvUtil.getApplicationConfFileResource()

GrpcClusterClient#connectToServer启动连接其它nacos服务实例的grpc端口。
另一种实现GrpcSdkClient#connectToServer是nacos sdk客户端去连接nacos服务端的grpc端口。这里包含了根据8848计算其它端口号的逻辑。

NotifyCenter & Event

NotifyCenter负责发布事件,由Subscriber响应事件并处理。EventPublisher居中负责事件的转发。
每个Event类型对应一个EventPublisher。nacos的事件响应都是异步的,跨线程的。
阅读代码时,根据事件的类名,寻找对应处理的Subscriber即可,Subscriber#onEvent为响应函数入口。

事件发布

com.alibaba.nacos.common.notify.NotifyCenter#publishEvent(com.alibaba.nacos.common.notify.Event) 静态函数

  1. 如果事件是慢事件SlowEvent,则使用INSTANCE.sharePublisher发布事件
  2. NotifyCenter#publisherMap中,以事件类名为key,获取对应的EventPublisher
    然后调用publisher.publish(event);执行EventPublisher内的预置逻辑

EventPublisher

  1. 可以使用NotifyCenter#registerToPublisher()函数,使用NotifyCenter#DEFAULT_PUBLISHER_FACTORY,默认的工厂类,构建默认的EventPublisher实现DefaultPublisher。之后再通过NotifyCenter#registerSubscriber()函数单独注册Subscriber

  2. 可以使用NotifyCenter#registerSubscriber()函数,入参中提供Subscriber
    入参也可以提供EventPublisher的工厂函数,如果不提供,则使用默认的工厂类,构建默认的DefaultPublisher

SubscriberSmartSubscriber的区别就是Subscriber的接口只能指定一个关联的Event类型,而SmartSubscriber的新接口可以指定多个。

这块感觉设计的有些混乱,有些可以通过Subscriber的接口知道他关联哪些Event。有些却需要再注册Subscriber时再指定关联哪些Event

DefaultPublisher

DefaultPublisher 是一个单独的线程,初始化时传入队列长度参数,初始化启动线程。

接受事件时将事件加入队列。
线程一刻不停的从队列中取出事件,遍历所有注册进来的Subscriber,使用Subscriber#executor异步执行Subscriber#onEvent事件。

详细过程是线程从队列中取出事件,交由执行receiveEvent(event);
receiveEvent(event);中,遍历DefaultPublisher#subscribers中所有注册进来的的SubscriberSubscriber#scopeMatches 确认事件匹配当前Subscriber
通过Event#sequence确保事件有序执行。
最终使用Subscriber#executor中的线程异步执行Subscriber#onEvent事件,如果Subscriber#executor为null,则使用当前DefaultPublisher的线程执行Subscriber#onEvent

其它 EventPublisher

NamingEventPublisher
TraceEventPublisherFactory
逻辑差不太多,区别点主要是一个EventPublisher,可以支持多种Event。内部通过一个map,key为Event,value为Subscriber集合。

接受请求

配置列表 模糊查询

fuzzySearchConfig:419, ConfigController (com.alibaba.nacos.config.server.controller)
invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
doInvoke:205, InvocableHandlerMethod (org.springframework.web.method.support)
invokeForRequest:150, InvocableHandlerMethod (org.springframework.web.method.support)
invokeAndHandle:117, ServletInvocableHandlerMethod (org.springframework.web.servlet.mvc.method.annotation)
invokeHandlerMethod:903, RequestMappingHandlerAdapter (org.springframework.web.servlet.mvc.method.annotation)
handleInternal:809, RequestMappingHandlerAdapter (org.springframework.web.servlet.mvc.method.annotation)
handle:87, AbstractHandlerMethodAdapter (org.springframework.web.servlet.mvc.method)
doDispatch:1072, DispatcherServlet (org.springframework.web.servlet)
doService:965, DispatcherServlet (org.springframework.web.servlet)
processRequest:1006, FrameworkServlet (org.springframework.web.servlet)
doGet:898, FrameworkServlet (org.springframework.web.servlet)
service:529, HttpServlet (javax.servlet.http)
service:883, FrameworkServlet (org.springframework.web.servlet)
service:623, HttpServlet (javax.servlet.http)
internalDoFilter:199, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:51, WsFilter (org.apache.tomcat.websocket.server)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:42, XssFilter (com.alibaba.nacos.console.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:91, CorsFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:84, ParamCheckerFilter (com.alibaba.nacos.core.paramcheck)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:109, NacosHttpTpsFilter (com.alibaba.nacos.core.control.http)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:69, AuthFilter (com.alibaba.nacos.core.auth)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:67, NacosWebFilter (com.alibaba.nacos.config.server.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:218, FilterChainProxy (org.springframework.security.web)
doFilter:190, FilterChainProxy (org.springframework.security.web)
invokeDelegate:354, DelegatingFilterProxy (org.springframework.web.filter)
doFilter:267, DelegatingFilterProxy (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:100, RequestContextFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:93, FormContentFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:96, WebMvcMetricsFilter (org.springframework.boot.actuate.metrics.web.servlet)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:201, CharacterEncodingFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:57, HttpRequestContextFilter (com.alibaba.nacos.core.context.remote)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
invoke:168, StandardWrapperValve (org.apache.catalina.core)
invoke:90, StandardContextValve (org.apache.catalina.core)
invoke:482, AuthenticatorBase (org.apache.catalina.authenticator)
invoke:130, StandardHostValve (org.apache.catalina.core)
invoke:93, ErrorReportValve (org.apache.catalina.valves)
invoke:74, StandardEngineValve (org.apache.catalina.core)
invoke:660, AbstractAccessLogValve (org.apache.catalina.valves)
service:346, CoyoteAdapter (org.apache.catalina.connector)
service:396, Http11Processor (org.apache.coyote.http11)
process:63, AbstractProcessorLight (org.apache.coyote)
process:937, AbstractProtocol$ConnectionHandler (org.apache.coyote)
doRun:1791, NioEndpoint$SocketProcessor (org.apache.tomcat.util.net)
run:52, SocketProcessorBase (org.apache.tomcat.util.net)
runWorker:1190, ThreadPoolExecutor (org.apache.tomcat.util.threads)
run:659, ThreadPoolExecutor$Worker (org.apache.tomcat.util.threads)
run:63, TaskThread$WrappingRunnable (org.apache.tomcat.util.threads)
run:748, Thread (java.lang)

HttpRequestContextFilter
ThreadLocal RequestContext中填入http请求等信息,RequestContext类是nacos自定义的类。

AuthFilter
鉴权功能,@Secured注解相关鉴权功能

ParamCheckerFilter
@ExtractorManager.Extractor注解解析请求参数,使用AbstractParamChecker对请求参数进行通用的检查。

XssFilter
response设置"Content-Security-Policy"header,xss相关

NacosWebFilter
"/v1/cs/*"专属的filter
request设置encoding UTF-8,response设置content type header 为 json

NacosHttpTpsFilter
"/v1/ns/", "/v2/ns/", "/v1/cs/", "/v2/cs/"专属的filter
@Secured注解相关鉴权功能
应该是tps流量控制插件,但是默认实现是无管控

查询配置

比如配置列表的模糊查询接口ConfigController#fuzzySearchConfig,调用ConfigInfoPersistService对数据库进行查询。

默认是derby数据库,通过EmbeddedConfigInfoPersistServiceImpl实现类查询,
内部逻辑就是根据参数组装sql,调用PaginationHelper.fetchPageLimit。然后是BaseDatabaseOperate,使用JdbcTemplate对数据库进行查询,先查count,再查列表数据。

配置为mysql后使用的是ExternalConfigInfoPersistServiceImpl实现类,与derby的流程一致,唯一区别是PaginationHelper.fetchPageLimit内直接调用JdbcTemplate对数据库进行查询。

获取配置详情接口也是普通的查询mysql

发布修改配置

com.alibaba.nacos.config.server.service.ConfigOperationService#publishConfig
先是一些aop

CapacityManagementAspect
检查配置文件大小是否超过限制,默认关闭,不检查。
大小限制数据来自于group_capacity tenant_capacity表,有rest接口可以写入数据,但没看到webui界面有相关设置。

ConfigChangeAspect
为config change plugin提供aop接入点,默认没有插件

RequestLogAspect
MetricsMonitor统计publish次数,就写config耗时

业务逻辑:

  1. 修改mysql内的配置时,使用事务
    事务内进行更新配置,插入配置历史(his_config_info表)两个操作
    其中更新配置语句是UPDATE config_info SET ... WHERE data_id=? AND group_id=? AND tenant_id=? AND (md5=? OR md5 IS NULL OR md5='')
    where语句额外使用md5作为判断条件。md5是根据配置文件内容计算出来的,web界面打开配置时会从后端得到当前配置的md5,调用配置更新接口时会传入就旧配置的md5,这样后端可以确保这期间没有其他人修改过这个配置文件。

  2. 发布配置修改事件ConfigDataChangeEvent
    NotifyCenter.publishEvent(event);进行事件发布,NotifyCenter会找到该消息对应的DefaultPublisher线程异步处理事件
    默认ConfigDataChangeEvent事件有两个监听器AsyncNotifyServiceExternalDumpService。我们先向下看,后面再描述这个异步流程。

  3. 紧接着的同步操作是向"com.alibaba.nacos.config.traceLog"logger中写入操作日志,这里只记录日志内容的md5,操作者ip等,没有日志的具体内容

  4. 继续讲解ConfigDataChangeEvent事件的异步操作

其实此事件就是让nacos集群的所有节点执行ExternalDumpService#dump。本地会监听该事件,执行ExternalDumpService#dumpAsyncNotifyService是向其它nacos节点发送grpc消息,让其他节点执行ExternalDumpService#dump

AsyncNotifyService
MetricsMonitor统计配置变更次数
ServerMemberManager获取其它nacos实例地址
com.alibaba.nacos.config.server.utils.ConfigExecutor#executeAsyncNotify再次进行异步执行
通过grpc向nacos内的其他节点发送ConfigChangeClusterSyncRequest消息。
grpc服务定义于nacos_grpc_service.proto文件内,是一个通用的消息服务,
其它nacos节点的GrpcRequestAcceptor是处理该消息的服务实现类。GrpcRequestAcceptor#request内部进行消息分发,最终由ConfigChangeClusterSyncRequestHandler#handle处理,这里的内部也是调用ExternalDumpService#dump进行处理。

ExternalDumpService#dump
TaskManager增加新的任务DumpTask,再次进行异步处理。
名为"com.alibaba.nacos.server.DumpTaskManager"的TaskManager.ProcessRunnable线程处理这些任务。任务key为"dataid+groupid+namespaceid"。根据任务key选择Processor,这里会选择默认Processor。任务交由com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process处理。
DumpProcessor#process先去库中查询最新的配置信息,然后调用com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump直接处理填充好的ConfigDumpEvent事件。com.alibaba.nacos.config.server.service.ConfigCacheService#dump
如果内存中该配置文件的md5发生了变化,将会
写入本地文件缓存,位于"{nacos.home}/data/config-data/"下
更新内存中缓存的md5值

NotifyCenter发布LocalDataChangeEvent事件,事件对应的EventPublisher中有LongPollingServiceRpcConfigChangeNotifier两个subscriber。
转到对应的EventPublisher线程,开始执行两个subscriber。

LongPollingService
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey));
通知LongPolling的客户端配置有变化。旧版本使用长轮询的方式,这里会拿到空的客户端列表,也就没有任何操作。

RpcConfigChangeNotifier
在本机查找关心这个配置文件的所有nacos客户端,
RpcPushTask中,向客户端异步发送ConfigChangeNotifyRequest请求,ConfigChangeNotifyRequest只是一个普通Java类,经由GrpcUtils.convert()函数转化为Payloadgrpc消息。这个消息只发送配置文件名称,没有具体配置内容。如果推送失败,会使用指数退避的时间持续重试,直到达到重试次数。

发布灰度配置逻辑与上面的类似,一个事务内,插入config_info_gray表,插入his_config_info表。事务成功后,发布ConfigDataChangeEvent事件。然后发布LocalDataChangeEvent事件,向所有监听这个配置文件的nacos客户端推送ConfigChangeNotifyRequest

表结构

  1. config_info表
    主键id
    tenant_id,groupid,dataid 一起是一个唯一索引,其中tenant_id就是namespace

配置文件直接原封不动存入content字段
md5判断表内容是否有变更
插入时唯一索引避免重复插入
变更时where语句加上md5字段,确保只有一个人变更成功,同时旧配置插入历史记录表his_config_info。

  1. config_info_gray 表
    主键id
    data_id,group_id,tenant_id,gray_name 一起是一个唯一索引,其中tenant_id就是namespace

nacos 集群

集群模式下,即使只启动一台服务器,客户端连接这一台服务器也可以正常变更配置。对于配置中心而言,nacos的各个实例就像是无状态的实例,可以任意扩缩容,没有其他影响。不过太多的话会增加mysql的负载,因为实例会不停地轮询mysql的用户角色表,配置历史,配置灰度表,确保数据缓存与mysql一致。

局限

灰度发布 beta发布

每个配置文件,只能同时存在一种beta。不过看后端代码应该是支持多个灰度,但是web页面不支持。
beta区分实例是能到ip地址,一个ip起两个服务无法区分,同时生效
灰度发布生效期间,无法更改主配置,无法再次修改beta配置

服务注册中心

核心逻辑简要概述

nacos允许一个客户端实例注册多个微服务,即一个{ip:port}可以提供多个服务
ServiceStorage#serviceDataIndexes相当于一个缓存。获取服务信息及实例列表时,直接从缓存中返回数据。
新的微服务实例注册或是取消注册时,主动更新这个缓存。

nacos客户端在调用其他微服务时,需要关心这一微服务的实例列表变化。通过SubscribeServiceRequestgrpc消息告知某一个nacos server实例,它需要订阅这个服务的实例列表变化信息。这一个nacos server实例会在将来向这个nacos 客户端推送NotifySubscriberRequest消息,包含微服务的实例列表。
所以每个nacos server实例只会通知部分nacos客户端,就是建立了推送消息连接的那部分nacos客户端。

nacos客户端发送InstanceRequestgrpc消息向某一个nacos server实例注册当前微服务实例。nacos server实例在本机记录好新实例信息后,会通过DistroDataRequestgrpc消息,distro协议广播至其它全部nacos server实例。这样所有的nacos server实例都有了这个新的微服务实例信息了。然后每个nacos server实例主动更新ServiceStorage#serviceDataIndexes这个缓存。
然后每个nacos server实例会向在各自nacos server实例订阅的nacos客户端推送微服务实例列表变化消息。
比如nacos客户端A在nacos server实例1上订阅了消息,nacos客户端B在nacos server实例2上订阅了消息。那么nacos server实例1向nacos客户端A推送变化消息,nacos server实例2向nacos客户端B推送变化消息,

接下来先陈述一些重要的数据结构,它们用于微服务,微服务实例列表,订阅信息的内存查询与内存存储

数据结构

  1. POJO

1.1 Service
表示一个微服务,仅包含微服务的最基本的标识信息,常用于map中的key

  • namespace group name 三个字段一起确定一个微服务,ServiceequalshashCode只和这三个字段有关。

1.2 ServiceInfo
表示一个微服务的详细信息

  • 包含 namespace group name 三个字段
  • List<Instance> hosts
    表示该服务的实例列表

1.3 Instance
表示一个微服务实例的详细信息

  • 包含ip port metadata等

1.4 ConnectionBasedClient
表示一个nacos client实例
与微服务实例的区别是,因为nacos支持一个客户端实例注册多个微服务,所以一个ConnectionBasedClient可以关联多个Instance。不过要注意的是ConnectionBasedClientInstance只是逻辑上有关联,代码中没有关联在一起,代码中使用InstancePublishInfo来表示微服务实例信息。

  • String connectionId
    表示该nacos client实例的id,默认由时间戳 ip port拼接而成,见AddressTransportFilter#transportReadyGrpcServerConstants#ATTR_TRANS_KEY_CONN_ID相关代码。
  • <Service, InstancePublishInfo> publishers
    表示该nacos client实例关联的多个服务Service,以及每个服务对应的微服务实例信息InstancePublishInfo。
  • <Service, Subscriber> subscribers
    表示该nacos client实例订阅的多个服务Service,以及订阅信息Subscriber。一般是该实例会调用其它服务,才会订阅那个服务的实例列表变化。

1.5 InstancePublishInfo
表示一个微服务实例的信息。
Instance逻辑上的信息大致一致,但InstancePublishInfo仅作为ConnectionBasedClient这类Client类的属性字段使用。重点表示某一个Client对外提供微服务时的实例信息。在有一些细微的区别,比如Instance可以用于对外提供实例的全部信息,比如Instance#ephemeral表示该实例是临时实例还是永久实例。但InstancePublishInfo没有ephemeral字段,因为InstancePublishInfo总是属于ConnectionBasedClient这类Client类的属性,而ephemeral是Client的属性,Client才会带着ephemeral信息。

  • 包含ip port 等信息

1.6 Subscriber
InstancePublishInfo类似,仅作为ConnectionBasedClient这类Client类的属性字段使用。重点表示某一个Client需要调用其它微服务时,需要订阅该微服务的实例列表变化事件。

  • 包含ip port 等信息
  1. 核心逻辑所在的类

2.1 ServiceStorage
通过Service查询该服务的详细信息及实例列表信息。
相当于一个查询缓存,查询时直接查出数据,不需要再从各个Client类中收集实例列表。
实例注册等处会抛出ServiceEvent.ServiceChangedEvent事件,事件监听器会更新这一缓存。

  • ServiceStorage#getPushData
    ServiceManagerClientServiceIndexesManagerClientManager等处构建出该服务的详细信息及实例列表信息,记录在serviceDataIndexesmap中,然后返回。
    相当于该缓存的数据更新操作。一般在服务实例注册,去注册时主动调用。

  • ServiceStorage#getData
    serviceDataIndexesmap中读取服务的详细信息及实例列表信息。如果map中存在,直接返回map数据,如果不存在,通过getPushData构建缓存并返回。
    相当于读缓存的操作。如果缓存没有则更新缓存。

2.2 ClientServiceIndexesManager 通过Service查询相关ClientId列表
这是一个为了方便查询而存在的索引类。
publisherIndexes 存储了所有 Service 及其对应的 实例列表所在的clientId集合
subscriberIndexes 存储了 Service 及订阅这个服务的clientId集合,但需要注意的是只存储了与当前nacos server建立推送连接的nacos client。所以nacos server实例的subscriberIndexes合在一起才是所有的订阅client全集。但是每个nacos server实例只存储了一部分订阅client,因为一个nacos client只与一个nacos server实例建立服务器推送连接,所以只有那个建立连接的nacos server实例才会推送订阅变化消息。

2.3 ConnectionBasedClientManager 通过ClientId查询具体Client详细信息

以上均为@Component注解的单例Bean。

2.2 ServiceManager
通过Service查询Service。
这里的Service不包含服务的详细信息,也不包含服务的实例列表信息。
这个类不是@Component注解的Bean,但依旧是单例。

  • ServiceManager.getInstance()
    静态方法获取单例

  • ServiceManager#containSingleton()
    可以查询服务是否存在。

  • ServiceManager#getSingleton()
    通过Service查询Service。查询时只需要提供namespace group name字段的Service,但查出来的Service还包含了ephemeral revision等信息。
    如果Service不存在,则添加这个Service。

2.5 核心Component类的map字段

字段名称 字段类型 描述
ServiceStorage#serviceDataIndexes Map<Service, ServiceInfo> ServiceInfo包含了所有服务详细信息以及全部实例列表信息
ServiceManager#singletonRepository Map<Service, Service> 根据Service查询Service,具体见本节上文
ClientServiceIndexesManager#publisherIndexes Map<Service, Set<String>> value为 String 类型的 clientId 集合,包含了所有服务及其实例ClientId
ClientServiceIndexesManager#subscriberIndexes Map<Service, Set<String>> value为 String 类型的 clientId 集合。包含了部分服务及订阅的实例ClientId,是与当前nacos server实例建立推送连接的那部分Client
ConnectionBasedClientManager#clients Map<String, ConnectionBasedClient> key为 String 类型的 clientId ,ConnectionBasedClient 为实例信息,包含了所有Client信息,即使是那些没有与当前nacos server建立连接的Client
ConnectionBasedClient#publishers Map<Service, InstancePublishInfo> 该nacos client实例提供的全部微服务实例信息。一个nacos client实例可以对外提供多个微服务。key为 Service ,value 为 InstancePublishInfo,是当前微服务实例的信息
ConnectionBasedClient#subscribers Map<Service, Subscriber>

结合关键代码进行重要流程简介

  1. 一些重要查询函数
  • Service singleton = ServiceManager.getInstance().getSingleton(service);
    提供namespace group name字段的Service,查出或添加Service。

  • ServiceStorage#getPushData
    相当于该缓存的数据更新操作。一般在服务实例注册,去注册时主动调用。

  • ServiceStorage#getData
    相当于读缓存的操作。如果缓存没有则更新缓存。

  1. 微服务实例的注册
    InstanceRequest grpc消息,InstanceRequestHandler#handle处理该消息
    2.1 Service singleton = ServiceManager.getInstance().getSingleton(service);
    至此ServiceManager完成Service的添加。

2.2 Client client = clientManager.getClient(clientId); 根据ClientId查出Client
Instance转为InstancePublishInfoClient.publishers加入InstancePublishInfo
至此Client新增了实例信息。

发布ClientEvent.ClientChangedEvent事件
Client#generateSyncData生成最新的ClientSyncData通过DistroDataRequestgrpc消息广播到nacos server所有其他节点。其他节点也就收到了这个新的Client和微服务实例信息,几乎是也走了一遍微服务实例的注册的流程。

2.3 Client更新lastUpdatedTime, revision字段,发布ClientOperationEvent.ClientRegisterServiceEvent事件
ClientServiceIndexesManager监听该事件,ClientServiceIndexesManager#publisherIndexes加入Service与对应的clientId。
发布ServiceEvent.ServiceChangedEvent事件
NamingSubscriberServiceV2Impl#onEvent监听该事件。
直接调用ServiceStorage#getPushData,从Client等中加载Service及其实例列表信息,写入到ServiceStorage#serviceDataIndexes。这里是一处ServiceStorage#serviceDataIndexes的数据更新点。
ClientServiceIndexesManager#subscriberIndexes中获取与当前nacos server实例建立推送连接的,订阅该服务的ClientId,向当前nacos server实例中的所有订阅该服务的Client推送NotifySubscriberRequest消息,发送实例列表。由于ClientEvent.ClientChangedEvent事件会广播到所有nacos server实例,其它实例内部也会发布ServiceEvent.ServiceChangedEvent事件,然后向各自实例的订阅Client子集推送NotifySubscriberRequest消息。

  1. 查询微服务对应的实例列表
    ServiceQueryRequest grpc消息为例,查询某一服务关联的实例列表

通过ServiceStorage#getData获取实例列表
ServiceStorage#getData是直接从ServiceStorage#serviceDataIndexes这个Map<Service, ServiceInfo>中获取服务实例列表,直接返回服务实例列表信息。
如果ServiceStorage#serviceDataIndexes没有这个key,则通过ServiceStorage#getPushData加载。
先通过ServiceManager#singletonRepositoryMap<Service, Service>,确认Service是否存在,不存在直接返回。
ServiceStorage#getAllInstancesFromIndex加载实例列表。
ClientServiceIndexesManager#publisherIndexes<Service, Set<String>>加载clientId列表
ConnectionBasedClientManager#clients<String, ConnectionBasedClient>获取Client
client.AbstractClient#publishers<Service, InstancePublishInfo>获取每个Client的实例地址
至此可以拿到服务对应的实例列表,最后放回ServiceStorage#serviceDataIndexes

  1. 订阅服务
    客户端向某个nacos server实例订阅他关心的服务实例列表变化信息。
    SubscribeServiceRequest grpc消息,SubscribeServiceRequestHandler.handle处理该消息
    ServiceStorage#getData获取服务及实例列表并返回客户端。

查询到Client,在Client#subscribers中加入这个Subscriber,发布ClientSubscribeServiceEvent事件。
ClientServiceIndexesManager响应事件,ClientServiceIndexesManager#subscriberIndexes中加入这个Subscriber。
如果是首次加入,则发布ServiceEvent.ServiceSubscribedEvent事件。
NamingSubscriberServiceV2Impl#onEvent响应这个事件,向刚刚订阅的nacos客户端推送服务端消息 NotifySubscriberRequest,包含了最新的服务及实例列表

所以首次订阅时,SubscribeServiceRequest订阅消息会返回实例列表,服务器还会立刻推送一次NotifySubscriberRequest消息,再次返回实例列表。

  1. EmptyServiceAutoCleanerV2默认每60秒执行EmptyServiceAutoCleanerV2#cleanEmptyService

后面开始代码详解

查询接口

服务实例列表

GET http://{ip}:{port}/nacos/v1/ns/catalog/services?pageNo=1&pageSize=10&withInstances=true

com.alibaba.nacos.naming.controllers.CatalogController#listDetail 接受请求,后续全部为内存操作

CatalogController#listDetail => CatalogServiceV2Impl#pageListServiceDetail

ServiceManager#namespaceSingletonMaps map中获取namespace key对应的全部Service列表。
针对每个Service
NamingMetadataManager#serviceMetadataMap map中获取这个Service对应的metadata
ServiceStorage#serviceDataIndexes map中获取这个Service对应的ServiceInfoServiceInfo#hosts记录了所有的实例信息。

grpc 接受请求

grpc服务定义于nacos_grpc_service.proto文件内,是一个通用的消息服务,
GrpcRequestAcceptor#request内部进行消息分发,最终由RequestHandler#handle处理。
RequestHandler的实现类继承RequestHandler抽象类时会通过泛型声明它关联的具体的消息类型

查询服务的实例列表消息 ServiceQueryRequest

与http的服务实例列表接口的逻辑大致一致,都是从一个数据源取的数据

服务实例注册消息 InstanceRequest

InstanceRequest
InstanceRequestHandler.handle => InstanceRequestHandler#registerInstance

EphemeralClientOperationServiceImpl#registerInstance

  1. Service singleton = ServiceManager.getInstance().getSingleton(service);
    ServiceManager#getSingleton
    ServiceManager#singletonRepository这个map中加入当前Service。key value均为 <Service, Service>。
    如果原map中没有,是一个新的Service,会发布MetadataEvent.ServiceMetadataEvent事件。
    ServiceManager#namespaceSingletonMaps中加入服务信息。他的key是namespace,value是Service Set。

NamingMetadataManager响应MetadataEvent.ServiceMetadataEvent事件。仅处理该Service的Metadata的数据过期问题。

  1. Client client = clientManager.getClient(clientId);
    获取到ConnectionBasedClient,也就是当前服务实例对应的ConnectionBasedClient类对象。
  1. client.addServiceInstance(singleton, instanceInfo);
    ConnectionBasedClient.publishers内加入<Service service, InstancePublishInfo instancePublishInfo> 条目信息。

3.1 发布ClientEvent.ClientChangedEvent事件

DistroClientDataProcessor#onEvent响应。
=> DistroClientDataProcessor#syncToAllServer
=> DistroProtocol#sync
后续详细流程位于DistroProtocol#sync DistroProtocol#onReceive 两节
简单说就是将Client和新加入的微服务实例信息广播到其它nacos server节点。其他节点如果没有相同的微服务实例信息,也执行一遍与EphemeralClientOperationServiceImpl#registerInstance类似的微服务实例注册逻辑,代码位于DistroClientDataProcessor#upgradeClient。其它nacos server的微服务实例注册逻辑内部也会再次执行DistroProtocol#sync进行广播。但是接收方如果已经存在该实例相同的信息时,将不再执行微服务实例注册逻辑,也就不会继续进行广播了。

  1. 发布 ClientOperationEvent.ClientRegisterServiceEvent

ClientServiceIndexesManager#onEvent响应
=> ClientServiceIndexesManager#handleClientOperation
=> ClientServiceIndexesManager#addPublisherIndexes

ClientServiceIndexesManager#publisherIndexes加入<Service, Set<clientId>>条目
发布ServiceEvent.ServiceChangedEvent事件

4.1 NamingSubscriberServiceV2Impl#onEvent 响应 ServiceEvent.ServiceChangedEvent事件

主要做的事情为

  • ServiceStorage记录这个服务的新实例信息,包括 ServiceStorage#serviceClusterIndexServiceStorage#serviceDataIndexes
  • 向订阅该Service的所有Subscriber推送NotifySubscriberRequest消息

详细的过程如下

4.1.1 delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
后面的 DistroProtocol#sync 一节会描述类似的 delayTaskEngine.addTask的详细过程。我们现在直接介绍重要部分。

4.1.2 PushDelayTask默认是合并500ms内的相同key的任务,key为 Service,通过PushDelayTask#merge函数合并。合并逻辑为合并推送的targetClients列表或是推送全部。

这里的 delayTaskEngine 是 PushDelayTaskExecuteEnginePushDelayTaskExecuteEngine.taskProcessors为空,只能使用默认的PushDelayTaskProcessor处理PushDelayTask

4.1.3
PushDelayTaskProcessor#process =>
NamingExecuteTaskDispatcher.getInstance() .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));

NacosExecuteTaskExecuteEngine#addTask
NamingExecuteTaskDispatcher.executeEngine中的taskProcessors为空,defaultTaskProcessor也为空。所以会使用NacosExecuteTaskExecuteEngine#executeWorkers执行任务。执行的是刚刚封装的PushExecuteTask

4.1.4 PushExecuteTask.run
4.1.4.1 PushDataWrapper wrapper = generatePushData();
=> ServiceStorage#getPushData
Service数据复制为ServiceInfo,复制Service的基本信息

ServiceStorage#getAllInstancesFromIndex 生成该服务的所有实例列表
ClientServiceIndexesManager#publisherIndexes获取该服务的所有clientid列表
使用每个clientid从ConnectionBasedClientManager#clients获取对应的Client,然后从AbstractClient#publishers获取实例信息InstancePublishInfo
InstancePublishInfo转为Instance,从NamingMetadataManager#instanceMetadataMap中获取meta信息。
ServiceStorage#serviceClusterIndex中加<Service service, Set<String> clusters>条目。clusters默认只有一个DEFAULT,来自于UtilsAndCommons#DEFAULT_CLUSTER_NAME

ServiceStorage#serviceDataIndexes 加入 <Service singleton, ServiceInfo result>条目,也就是服务对应服务实例列表。

继续回到generatePushData()
NamingMetadataManager#instanceMetadataMap中获取meta信息,和刚刚的ServiceInfo合并为PushDataWrapper

4.1.4.2 getTargetClientIds()
ClientServiceIndexesManager#subscriberIndexes获取所以订阅该服务的clientId客户端id。
针对每一个客户端id,Client client = clientManager.getClient(each)
AbstractClient#subscribers获取Subscriber

4.1.4.3 针对每一个订阅者
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper, new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));

PushExecutorDelegate.doPushWithCallback
=> getPushExecuteService(clientId, subscriber)
默认SpiImplPushExecutorHolder#pushExecutors为空,最终返回PushExecutorRpcImpl

PushExecutorRpcImpl#doPushWithCallback
根据PushDataWrapper生成ServiceInfo,根据规则过滤一些实例。
使用RpcPushService#pushWithCallback向订阅者客户端推送NotifySubscriberRequest消息

推送成功回调 PushExecuteTask.ServicePushCallback#onSuccess
发布PushServiceTraceEvent事件,默认没有监听该事件

  1. 发布 MetadataEvent.InstanceMetadataEvent
    NamingMetadataManager#onEvent响应事件
    => NamingMetadataManager#handleInstanceMetadataEvent
    NamingMetadataManager#expiredMetadataInfos 移除该服务的metadata过期标记
  1. 发布RegisterInstanceTraceEvent事件
    回到InstanceRequestHandler#registerInstance最后还会发布RegisterInstanceTraceEvent事件
    默认没有监听器监听此事件

订阅服务实例变更消息 SubscribeServiceRequest

SubscribeServiceRequestHandler.handle => InstanceRequestHandler#registerInstance

构建Subscriber对象
构建ServiceInfo对象,包含Service以及实例列表。
来自于ServiceStorage#getData,先从ServiceStorage#serviceDataIndexes直接获取ServiceInfo对象。如果没有使用ServiceStorage#getPushData构建ServiceInfo对象,从ServiceManager#getSingleton获取Service,从ClientServiceIndexesManager#publisherIndexesConnectionBasedClientManager#clientsAbstractClient#publishers等中获取实例列表,共同构建ServiceInfo对象。

判断SubscribeServiceRequest消息参数中的subscribe是true or false
true 订阅 EphemeralClientOperationServiceImpl#subscribeService 发布SubscribeServiceTraceEvent
false 取消订阅 EphemeralClientOperationServiceImpl#unsubscribeService 发布UnsubscribeServiceTraceEvent

然后返回ServiceInfo对象

  1. EphemeralClientOperationServiceImpl#subscribeService
    1.1 ConnectionBasedClientManager#clients获取当前发送订阅消息的Client
    AbstractClient#addServiceSubscriber AbstractClient#subscribers中加入Subscriber,这里包含了想要订阅的服务信息

1.2 发布 ClientOperationEvent.ClientSubscribeServiceEvent
ClientServiceIndexesManager#onEvent 响应事件
=> ClientServiceIndexesManager#handleClientOperation
=> ClientServiceIndexesManager#addSubscriberIndexes
ClientServiceIndexesManager#subscriberIndexes 中加入希望监听这个Service的ClientId, <Service service, Set<String clientId>>

如果是首次加入,Set<String clientId> 中首次加入这个ClientId,
则发布 ServiceEvent.ServiceSubscribedEvent 事件

1.2.1 ServiceEvent.ServiceSubscribedEvent 事件
NamingSubscriberServiceV2Impl#onEvent 响应这个事件
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId()));
构建默认延迟500毫秒的PushDelayTask,任务中包含了Service和新增订阅的ClientId,注意PushDelayTask.pushToAll = false,并且指定了接受的客户端PushDelayTask#targetClients为刚刚订阅的客户端
PushDelayTask的主要逻辑就是向刚刚订阅的nacos客户端推送服务端消息 NotifySubscriberRequest,包含了最新的服务及实例列表

PushDelayTask的具体执行逻辑如下
1.2.2 后面的 DistroProtocol#sync 一节会描述类似的 delayTaskEngine.addTask的详细过程。我们现在直接介绍重要部分。
1.2.2.1 PushDelayTask会进行合并 PushDelayTask#merge,合并的逻辑就是合并要推送的nacos server列表。
1.2.2.2 PushDelayTaskExecuteEngine#taskProcessors为空,由PushDelayTaskExecuteEngine#defaultTaskProcessor处理PushDelayTask,也就是PushDelayTaskExecuteEngine.PushDelayTaskProcessor

PushDelayTaskExecuteEngine.PushDelayTaskProcessor#process
=> NamingExecuteTaskDispatcher#dispatchAndExecuteTask 执行 PushExecuteTask任务
=> PushDelayTaskExecuteEngine#addTask

1.2.2.3 PushDelayTaskExecuteEngine#taskProcessors为空,PushDelayTaskExecuteEngine#defaultTaskProcessor为null
使用PushDelayTaskExecuteEngine#executeWorkers执行PushExecuteTask任务

1.2.2.4 PushExecuteTask#run
PushExecuteTask#generatePushData 生成 PushDataWrapper
ServiceInfo数据来自于ServiceStorage#getPushDataServiceMetadata来自于NamingMetadataManager#getServiceMetadata,一起封装为PushDataWrapper

PushExecuteTask#getTargetClientIds 获取需要发送的客户端。
由于这个PushDelayTask构建时,PushDelayTask.pushToAll = false,并且指定了接受的客户端PushDelayTask#targetClients为刚刚订阅的客户端,所以只会向刚刚订阅的客户端发送
PushExecutorDelegate#doPushWithCallback
=> PushExecutorRpcImpl#doPushWithCallback
=> RpcPushService#pushWithCallback
向刚刚订阅的nacos客户端推送服务端消息 NotifySubscriberRequest,包含了最新的服务及实例列表

  1. 发布SubscribeServiceTraceEvent
    回到EphemeralClientOperationServiceImpl#subscribeService函数的末尾,发布SubscribeServiceTraceEvent
    不过默认没有监听器响应这个事件

DistroProtocol

DistroProtocol#sync

给其他所有nacos server节点除了自身,发送消息。
针对每个nacos server节点和微服务实例,积攒默认1秒的消息,合并为1个消息。
经过层层异步调用到达DistroDelayTaskProcessor#process,根据DistroProtocol#sync中不同的action,生成不同的Task,DistroSyncDeleteTask或是DistroSyncChangeTask。然后在异步执行这个Task。最终通过DistroClientTransportAgent#syncData()向其它nacos server实例发送grpc消息DistroDataRequest,消息中包含了action以及必要的微服务实例信息。

详细流程如下:

  1. 封装为默认1秒的DistroDelayTask,通过NacosDelayTaskExecuteEngine#addTask,将任务加入到NacosDelayTaskExecuteEngine#tasks中。这里NacosDelayTaskExecuteEngine的实现是DistroDelayTaskExecuteEngineDistroTaskEngineHolder构建DistroDelayTaskExecuteEngine时,会加入默认的DistroDelayTaskProcessor`。

addTask函数中,先用key查找是否已存在任务,如果存在,newTask.merge(existTask);新旧任务合并。然后放回map。
key为DistroKey,使用resourceKey resourceType targetServer共同确定。
比如resourceKey = Client.getClientId() resourceType="Nacos:Naming:v2:ClientData" targetServer=target nacos server ip:port。
也就是同一个微服务实例,同一个nacos server实例,同一个resourceType,在1秒内的任务会被合并为一个。

DistroDelayTask的合并逻辑就是根据createTime字段,保留最新的任务的action,舍弃旧的action

  1. NacosDelayTaskExecuteEngine初始化时,会启动一个线程,每100ms执行一次ProcessRunnable

也就是每100ms,ProcessRunnable中会遍历NacosDelayTaskExecuteEngine#tasks中所有的任务,取出已经到时间的任务。对每个已经到时间的任务执行
NacosTaskProcessor processor = getProcessor(taskKey);
processor.process(task)

DistroDelayTaskExecuteEngine中的NacosTaskProcessor processor = getProcessor(taskKey);实现是将DistroKey.resourceType作为key寻找NacosTaskProcessor,比如"Nacos:Naming:v2:ClientData"

默认DistroDelayTaskExecuteEngine#taskProcessors是空的,没有注册任何NacosTaskProcessor,最终所有getProcessor走的都是defaultTaskProcessor DistroDelayTaskProcessor

DistroDelayTaskProcessor#process
根据action,构建DistroSyncDeleteTask或是DistroSyncChangeTask,加入到DistroExecuteTaskExecuteEngine#addTask

  1. DistroExecuteTaskExecuteEngine#addTask内部根据DistroKey寻找NacosTaskProcessor,执行processor.process
    如果NacosTaskProcessor为null,则通过DistroKeyNacosExecuteTaskExecuteEngine#executeWorkers中找一个TaskExecuteWorker,执行worker.process

默认DistroExecuteTaskExecuteEngine#taskProcessors也是空的,没有注册任何NacosTaskProcessor,而且DistroExecuteTaskExecuteEngine#defaultTaskProcessor也是null。最终均由TaskExecuteWorker#process执行。

TaskExecuteWorker#process则是将任务加入到TaskExecuteWorker#queue队列中。InnerWorker会不停的从这个队列中取出任务,并执行Runnable.run,也就是DistroSyncDeleteTask.run或是DistroSyncChangeTask.run。二者共用AbstractDistroExecuteTask#run

  1. AbstractDistroExecuteTask#run内部
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
    根据DistroKey.resourceTypeDistroComponentHolder#transportAgentMap中寻找对应的DistroTransportAgent。默认只注册了DistroClientTransportAgent,也只用到了他。

如果transportAgent.supportCallbackTransport(),则执行doExecuteWithCallback(new DistroExecuteCallback());。默认DistroClientTransportAgent.supportCallbackTransport为true。

  1. DistroSyncChangeTask#doExecuteWithCallback
    DistroData distroData = getDistroData(type);传入DistroKey.resourceType,也就是"Nacos:Naming:v2:ClientData"。
    DistroComponentHolder#dataStorageMap,这个map中只注册了一个条目,最终获取到DistroClientDataProcessor
    DistroClientDataProcessor#getDistroData根据DistroKey.resourceKeyConnectionBasedClientManager#getClient获取到当前连接的微服务客户端数据ConnectionBasedClient

ConnectionBasedClient#generateSyncData生成ClientSyncData,里边包含了微服务实例的相关完整数据,同时ClientSyncData#attributes加入了"revision"条目,也就是这一ClientSyncData的版本号
再封装为DistroData

  1. DistroClientTransportAgent#syncData()
    封装为DistroDataRequest,通过ClusterRpcClientProxy#asyncRequest经grpc发送出去。

DistroProtocol#onReceive

DistroDataRequest的请求处理,在接收方的nacos server实例内部
DistroDataRequestHandler#handle
=> DistroProtocol#onReceive
=> DistroClientDataProcessor#processData
从消息中反序列化出ClientSyncData
=> DistroClientDataProcessor#handlerClientSyncData

  1. clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    ClientManagerDelegate#syncClientConnected
    => ConnectionBasedClientManager#syncClientConnected
    根据attributes中的"connectionType"获取不同类型的ClientFactory,默认ConnectionBasedClientFactory
    ConnectionBasedClientFactory构建出ConnectionBasedClient
    ConnectionBasedClientManager#clients中如果不存在,才加入新的<connectionId, Client> 条目,此时新的Client内部没有具体连接信息。
  1. Client client = clientManager.getClient(clientSyncData.getClientId());
    取出 上一步新建的Client

  2. upgradeClient(client, clientSyncData);
    DistroClientDataProcessor#upgradeClient

3.1 ServiceManager#getSingleton
服务实例注册消息 一节中提过
ServiceManager#singletonRepository ServiceManager#namespaceSingletonMaps 中加入Service

3.2 判断当前Client内部是否存在相同的微服务实例信息
如果不存在,则会执行一遍与EphemeralClientOperationServiceImpl#registerInstance内类似的服务实例注册逻辑。然后继续向其它nacos server实例广播该服务的注册消息。但是其它nacos server实例再次执行DistroProtocol#onReceive => upgradeClient(client, clientSyncData); 时,在当前这一步,发现已经存在相同的微服务实例信息,则不再执行后续的服务实例注册逻辑,也就阻止了无尽的消息广播。

接下来的逻辑与EphemeralClientOperationServiceImpl#registerInstance内的服务实例注册逻辑类似。
3.2.1 client.addServiceInstance(singleton, instancePublishInfo);
AbstractClient#publishers中加入微服务实例信息
发布ClientEvent.ClientChangedEvent事件。
该事件会再次触发
=> DistroClientDataProcessor#syncToAllServer
=> DistroProtocol#sync
一直到当前函数的流程,但是接收方如果已经有这个微服务实例信息的话,就不会执行client.addServiceInstance(singleton, instancePublishInfo);,也就不会继续产生ClientEvent.ClientChangedEvent事件

3.2.2 ClientOperationEvent.ClientRegisterServiceEvent事件
3.2.3 MetadataEvent.InstanceMetadataEvent事件

3.3 如果当前Client存在了外来ClientSyncData中没有的Service信息,则删除本地该实例的Service信息,发布ClientOperationEvent.ClientDeregisterServiceEvent事件。
也就是一个nacos client实例是可以提供多个Service的,这里在同步一个nacos client实例的Service列表。

3.4 client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));
同步本地的Client的版本号

至此,没看到如果收到低版本号的Client消息时应该忽略的逻辑,猜测这里是有问题的。会将本地的客户端信息由新版本降低为低版本旧数据的可能。

一些问题

  1. ServiceInfo#lastRefTime 应该是nacos客户端用于定期轮询更新缓存用的。nacos server端应该是用不到这个字段。

  2. 如果同一时刻,实例1向serverA发送deregister,实例2向serverB发送deregister,两个server互相广播消息,最终是如何一致的。
    以serverA为例,收到实例1的变化时,会移除实例1的实例信息,然后更新ServiceStorage#serviceDataIndexes缓存。收到实例2的变化时,会移除实例2的实例信息,然后再更新缓存。无论这些消息、步骤如何异步,如何乱序。最终都可以保证有一个更新缓存的步骤是在最后发生的。这次更新缓存久一点更新出正确的结果。

  3. 如果一个客户端快速重启,向serverA发送deregister,向serverB发送register。nacos server会如何显示这一实例的状态。
    nacos-spring-cloud没有使用@RefreshScope注解,应该不会出现这种状况。
    如果出现这种情况,感觉极端情况下是会不停的重复广播注册、去注册消息的。但是DelayTask的merge与心跳机制最终应该会统一稳定为已注册状态。

断点

  1. 发送 grpc请求
    io.grpc.internal.ClientCallImpl#sendMessage
ClientStream clientStream = ((RetriableStream) this.stream).state.drainedSubstreams.iterator().next().stream;


String local = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR).toString();
String remote = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString();

return "send: " + method.getFullMethodName() + " " + local + " -> " + remote + "\n" + message;
  1. 接受 grpc 请求
    io.grpc.stub.ServerCalls.UnaryServerCallHandler.UnaryServerCallListener#onHalfClose method.invoke(request, responseObserver);一行
String local = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR).toString();
String remote = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString();

return "receive: " + call.method.getFullMethodName() + " " + remote + " -> " + local + "\n" + request;

可以通过过滤掉频繁的jraft消息
!((ServerCallImpl) call).method.getFullMethodName().contains("jraft")

  1. 接受 http请求
    org.springframework.web.servlet.DispatcherServlet#doDispatch
"receive: " + request.getMethod() + " " +request.getRequestURL() + " params: " + new Gson().toJson(request.getParameterMap())
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容