版本
Nacos 2.5.1
启动方式
console模块下的com.alibaba.nacos.Nacos,启动spring应用
使用Intellij IDEA启动时,可以使用VM options-Dnacos.standalone=true,启动单机模式
配置文件位于console/src/main/resources/application.properties
sql建表语句位于config/src/main/resources/META-INF/mysql-schema.sql
开发的集群模式启动
application.properties中配置集群列表"nacos.member.list=192.168.3.10:8648,192.168.3.10:8748,192.168.3.10:8848"
注意不要写127.0.0.1
启动命令VM options填写"-Dserver.port=8848
-Dnacos.home=E:\Gits\nacos\nacos-cluster-0"
nacos.home的相关代码位于com.alibaba.nacos.sys.env.EnvUtil#getNacosHome
默认端口号
| 端口 | 与主端口的偏移量 | 描述 |
|---|---|---|
| 8848 | 0 | Nacos HTTP API 端口,用于Nacos AdminAPI及HTTP OpenAPI的访问 |
| 9848 | 1000 | 客户端gRPC请求服务端端口,用于客户端向服务端发起连接和请求 |
| 9849 | 1001 | 服务端gRPC请求服务端端口,用于服务间同步等 |
| 7848 | -1000 | Jraft请求服务端端口,用于处理服务端间的Raft相关请求 |
端口 与主端口的偏移量 描述
8080 独立配置 Nacos控制台端口,访问Nacos控制台及Nacos控制台的API
启动流程
com.alibaba.nacos.core.code.SpringApplicationRunListener调用StartingApplicationListener加了新的配置源EnvUtil.getApplicationConfFileResource()
GrpcClusterClient#connectToServer启动连接其它nacos服务实例的grpc端口。
另一种实现GrpcSdkClient#connectToServer是nacos sdk客户端去连接nacos服务端的grpc端口。这里包含了根据8848计算其它端口号的逻辑。
NotifyCenter & Event
NotifyCenter负责发布事件,由Subscriber响应事件并处理。EventPublisher居中负责事件的转发。
每个Event类型对应一个EventPublisher。nacos的事件响应都是异步的,跨线程的。
阅读代码时,根据事件的类名,寻找对应处理的Subscriber即可,Subscriber#onEvent为响应函数入口。
事件发布
com.alibaba.nacos.common.notify.NotifyCenter#publishEvent(com.alibaba.nacos.common.notify.Event) 静态函数
- 如果事件是慢事件
SlowEvent,则使用INSTANCE.sharePublisher发布事件 - 从
NotifyCenter#publisherMap中,以事件类名为key,获取对应的EventPublisher。
然后调用publisher.publish(event);执行EventPublisher内的预置逻辑
EventPublisher
可以使用
NotifyCenter#registerToPublisher()函数,使用NotifyCenter#DEFAULT_PUBLISHER_FACTORY,默认的工厂类,构建默认的EventPublisher实现DefaultPublisher。之后再通过NotifyCenter#registerSubscriber()函数单独注册Subscriber可以使用
NotifyCenter#registerSubscriber()函数,入参中提供Subscriber。
入参也可以提供EventPublisher的工厂函数,如果不提供,则使用默认的工厂类,构建默认的DefaultPublisher。
Subscriber和SmartSubscriber的区别就是Subscriber的接口只能指定一个关联的Event类型,而SmartSubscriber的新接口可以指定多个。
这块感觉设计的有些混乱,有些可以通过Subscriber的接口知道他关联哪些Event。有些却需要再注册Subscriber时再指定关联哪些Event。
DefaultPublisher
DefaultPublisher 是一个单独的线程,初始化时传入队列长度参数,初始化启动线程。
接受事件时将事件加入队列。
线程一刻不停的从队列中取出事件,遍历所有注册进来的Subscriber,使用Subscriber#executor异步执行Subscriber#onEvent事件。
详细过程是线程从队列中取出事件,交由执行receiveEvent(event);。
receiveEvent(event);中,遍历DefaultPublisher#subscribers中所有注册进来的的Subscriber,Subscriber#scopeMatches 确认事件匹配当前Subscriber。
通过Event#sequence确保事件有序执行。
最终使用Subscriber#executor中的线程异步执行Subscriber#onEvent事件,如果Subscriber#executor为null,则使用当前DefaultPublisher的线程执行Subscriber#onEvent。
其它 EventPublisher
NamingEventPublisher
TraceEventPublisherFactory
逻辑差不太多,区别点主要是一个EventPublisher,可以支持多种Event。内部通过一个map,key为Event,value为Subscriber集合。
接受请求
配置列表 模糊查询
fuzzySearchConfig:419, ConfigController (com.alibaba.nacos.config.server.controller)
invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
doInvoke:205, InvocableHandlerMethod (org.springframework.web.method.support)
invokeForRequest:150, InvocableHandlerMethod (org.springframework.web.method.support)
invokeAndHandle:117, ServletInvocableHandlerMethod (org.springframework.web.servlet.mvc.method.annotation)
invokeHandlerMethod:903, RequestMappingHandlerAdapter (org.springframework.web.servlet.mvc.method.annotation)
handleInternal:809, RequestMappingHandlerAdapter (org.springframework.web.servlet.mvc.method.annotation)
handle:87, AbstractHandlerMethodAdapter (org.springframework.web.servlet.mvc.method)
doDispatch:1072, DispatcherServlet (org.springframework.web.servlet)
doService:965, DispatcherServlet (org.springframework.web.servlet)
processRequest:1006, FrameworkServlet (org.springframework.web.servlet)
doGet:898, FrameworkServlet (org.springframework.web.servlet)
service:529, HttpServlet (javax.servlet.http)
service:883, FrameworkServlet (org.springframework.web.servlet)
service:623, HttpServlet (javax.servlet.http)
internalDoFilter:199, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:51, WsFilter (org.apache.tomcat.websocket.server)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:42, XssFilter (com.alibaba.nacos.console.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:91, CorsFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:84, ParamCheckerFilter (com.alibaba.nacos.core.paramcheck)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:109, NacosHttpTpsFilter (com.alibaba.nacos.core.control.http)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:69, AuthFilter (com.alibaba.nacos.core.auth)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:67, NacosWebFilter (com.alibaba.nacos.config.server.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:218, FilterChainProxy (org.springframework.security.web)
doFilter:190, FilterChainProxy (org.springframework.security.web)
invokeDelegate:354, DelegatingFilterProxy (org.springframework.web.filter)
doFilter:267, DelegatingFilterProxy (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:100, RequestContextFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:93, FormContentFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:96, WebMvcMetricsFilter (org.springframework.boot.actuate.metrics.web.servlet)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilterInternal:201, CharacterEncodingFilter (org.springframework.web.filter)
doFilter:117, OncePerRequestFilter (org.springframework.web.filter)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
doFilter:57, HttpRequestContextFilter (com.alibaba.nacos.core.context.remote)
internalDoFilter:168, ApplicationFilterChain (org.apache.catalina.core)
doFilter:144, ApplicationFilterChain (org.apache.catalina.core)
invoke:168, StandardWrapperValve (org.apache.catalina.core)
invoke:90, StandardContextValve (org.apache.catalina.core)
invoke:482, AuthenticatorBase (org.apache.catalina.authenticator)
invoke:130, StandardHostValve (org.apache.catalina.core)
invoke:93, ErrorReportValve (org.apache.catalina.valves)
invoke:74, StandardEngineValve (org.apache.catalina.core)
invoke:660, AbstractAccessLogValve (org.apache.catalina.valves)
service:346, CoyoteAdapter (org.apache.catalina.connector)
service:396, Http11Processor (org.apache.coyote.http11)
process:63, AbstractProcessorLight (org.apache.coyote)
process:937, AbstractProtocol$ConnectionHandler (org.apache.coyote)
doRun:1791, NioEndpoint$SocketProcessor (org.apache.tomcat.util.net)
run:52, SocketProcessorBase (org.apache.tomcat.util.net)
runWorker:1190, ThreadPoolExecutor (org.apache.tomcat.util.threads)
run:659, ThreadPoolExecutor$Worker (org.apache.tomcat.util.threads)
run:63, TaskThread$WrappingRunnable (org.apache.tomcat.util.threads)
run:748, Thread (java.lang)
HttpRequestContextFilter
向ThreadLocal RequestContext中填入http请求等信息,RequestContext类是nacos自定义的类。
AuthFilter
鉴权功能,@Secured注解相关鉴权功能
ParamCheckerFilter
@ExtractorManager.Extractor注解解析请求参数,使用AbstractParamChecker对请求参数进行通用的检查。
XssFilter
response设置"Content-Security-Policy"header,xss相关
NacosWebFilter
"/v1/cs/*"专属的filter
request设置encoding UTF-8,response设置content type header 为 json
NacosHttpTpsFilter
"/v1/ns/", "/v2/ns/", "/v1/cs/", "/v2/cs/"专属的filter
@Secured注解相关鉴权功能
应该是tps流量控制插件,但是默认实现是无管控
查询配置
比如配置列表的模糊查询接口ConfigController#fuzzySearchConfig,调用ConfigInfoPersistService对数据库进行查询。
默认是derby数据库,通过EmbeddedConfigInfoPersistServiceImpl实现类查询,
内部逻辑就是根据参数组装sql,调用PaginationHelper.fetchPageLimit。然后是BaseDatabaseOperate,使用JdbcTemplate对数据库进行查询,先查count,再查列表数据。
配置为mysql后使用的是ExternalConfigInfoPersistServiceImpl实现类,与derby的流程一致,唯一区别是PaginationHelper.fetchPageLimit内直接调用JdbcTemplate对数据库进行查询。
获取配置详情接口也是普通的查询mysql
发布修改配置
com.alibaba.nacos.config.server.service.ConfigOperationService#publishConfig
先是一些aop
CapacityManagementAspect
检查配置文件大小是否超过限制,默认关闭,不检查。
大小限制数据来自于group_capacity tenant_capacity表,有rest接口可以写入数据,但没看到webui界面有相关设置。
ConfigChangeAspect
为config change plugin提供aop接入点,默认没有插件
RequestLogAspect
MetricsMonitor统计publish次数,就写config耗时
业务逻辑:
修改mysql内的配置时,使用事务
事务内进行更新配置,插入配置历史(his_config_info表)两个操作
其中更新配置语句是UPDATE config_info SET ... WHERE data_id=? AND group_id=? AND tenant_id=? AND (md5=? OR md5 IS NULL OR md5='')
where语句额外使用md5作为判断条件。md5是根据配置文件内容计算出来的,web界面打开配置时会从后端得到当前配置的md5,调用配置更新接口时会传入就旧配置的md5,这样后端可以确保这期间没有其他人修改过这个配置文件。发布配置修改事件
ConfigDataChangeEvent
NotifyCenter.publishEvent(event);进行事件发布,NotifyCenter会找到该消息对应的DefaultPublisher线程异步处理事件
默认ConfigDataChangeEvent事件有两个监听器AsyncNotifyService和ExternalDumpService。我们先向下看,后面再描述这个异步流程。紧接着的同步操作是向"com.alibaba.nacos.config.traceLog"logger中写入操作日志,这里只记录日志内容的md5,操作者ip等,没有日志的具体内容
继续讲解
ConfigDataChangeEvent事件的异步操作
其实此事件就是让nacos集群的所有节点执行ExternalDumpService#dump。本地会监听该事件,执行ExternalDumpService#dump。AsyncNotifyService是向其它nacos节点发送grpc消息,让其他节点执行ExternalDumpService#dump。
AsyncNotifyService
MetricsMonitor统计配置变更次数
从ServerMemberManager获取其它nacos实例地址
com.alibaba.nacos.config.server.utils.ConfigExecutor#executeAsyncNotify再次进行异步执行
通过grpc向nacos内的其他节点发送ConfigChangeClusterSyncRequest消息。
grpc服务定义于nacos_grpc_service.proto文件内,是一个通用的消息服务,
其它nacos节点的GrpcRequestAcceptor是处理该消息的服务实现类。GrpcRequestAcceptor#request内部进行消息分发,最终由ConfigChangeClusterSyncRequestHandler#handle处理,这里的内部也是调用ExternalDumpService#dump进行处理。
ExternalDumpService#dump
向TaskManager增加新的任务DumpTask,再次进行异步处理。
名为"com.alibaba.nacos.server.DumpTaskManager"的TaskManager.ProcessRunnable线程处理这些任务。任务key为"dataid+groupid+namespaceid"。根据任务key选择Processor,这里会选择默认Processor。任务交由com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process处理。
DumpProcessor#process先去库中查询最新的配置信息,然后调用com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump直接处理填充好的ConfigDumpEvent事件。com.alibaba.nacos.config.server.service.ConfigCacheService#dump
如果内存中该配置文件的md5发生了变化,将会
写入本地文件缓存,位于"{nacos.home}/data/config-data/"下
更新内存中缓存的md5值
向NotifyCenter发布LocalDataChangeEvent事件,事件对应的EventPublisher中有LongPollingService和RpcConfigChangeNotifier两个subscriber。
转到对应的EventPublisher线程,开始执行两个subscriber。
LongPollingService
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey));
通知LongPolling的客户端配置有变化。旧版本使用长轮询的方式,这里会拿到空的客户端列表,也就没有任何操作。
RpcConfigChangeNotifier
在本机查找关心这个配置文件的所有nacos客户端,
在RpcPushTask中,向客户端异步发送ConfigChangeNotifyRequest请求,ConfigChangeNotifyRequest只是一个普通Java类,经由GrpcUtils.convert()函数转化为Payloadgrpc消息。这个消息只发送配置文件名称,没有具体配置内容。如果推送失败,会使用指数退避的时间持续重试,直到达到重试次数。
发布灰度配置逻辑与上面的类似,一个事务内,插入config_info_gray表,插入his_config_info表。事务成功后,发布ConfigDataChangeEvent事件。然后发布LocalDataChangeEvent事件,向所有监听这个配置文件的nacos客户端推送ConfigChangeNotifyRequest。
表结构
- config_info表
主键id
tenant_id,groupid,dataid 一起是一个唯一索引,其中tenant_id就是namespace
配置文件直接原封不动存入content字段
md5判断表内容是否有变更
插入时唯一索引避免重复插入
变更时where语句加上md5字段,确保只有一个人变更成功,同时旧配置插入历史记录表his_config_info。
- config_info_gray 表
主键id
data_id,group_id,tenant_id,gray_name一起是一个唯一索引,其中tenant_id就是namespace
nacos 集群
集群模式下,即使只启动一台服务器,客户端连接这一台服务器也可以正常变更配置。对于配置中心而言,nacos的各个实例就像是无状态的实例,可以任意扩缩容,没有其他影响。不过太多的话会增加mysql的负载,因为实例会不停地轮询mysql的用户角色表,配置历史,配置灰度表,确保数据缓存与mysql一致。
局限
灰度发布 beta发布
每个配置文件,只能同时存在一种beta。不过看后端代码应该是支持多个灰度,但是web页面不支持。
beta区分实例是能到ip地址,一个ip起两个服务无法区分,同时生效
灰度发布生效期间,无法更改主配置,无法再次修改beta配置
服务注册中心
核心逻辑简要概述
nacos允许一个客户端实例注册多个微服务,即一个{ip:port}可以提供多个服务
ServiceStorage#serviceDataIndexes相当于一个缓存。获取服务信息及实例列表时,直接从缓存中返回数据。
新的微服务实例注册或是取消注册时,主动更新这个缓存。
nacos客户端在调用其他微服务时,需要关心这一微服务的实例列表变化。通过SubscribeServiceRequestgrpc消息告知某一个nacos server实例,它需要订阅这个服务的实例列表变化信息。这一个nacos server实例会在将来向这个nacos 客户端推送NotifySubscriberRequest消息,包含微服务的实例列表。
所以每个nacos server实例只会通知部分nacos客户端,就是建立了推送消息连接的那部分nacos客户端。
nacos客户端发送InstanceRequestgrpc消息向某一个nacos server实例注册当前微服务实例。nacos server实例在本机记录好新实例信息后,会通过DistroDataRequestgrpc消息,distro协议广播至其它全部nacos server实例。这样所有的nacos server实例都有了这个新的微服务实例信息了。然后每个nacos server实例主动更新ServiceStorage#serviceDataIndexes这个缓存。
然后每个nacos server实例会向在各自nacos server实例订阅的nacos客户端推送微服务实例列表变化消息。
比如nacos客户端A在nacos server实例1上订阅了消息,nacos客户端B在nacos server实例2上订阅了消息。那么nacos server实例1向nacos客户端A推送变化消息,nacos server实例2向nacos客户端B推送变化消息,
接下来先陈述一些重要的数据结构,它们用于微服务,微服务实例列表,订阅信息的内存查询与内存存储
数据结构
- POJO
1.1 Service
表示一个微服务,仅包含微服务的最基本的标识信息,常用于map中的key
- namespace group name 三个字段一起确定一个微服务,
Service的equals和hashCode只和这三个字段有关。
1.2 ServiceInfo
表示一个微服务的详细信息
- 包含 namespace group name 三个字段
- List<Instance> hosts
表示该服务的实例列表
1.3 Instance
表示一个微服务实例的详细信息
- 包含ip port metadata等
1.4 ConnectionBasedClient
表示一个nacos client实例
与微服务实例的区别是,因为nacos支持一个客户端实例注册多个微服务,所以一个ConnectionBasedClient可以关联多个Instance。不过要注意的是ConnectionBasedClient与Instance只是逻辑上有关联,代码中没有关联在一起,代码中使用InstancePublishInfo来表示微服务实例信息。
- String connectionId
表示该nacos client实例的id,默认由时间戳 ip port拼接而成,见AddressTransportFilter#transportReady中GrpcServerConstants#ATTR_TRANS_KEY_CONN_ID相关代码。 - <Service, InstancePublishInfo> publishers
表示该nacos client实例关联的多个服务Service,以及每个服务对应的微服务实例信息InstancePublishInfo。 - <Service, Subscriber> subscribers
表示该nacos client实例订阅的多个服务Service,以及订阅信息Subscriber。一般是该实例会调用其它服务,才会订阅那个服务的实例列表变化。
1.5 InstancePublishInfo
表示一个微服务实例的信息。
与Instance逻辑上的信息大致一致,但InstancePublishInfo仅作为ConnectionBasedClient这类Client类的属性字段使用。重点表示某一个Client对外提供微服务时的实例信息。在有一些细微的区别,比如Instance可以用于对外提供实例的全部信息,比如Instance#ephemeral表示该实例是临时实例还是永久实例。但InstancePublishInfo没有ephemeral字段,因为InstancePublishInfo总是属于ConnectionBasedClient这类Client类的属性,而ephemeral是Client的属性,Client才会带着ephemeral信息。
- 包含ip port 等信息
1.6 Subscriber
与InstancePublishInfo类似,仅作为ConnectionBasedClient这类Client类的属性字段使用。重点表示某一个Client需要调用其它微服务时,需要订阅该微服务的实例列表变化事件。
- 包含ip port 等信息
- 核心逻辑所在的类
2.1 ServiceStorage
通过Service查询该服务的详细信息及实例列表信息。
相当于一个查询缓存,查询时直接查出数据,不需要再从各个Client类中收集实例列表。
实例注册等处会抛出ServiceEvent.ServiceChangedEvent事件,事件监听器会更新这一缓存。
ServiceStorage#getPushData
从ServiceManager,ClientServiceIndexesManager,ClientManager等处构建出该服务的详细信息及实例列表信息,记录在serviceDataIndexesmap中,然后返回。
相当于该缓存的数据更新操作。一般在服务实例注册,去注册时主动调用。ServiceStorage#getData
从serviceDataIndexesmap中读取服务的详细信息及实例列表信息。如果map中存在,直接返回map数据,如果不存在,通过getPushData构建缓存并返回。
相当于读缓存的操作。如果缓存没有则更新缓存。
2.2 ClientServiceIndexesManager 通过Service查询相关ClientId列表
这是一个为了方便查询而存在的索引类。
publisherIndexes 存储了所有 Service 及其对应的 实例列表所在的clientId集合
subscriberIndexes 存储了 Service 及订阅这个服务的clientId集合,但需要注意的是只存储了与当前nacos server建立推送连接的nacos client。所以nacos server实例的subscriberIndexes合在一起才是所有的订阅client全集。但是每个nacos server实例只存储了一部分订阅client,因为一个nacos client只与一个nacos server实例建立服务器推送连接,所以只有那个建立连接的nacos server实例才会推送订阅变化消息。
2.3 ConnectionBasedClientManager 通过ClientId查询具体Client详细信息
以上均为@Component注解的单例Bean。
2.2 ServiceManager
通过Service查询Service。
这里的Service不包含服务的详细信息,也不包含服务的实例列表信息。
这个类不是@Component注解的Bean,但依旧是单例。
ServiceManager.getInstance()
静态方法获取单例ServiceManager#containSingleton()
可以查询服务是否存在。ServiceManager#getSingleton()
通过Service查询Service。查询时只需要提供namespace group name字段的Service,但查出来的Service还包含了ephemeral revision等信息。
如果Service不存在,则添加这个Service。
2.5 核心Component类的map字段
| 字段名称 | 字段类型 | 描述 |
|---|---|---|
ServiceStorage#serviceDataIndexes |
Map<Service, ServiceInfo> |
ServiceInfo包含了所有服务详细信息以及全部实例列表信息 |
ServiceManager#singletonRepository |
Map<Service, Service> | 根据Service查询Service,具体见本节上文 |
ClientServiceIndexesManager#publisherIndexes |
Map<Service, Set<String>> | value为 String 类型的 clientId 集合,包含了所有服务及其实例ClientId |
ClientServiceIndexesManager#subscriberIndexes |
Map<Service, Set<String>> | value为 String 类型的 clientId 集合。包含了部分服务及订阅的实例ClientId,是与当前nacos server实例建立推送连接的那部分Client |
ConnectionBasedClientManager#clients |
Map<String, ConnectionBasedClient> | key为 String 类型的 clientId ,ConnectionBasedClient 为实例信息,包含了所有Client信息,即使是那些没有与当前nacos server建立连接的Client |
ConnectionBasedClient#publishers |
Map<Service, InstancePublishInfo> | 该nacos client实例提供的全部微服务实例信息。一个nacos client实例可以对外提供多个微服务。key为 Service ,value 为 InstancePublishInfo,是当前微服务实例的信息 |
ConnectionBasedClient#subscribers |
Map<Service, Subscriber> |
结合关键代码进行重要流程简介
- 一些重要查询函数
Service singleton = ServiceManager.getInstance().getSingleton(service);
提供namespace group name字段的Service,查出或添加Service。ServiceStorage#getPushData
相当于该缓存的数据更新操作。一般在服务实例注册,去注册时主动调用。ServiceStorage#getData
相当于读缓存的操作。如果缓存没有则更新缓存。
- 微服务实例的注册
InstanceRequestgrpc消息,InstanceRequestHandler#handle处理该消息
2.1Service singleton = ServiceManager.getInstance().getSingleton(service);
至此ServiceManager完成Service的添加。
2.2 Client client = clientManager.getClient(clientId); 根据ClientId查出Client
Instance转为InstancePublishInfo,Client.publishers加入InstancePublishInfo
至此Client新增了实例信息。
发布ClientEvent.ClientChangedEvent事件
Client#generateSyncData生成最新的ClientSyncData通过DistroDataRequestgrpc消息广播到nacos server所有其他节点。其他节点也就收到了这个新的Client和微服务实例信息,几乎是也走了一遍微服务实例的注册的流程。
2.3 Client更新lastUpdatedTime, revision字段,发布ClientOperationEvent.ClientRegisterServiceEvent事件
ClientServiceIndexesManager监听该事件,ClientServiceIndexesManager#publisherIndexes加入Service与对应的clientId。
发布ServiceEvent.ServiceChangedEvent事件
NamingSubscriberServiceV2Impl#onEvent监听该事件。
直接调用ServiceStorage#getPushData,从Client等中加载Service及其实例列表信息,写入到ServiceStorage#serviceDataIndexes。这里是一处ServiceStorage#serviceDataIndexes的数据更新点。
从ClientServiceIndexesManager#subscriberIndexes中获取与当前nacos server实例建立推送连接的,订阅该服务的ClientId,向当前nacos server实例中的所有订阅该服务的Client推送NotifySubscriberRequest消息,发送实例列表。由于ClientEvent.ClientChangedEvent事件会广播到所有nacos server实例,其它实例内部也会发布ServiceEvent.ServiceChangedEvent事件,然后向各自实例的订阅Client子集推送NotifySubscriberRequest消息。
- 查询微服务对应的实例列表
以ServiceQueryRequestgrpc消息为例,查询某一服务关联的实例列表
通过ServiceStorage#getData获取实例列表
ServiceStorage#getData是直接从ServiceStorage#serviceDataIndexes这个Map<Service, ServiceInfo>中获取服务实例列表,直接返回服务实例列表信息。
如果ServiceStorage#serviceDataIndexes没有这个key,则通过ServiceStorage#getPushData加载。
先通过ServiceManager#singletonRepositoryMap<Service, Service>,确认Service是否存在,不存在直接返回。
ServiceStorage#getAllInstancesFromIndex加载实例列表。
ClientServiceIndexesManager#publisherIndexes<Service, Set<String>>加载clientId列表
ConnectionBasedClientManager#clients<String, ConnectionBasedClient>获取Client
client.AbstractClient#publishers<Service, InstancePublishInfo>获取每个Client的实例地址
至此可以拿到服务对应的实例列表,最后放回ServiceStorage#serviceDataIndexes
- 订阅服务
客户端向某个nacos server实例订阅他关心的服务实例列表变化信息。
SubscribeServiceRequestgrpc消息,SubscribeServiceRequestHandler.handle处理该消息
ServiceStorage#getData获取服务及实例列表并返回客户端。
查询到Client,在Client#subscribers中加入这个Subscriber,发布ClientSubscribeServiceEvent事件。
ClientServiceIndexesManager响应事件,ClientServiceIndexesManager#subscriberIndexes中加入这个Subscriber。
如果是首次加入,则发布ServiceEvent.ServiceSubscribedEvent事件。
NamingSubscriberServiceV2Impl#onEvent响应这个事件,向刚刚订阅的nacos客户端推送服务端消息 NotifySubscriberRequest,包含了最新的服务及实例列表
所以首次订阅时,SubscribeServiceRequest订阅消息会返回实例列表,服务器还会立刻推送一次NotifySubscriberRequest消息,再次返回实例列表。
-
EmptyServiceAutoCleanerV2默认每60秒执行EmptyServiceAutoCleanerV2#cleanEmptyService
后面开始代码详解
查询接口
服务实例列表
GET http://{ip}:{port}/nacos/v1/ns/catalog/services?pageNo=1&pageSize=10&withInstances=true
com.alibaba.nacos.naming.controllers.CatalogController#listDetail 接受请求,后续全部为内存操作
CatalogController#listDetail => CatalogServiceV2Impl#pageListServiceDetail
从ServiceManager#namespaceSingletonMaps map中获取namespace key对应的全部Service列表。
针对每个Service
从NamingMetadataManager#serviceMetadataMap map中获取这个Service对应的metadata
从ServiceStorage#serviceDataIndexes map中获取这个Service对应的ServiceInfo,ServiceInfo#hosts记录了所有的实例信息。
grpc 接受请求
grpc服务定义于nacos_grpc_service.proto文件内,是一个通用的消息服务,
GrpcRequestAcceptor#request内部进行消息分发,最终由RequestHandler#handle处理。
RequestHandler的实现类继承RequestHandler抽象类时会通过泛型声明它关联的具体的消息类型
查询服务的实例列表消息 ServiceQueryRequest
与http的服务实例列表接口的逻辑大致一致,都是从一个数据源取的数据
服务实例注册消息 InstanceRequest
InstanceRequest
InstanceRequestHandler.handle => InstanceRequestHandler#registerInstance
EphemeralClientOperationServiceImpl#registerInstance
-
Service singleton = ServiceManager.getInstance().getSingleton(service);
ServiceManager#getSingleton
向ServiceManager#singletonRepository这个map中加入当前Service。key value均为 <Service, Service>。
如果原map中没有,是一个新的Service,会发布MetadataEvent.ServiceMetadataEvent事件。
ServiceManager#namespaceSingletonMaps中加入服务信息。他的key是namespace,value是Service Set。
由NamingMetadataManager响应MetadataEvent.ServiceMetadataEvent事件。仅处理该Service的Metadata的数据过期问题。
-
Client client = clientManager.getClient(clientId);
获取到ConnectionBasedClient,也就是当前服务实例对应的ConnectionBasedClient类对象。
-
client.addServiceInstance(singleton, instanceInfo);
向ConnectionBasedClient.publishers内加入<Service service, InstancePublishInfo instancePublishInfo> 条目信息。
3.1 发布ClientEvent.ClientChangedEvent事件
由DistroClientDataProcessor#onEvent响应。
=> DistroClientDataProcessor#syncToAllServer
=> DistroProtocol#sync
后续详细流程位于DistroProtocol#sync DistroProtocol#onReceive 两节
简单说就是将Client和新加入的微服务实例信息广播到其它nacos server节点。其他节点如果没有相同的微服务实例信息,也执行一遍与EphemeralClientOperationServiceImpl#registerInstance类似的微服务实例注册逻辑,代码位于DistroClientDataProcessor#upgradeClient。其它nacos server的微服务实例注册逻辑内部也会再次执行DistroProtocol#sync进行广播。但是接收方如果已经存在该实例相同的信息时,将不再执行微服务实例注册逻辑,也就不会继续进行广播了。
- 发布
ClientOperationEvent.ClientRegisterServiceEvent
由ClientServiceIndexesManager#onEvent响应
=> ClientServiceIndexesManager#handleClientOperation
=> ClientServiceIndexesManager#addPublisherIndexes
ClientServiceIndexesManager#publisherIndexes加入<Service, Set<clientId>>条目
发布ServiceEvent.ServiceChangedEvent事件
4.1 NamingSubscriberServiceV2Impl#onEvent 响应 ServiceEvent.ServiceChangedEvent事件
主要做的事情为
-
ServiceStorage记录这个服务的新实例信息,包括ServiceStorage#serviceClusterIndex和ServiceStorage#serviceDataIndexes。 - 向订阅该Service的所有
Subscriber推送NotifySubscriberRequest消息
详细的过程如下
4.1.1 delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
后面的 DistroProtocol#sync 一节会描述类似的 delayTaskEngine.addTask的详细过程。我们现在直接介绍重要部分。
4.1.2 PushDelayTask默认是合并500ms内的相同key的任务,key为 Service,通过PushDelayTask#merge函数合并。合并逻辑为合并推送的targetClients列表或是推送全部。
这里的 delayTaskEngine 是 PushDelayTaskExecuteEngine。PushDelayTaskExecuteEngine.taskProcessors为空,只能使用默认的PushDelayTaskProcessor处理PushDelayTask。
4.1.3
PushDelayTaskProcessor#process =>
NamingExecuteTaskDispatcher.getInstance() .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
NacosExecuteTaskExecuteEngine#addTask
NamingExecuteTaskDispatcher.executeEngine中的taskProcessors为空,defaultTaskProcessor也为空。所以会使用NacosExecuteTaskExecuteEngine#executeWorkers执行任务。执行的是刚刚封装的PushExecuteTask。
4.1.4 PushExecuteTask.run
4.1.4.1 PushDataWrapper wrapper = generatePushData();
=> ServiceStorage#getPushData
Service数据复制为ServiceInfo,复制Service的基本信息
ServiceStorage#getAllInstancesFromIndex 生成该服务的所有实例列表
ClientServiceIndexesManager#publisherIndexes获取该服务的所有clientid列表
使用每个clientid从ConnectionBasedClientManager#clients获取对应的Client,然后从AbstractClient#publishers获取实例信息InstancePublishInfo。
InstancePublishInfo转为Instance,从NamingMetadataManager#instanceMetadataMap中获取meta信息。
ServiceStorage#serviceClusterIndex中加<Service service, Set<String> clusters>条目。clusters默认只有一个DEFAULT,来自于UtilsAndCommons#DEFAULT_CLUSTER_NAME
ServiceStorage#serviceDataIndexes 加入 <Service singleton, ServiceInfo result>条目,也就是服务对应服务实例列表。
继续回到generatePushData()
从NamingMetadataManager#instanceMetadataMap中获取meta信息,和刚刚的ServiceInfo合并为PushDataWrapper
4.1.4.2 getTargetClientIds()
ClientServiceIndexesManager#subscriberIndexes获取所以订阅该服务的clientId客户端id。
针对每一个客户端id,Client client = clientManager.getClient(each)
AbstractClient#subscribers获取Subscriber
4.1.4.3 针对每一个订阅者
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper, new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
PushExecutorDelegate.doPushWithCallback
=> getPushExecuteService(clientId, subscriber)
默认SpiImplPushExecutorHolder#pushExecutors为空,最终返回PushExecutorRpcImpl
PushExecutorRpcImpl#doPushWithCallback
根据PushDataWrapper生成ServiceInfo,根据规则过滤一些实例。
使用RpcPushService#pushWithCallback向订阅者客户端推送NotifySubscriberRequest消息
推送成功回调 PushExecuteTask.ServicePushCallback#onSuccess
发布PushServiceTraceEvent事件,默认没有监听该事件
- 发布
MetadataEvent.InstanceMetadataEvent
NamingMetadataManager#onEvent响应事件
=>NamingMetadataManager#handleInstanceMetadataEvent
NamingMetadataManager#expiredMetadataInfos移除该服务的metadata过期标记
- 发布
RegisterInstanceTraceEvent事件
回到InstanceRequestHandler#registerInstance最后还会发布RegisterInstanceTraceEvent事件
默认没有监听器监听此事件
订阅服务实例变更消息 SubscribeServiceRequest
SubscribeServiceRequestHandler.handle => InstanceRequestHandler#registerInstance
构建Subscriber对象
构建ServiceInfo对象,包含Service以及实例列表。
来自于ServiceStorage#getData,先从ServiceStorage#serviceDataIndexes直接获取ServiceInfo对象。如果没有使用ServiceStorage#getPushData构建ServiceInfo对象,从ServiceManager#getSingleton获取Service,从ClientServiceIndexesManager#publisherIndexes,ConnectionBasedClientManager#clients,AbstractClient#publishers等中获取实例列表,共同构建ServiceInfo对象。
判断SubscribeServiceRequest消息参数中的subscribe是true or false
true 订阅 EphemeralClientOperationServiceImpl#subscribeService 发布SubscribeServiceTraceEvent
false 取消订阅 EphemeralClientOperationServiceImpl#unsubscribeService 发布UnsubscribeServiceTraceEvent
然后返回ServiceInfo对象
-
EphemeralClientOperationServiceImpl#subscribeService
1.1ConnectionBasedClientManager#clients获取当前发送订阅消息的Client
AbstractClient#addServiceSubscriberAbstractClient#subscribers中加入Subscriber,这里包含了想要订阅的服务信息
1.2 发布 ClientOperationEvent.ClientSubscribeServiceEvent
ClientServiceIndexesManager#onEvent 响应事件
=> ClientServiceIndexesManager#handleClientOperation
=> ClientServiceIndexesManager#addSubscriberIndexes
ClientServiceIndexesManager#subscriberIndexes 中加入希望监听这个Service的ClientId, <Service service, Set<String clientId>>
如果是首次加入,Set<String clientId> 中首次加入这个ClientId,
则发布 ServiceEvent.ServiceSubscribedEvent 事件
1.2.1 ServiceEvent.ServiceSubscribedEvent 事件
NamingSubscriberServiceV2Impl#onEvent 响应这个事件
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId()));
构建默认延迟500毫秒的PushDelayTask,任务中包含了Service和新增订阅的ClientId,注意PushDelayTask.pushToAll = false,并且指定了接受的客户端PushDelayTask#targetClients为刚刚订阅的客户端
PushDelayTask的主要逻辑就是向刚刚订阅的nacos客户端推送服务端消息 NotifySubscriberRequest,包含了最新的服务及实例列表
PushDelayTask的具体执行逻辑如下
1.2.2 后面的 DistroProtocol#sync 一节会描述类似的 delayTaskEngine.addTask的详细过程。我们现在直接介绍重要部分。
1.2.2.1 PushDelayTask会进行合并 PushDelayTask#merge,合并的逻辑就是合并要推送的nacos server列表。
1.2.2.2 PushDelayTaskExecuteEngine#taskProcessors为空,由PushDelayTaskExecuteEngine#defaultTaskProcessor处理PushDelayTask,也就是PushDelayTaskExecuteEngine.PushDelayTaskProcessor
PushDelayTaskExecuteEngine.PushDelayTaskProcessor#process
=> NamingExecuteTaskDispatcher#dispatchAndExecuteTask 执行 PushExecuteTask任务
=> PushDelayTaskExecuteEngine#addTask
1.2.2.3 PushDelayTaskExecuteEngine#taskProcessors为空,PushDelayTaskExecuteEngine#defaultTaskProcessor为null
使用PushDelayTaskExecuteEngine#executeWorkers执行PushExecuteTask任务
1.2.2.4 PushExecuteTask#run
PushExecuteTask#generatePushData 生成 PushDataWrapper
ServiceInfo数据来自于ServiceStorage#getPushData,ServiceMetadata来自于NamingMetadataManager#getServiceMetadata,一起封装为PushDataWrapper
PushExecuteTask#getTargetClientIds 获取需要发送的客户端。
由于这个PushDelayTask构建时,PushDelayTask.pushToAll = false,并且指定了接受的客户端PushDelayTask#targetClients为刚刚订阅的客户端,所以只会向刚刚订阅的客户端发送
PushExecutorDelegate#doPushWithCallback
=> PushExecutorRpcImpl#doPushWithCallback
=> RpcPushService#pushWithCallback
向刚刚订阅的nacos客户端推送服务端消息 NotifySubscriberRequest,包含了最新的服务及实例列表
- 发布
SubscribeServiceTraceEvent
回到EphemeralClientOperationServiceImpl#subscribeService函数的末尾,发布SubscribeServiceTraceEvent
不过默认没有监听器响应这个事件
DistroProtocol
DistroProtocol#sync
给其他所有nacos server节点除了自身,发送消息。
针对每个nacos server节点和微服务实例,积攒默认1秒的消息,合并为1个消息。
经过层层异步调用到达DistroDelayTaskProcessor#process,根据DistroProtocol#sync中不同的action,生成不同的Task,DistroSyncDeleteTask或是DistroSyncChangeTask。然后在异步执行这个Task。最终通过DistroClientTransportAgent#syncData()向其它nacos server实例发送grpc消息DistroDataRequest,消息中包含了action以及必要的微服务实例信息。
详细流程如下:
- 封装为默认1秒的
DistroDelayTask,通过NacosDelayTaskExecuteEngine#addTask,将任务加入到NacosDelayTaskExecuteEngine#tasks中。这里NacosDelayTaskExecuteEngine的实现是DistroDelayTaskExecuteEngine。DistroTaskEngineHolder构建DistroDelayTaskExecuteEngine时,会加入默认的DistroDelayTaskProcessor`。
addTask函数中,先用key查找是否已存在任务,如果存在,newTask.merge(existTask);新旧任务合并。然后放回map。
key为DistroKey,使用resourceKey resourceType targetServer共同确定。
比如resourceKey = Client.getClientId() resourceType="Nacos:Naming:v2:ClientData" targetServer=target nacos server ip:port。
也就是同一个微服务实例,同一个nacos server实例,同一个resourceType,在1秒内的任务会被合并为一个。
DistroDelayTask的合并逻辑就是根据createTime字段,保留最新的任务的action,舍弃旧的action。
-
NacosDelayTaskExecuteEngine初始化时,会启动一个线程,每100ms执行一次ProcessRunnable。
也就是每100ms,ProcessRunnable中会遍历NacosDelayTaskExecuteEngine#tasks中所有的任务,取出已经到时间的任务。对每个已经到时间的任务执行
NacosTaskProcessor processor = getProcessor(taskKey);
processor.process(task)
DistroDelayTaskExecuteEngine中的NacosTaskProcessor processor = getProcessor(taskKey);实现是将DistroKey.resourceType作为key寻找NacosTaskProcessor,比如"Nacos:Naming:v2:ClientData"
默认DistroDelayTaskExecuteEngine#taskProcessors是空的,没有注册任何NacosTaskProcessor,最终所有getProcessor走的都是defaultTaskProcessor DistroDelayTaskProcessor
DistroDelayTaskProcessor#process
根据action,构建DistroSyncDeleteTask或是DistroSyncChangeTask,加入到DistroExecuteTaskExecuteEngine#addTask。
-
DistroExecuteTaskExecuteEngine#addTask内部根据DistroKey寻找NacosTaskProcessor,执行processor.process。
如果NacosTaskProcessor为null,则通过DistroKey在NacosExecuteTaskExecuteEngine#executeWorkers中找一个TaskExecuteWorker,执行worker.process。
默认DistroExecuteTaskExecuteEngine#taskProcessors也是空的,没有注册任何NacosTaskProcessor,而且DistroExecuteTaskExecuteEngine#defaultTaskProcessor也是null。最终均由TaskExecuteWorker#process执行。
TaskExecuteWorker#process则是将任务加入到TaskExecuteWorker#queue队列中。InnerWorker会不停的从这个队列中取出任务,并执行Runnable.run,也就是DistroSyncDeleteTask.run或是DistroSyncChangeTask.run。二者共用AbstractDistroExecuteTask#run
-
AbstractDistroExecuteTask#run内部
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
根据DistroKey.resourceType在DistroComponentHolder#transportAgentMap中寻找对应的DistroTransportAgent。默认只注册了DistroClientTransportAgent,也只用到了他。
如果transportAgent.supportCallbackTransport(),则执行doExecuteWithCallback(new DistroExecuteCallback());。默认DistroClientTransportAgent.supportCallbackTransport为true。
-
DistroSyncChangeTask#doExecuteWithCallback
DistroData distroData = getDistroData(type);传入DistroKey.resourceType,也就是"Nacos:Naming:v2:ClientData"。
DistroComponentHolder#dataStorageMap,这个map中只注册了一个条目,最终获取到DistroClientDataProcessor。
DistroClientDataProcessor#getDistroData根据DistroKey.resourceKey从ConnectionBasedClientManager#getClient获取到当前连接的微服务客户端数据ConnectionBasedClient。
ConnectionBasedClient#generateSyncData生成ClientSyncData,里边包含了微服务实例的相关完整数据,同时ClientSyncData#attributes加入了"revision"条目,也就是这一ClientSyncData的版本号
再封装为DistroData
-
DistroClientTransportAgent#syncData()
封装为DistroDataRequest,通过ClusterRpcClientProxy#asyncRequest经grpc发送出去。
DistroProtocol#onReceive
DistroDataRequest的请求处理,在接收方的nacos server实例内部
DistroDataRequestHandler#handle
=> DistroProtocol#onReceive
=> DistroClientDataProcessor#processData
从消息中反序列化出ClientSyncData
=> DistroClientDataProcessor#handlerClientSyncData
-
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
ClientManagerDelegate#syncClientConnected
=>ConnectionBasedClientManager#syncClientConnected
根据attributes中的"connectionType"获取不同类型的ClientFactory,默认ConnectionBasedClientFactory
ConnectionBasedClientFactory构建出ConnectionBasedClient
ConnectionBasedClientManager#clients中如果不存在,才加入新的<connectionId, Client> 条目,此时新的Client内部没有具体连接信息。
Client client = clientManager.getClient(clientSyncData.getClientId());
取出 上一步新建的ClientupgradeClient(client, clientSyncData);
DistroClientDataProcessor#upgradeClient
3.1 ServiceManager#getSingleton
服务实例注册消息 一节中提过
ServiceManager#singletonRepository ServiceManager#namespaceSingletonMaps 中加入Service
3.2 判断当前Client内部是否存在相同的微服务实例信息
如果不存在,则会执行一遍与EphemeralClientOperationServiceImpl#registerInstance内类似的服务实例注册逻辑。然后继续向其它nacos server实例广播该服务的注册消息。但是其它nacos server实例再次执行DistroProtocol#onReceive => upgradeClient(client, clientSyncData); 时,在当前这一步,发现已经存在相同的微服务实例信息,则不再执行后续的服务实例注册逻辑,也就阻止了无尽的消息广播。
接下来的逻辑与EphemeralClientOperationServiceImpl#registerInstance内的服务实例注册逻辑类似。
3.2.1 client.addServiceInstance(singleton, instancePublishInfo);
AbstractClient#publishers中加入微服务实例信息
发布ClientEvent.ClientChangedEvent事件。
该事件会再次触发
=> DistroClientDataProcessor#syncToAllServer
=> DistroProtocol#sync
一直到当前函数的流程,但是接收方如果已经有这个微服务实例信息的话,就不会执行client.addServiceInstance(singleton, instancePublishInfo);,也就不会继续产生ClientEvent.ClientChangedEvent事件
3.2.2 ClientOperationEvent.ClientRegisterServiceEvent事件
3.2.3 MetadataEvent.InstanceMetadataEvent事件
3.3 如果当前Client存在了外来ClientSyncData中没有的Service信息,则删除本地该实例的Service信息,发布ClientOperationEvent.ClientDeregisterServiceEvent事件。
也就是一个nacos client实例是可以提供多个Service的,这里在同步一个nacos client实例的Service列表。
3.4 client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));
同步本地的Client的版本号
至此,没看到如果收到低版本号的Client消息时应该忽略的逻辑,猜测这里是有问题的。会将本地的客户端信息由新版本降低为低版本旧数据的可能。
一些问题
ServiceInfo#lastRefTime应该是nacos客户端用于定期轮询更新缓存用的。nacos server端应该是用不到这个字段。如果同一时刻,实例1向serverA发送deregister,实例2向serverB发送deregister,两个server互相广播消息,最终是如何一致的。
以serverA为例,收到实例1的变化时,会移除实例1的实例信息,然后更新ServiceStorage#serviceDataIndexes缓存。收到实例2的变化时,会移除实例2的实例信息,然后再更新缓存。无论这些消息、步骤如何异步,如何乱序。最终都可以保证有一个更新缓存的步骤是在最后发生的。这次更新缓存久一点更新出正确的结果。如果一个客户端快速重启,向serverA发送deregister,向serverB发送register。nacos server会如何显示这一实例的状态。
nacos-spring-cloud没有使用@RefreshScope注解,应该不会出现这种状况。
如果出现这种情况,感觉极端情况下是会不停的重复广播注册、去注册消息的。但是DelayTask的merge与心跳机制最终应该会统一稳定为已注册状态。
断点
- 发送 grpc请求
io.grpc.internal.ClientCallImpl#sendMessage
ClientStream clientStream = ((RetriableStream) this.stream).state.drainedSubstreams.iterator().next().stream;
String local = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR).toString();
String remote = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString();
return "send: " + method.getFullMethodName() + " " + local + " -> " + remote + "\n" + message;
- 接受 grpc 请求
io.grpc.stub.ServerCalls.UnaryServerCallHandler.UnaryServerCallListener#onHalfClosemethod.invoke(request, responseObserver);一行
String local = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR).toString();
String remote = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString();
return "receive: " + call.method.getFullMethodName() + " " + remote + " -> " + local + "\n" + request;
可以通过过滤掉频繁的jraft消息
!((ServerCallImpl) call).method.getFullMethodName().contains("jraft")
- 接受 http请求
org.springframework.web.servlet.DispatcherServlet#doDispatch
"receive: " + request.getMethod() + " " +request.getRequestURL() + " params: " + new Gson().toJson(request.getParameterMap())