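Domain model of Nacos service discovery
- Namespace: isolates environments (e.g. development, testing, staging, production); defaults to public
- Group: groups services, so different services can belong to the same group; defaults to DEFAULT_GROUP
- Service: a microservice
- Cluster: a virtual partition of a given microservice's instances; defaults to DEFAULT
- Instance: an instance of a microservice
- persistentInstances: the set of persistent instances
- ephemeralInstances: the set of ephemeral instances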
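The nesting above can be sketched in a few lines of Java. This is a simplified model, not Nacos's own classes: Instance, Cluster, Service and Registry are hypothetical stand-ins. The outer map mirrors the Map(namespace, Map(group::serviceName, Service)) layout documented on ServiceManager later in this article, and the `@@` separator in the grouped key is assumed to match what NamingUtils.getGroupedName produces.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a microservice instance
record Instance(String ip, int port, boolean ephemeral) {}

// Hypothetical stand-in for Nacos's Cluster: persistent and ephemeral instances kept apart
class Cluster {
    final Set<Instance> persistentInstances = new HashSet<>();
    final Set<Instance> ephemeralInstances = new HashSet<>();

    void add(Instance instance) {
        (instance.ephemeral() ? ephemeralInstances : persistentInstances).add(instance);
    }
}

// Hypothetical stand-in for Nacos's Service: cluster name -> Cluster (not thread-safe)
class Service {
    final Map<String, Cluster> clusterMap = new HashMap<>();
}

// Hypothetical registry: namespaceId -> (group@@serviceName -> Service)
class Registry {
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // Assumed to match the key format of NamingUtils.getGroupedName
    static String groupedName(String group, String service) {
        return group + "@@" + service;
    }

    void register(String namespace, String group, String service, String cluster, Instance instance) {
        serviceMap.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(groupedName(group, service), k -> new Service())
                .clusterMap.computeIfAbsent(cluster, c -> new Cluster())
                .add(instance);
    }

    // Returns null when the namespace or service is unknown, like ServiceManager#getService
    Service lookup(String namespace, String group, String service) {
        Map<String, Service> services = serviceMap.get(namespace);
        return services == null ? null : services.get(groupedName(group, service));
    }
}
```

For example, registering `order-service` in namespace `public`, group `DEFAULT_GROUP`, cluster `DEFAULT` creates each level of the hierarchy on demand.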
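1. Before Service Discovery
In Spring Cloud, RPC calls are now mostly made through Feign. Feign is essentially built on top of Ribbon: it encapsulates the usual process of sending an HTTP request to a service, so that a service can be called through a single annotation. For an analysis of Ribbon's source code, see: https://www.jianshu.com/p/f3db11f045cc
At the very bottom, Ribbon implements an interface from the spring-cloud-commons package: org.springframework.cloud.client.loadbalancer.LoadBalancerClient, a subinterface of ServiceInstanceChooser.
This is the parent interface of Ribbon's load balancing. The chain of implementations beneath it ultimately comes down to one question, how the server list is obtained, and the answer lies in this interface: com.netflix.loadbalancer.ServerList
From here on, it is the service discovery component's job.
Eureka implements this interface as DiscoveryEnabledNIWSServerList.
Nacos implements it as org.springframework.cloud.alibaba.nacos.ribbon.NacosServerList, which is the focus of this article.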
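The contract a discovery component fulfils here is small. The sketch below re-declares a stand-in interface with the same two methods as com.netflix.loadbalancer.ServerList (getInitialListOfServers and getUpdatedListOfServers) so it compiles without Ribbon on the classpath. The Server record and DiscoveryServerList class are hypothetical; a real implementation such as NacosServerList queries the registry inside getUpdatedListOfServers instead of calling a Supplier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for com.netflix.loadbalancer.Server
record Server(String host, int port) {}

// Stand-in mirroring the two methods of com.netflix.loadbalancer.ServerList
interface ServerList<T> {
    List<T> getInitialListOfServers();
    List<T> getUpdatedListOfServers();
}

// Hypothetical discovery-backed list: every refresh asks the "registry" again
class DiscoveryServerList implements ServerList<Server> {
    private final Supplier<List<Server>> registryQuery;

    DiscoveryServerList(Supplier<List<Server>> registryQuery) {
        this.registryQuery = registryQuery;
    }

    @Override
    public List<Server> getInitialListOfServers() {
        // The initial list comes from the same query as later refreshes
        return getUpdatedListOfServers();
    }

    @Override
    public List<Server> getUpdatedListOfServers() {
        // Defensive copy so the load balancer cannot mutate the registry's view
        return new ArrayList<>(registryQuery.get());
    }
}
```

Because the load balancer only ever sees this interface, swapping Eureka for Nacos amounts to swapping the ServerList implementation.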
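2. Service Discovery
When a Nacos client initiates an RPC call, it obtains a load balancer through the getLoadBalancer method of RibbonLoadBalancerClient. Spring Cloud uses ZoneAwareLoadBalancer by default, and ZoneAwareLoadBalancer's constructor initializes its parent class, DynamicServerListLoadBalancer.
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer extends BaseLoadBalancer and adds two features to the basic load balancer:
- Dynamic updating of the server instance list at runtime.
- Filtering of the server instance list: a filter can be used to selectively obtain a subset of the instances.
When DynamicServerListLoadBalancer is constructed, it calls the restOfInit method: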
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
// Start the scheduled task that periodically refreshes the server list
enableAndInitLearnNewServersFeature();
// Fetch the full server list once, immediately
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
}
enableAndInitLearnNewServersFeature()
Starts a scheduled task that refreshes the instance list from Nacos every 30 seconds
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
}
public class PollingServerListUpdater implements ServerListUpdater {
// The first update runs 1 second after initialization
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
// and then repeats every 30 seconds
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
// Updates the server list as a scheduled task
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// Wrap the update action in a Runnable
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
// Performs the actual server list update
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
// Schedule wrapperRunnable with a fixed delay between runs
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs, // 1 second by default
refreshIntervalMs, // 30 seconds by default
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
}
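The scheduling pattern in PollingServerListUpdater can be reproduced with a plain ScheduledExecutorService. The sketch below is a simplified re-implementation under that assumption, not Ribbon's class: an AtomicBoolean prevents a second start, and scheduleWithFixedDelay runs the update after an initial delay and then waits a fixed gap between the end of one run and the start of the next. The try/catch matters because a periodic task that throws is not run again by ScheduledExecutorService, so one failed update would otherwise stop all future refreshes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified counterpart of Ribbon's PollingServerListUpdater
class SimplePollingUpdater {
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    private volatile ScheduledFuture<?> scheduledFuture;
    private volatile long lastUpdated;

    SimplePollingUpdater(long initialDelayMs, long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    /** Returns true if this call started the schedule, false if it was already active. */
    boolean start(Runnable updateAction) {
        if (!isActive.compareAndSet(false, true)) {
            return false; // already active: no-op
        }
        Runnable wrapper = () -> {
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (RuntimeException e) {
                // Swallow so that one failure does not cancel the periodic task
                System.err.println("Failed one update cycle: " + e);
            }
        };
        scheduledFuture = executor.scheduleWithFixedDelay(
                wrapper, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        executor.shutdownNow();
    }

    long getLastUpdated() {
        return lastUpdated;
    }
}
```

`new SimplePollingUpdater(1_000, 30_000)` reproduces the 1-second initial delay and 30-second interval described above.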
DynamicServerListLoadBalancer#updateListOfServers()
- Fetches the full instance list from Nacos
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList();
if (this.serverListImpl != null) {
// Fetch the available instances from the Nacos server
servers = this.serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
if (this.filter != null) {
servers = this.filter.getFilteredListOfServers((List)servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
}
}
// Replace the load balancer's server list
this.updateAllServerList((List)servers);
}
}
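Stripped of Ribbon's types, updateListOfServers is a three-step pipeline: fetch from the ServerList, optionally filter, then replace the load balancer's list. A minimal sketch, assuming servers are plain `host:port` strings, the server list is a Supplier, and the filter is a UnaryOperator (Ribbon itself uses ServerListFilter#getFilteredListOfServers); SimpleDynamicLoadBalancer is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical, simplified version of DynamicServerListLoadBalancer#updateListOfServers
class SimpleDynamicLoadBalancer {
    private final Supplier<List<String>> serverListImpl; // may be null, like Ribbon's field
    private final UnaryOperator<List<String>> filter;     // may be null
    private volatile List<String> allServers = List.of();

    SimpleDynamicLoadBalancer(Supplier<List<String>> serverListImpl, UnaryOperator<List<String>> filter) {
        this.serverListImpl = serverListImpl;
        this.filter = filter;
    }

    void updateListOfServers() {
        List<String> servers = new ArrayList<>();
        if (serverListImpl != null) {
            // 1. Fetch from the discovery source (NacosServerList in the real chain)
            servers = serverListImpl.get();
            // 2. Optionally narrow the list down
            if (filter != null) {
                servers = filter.apply(servers);
            }
        }
        // 3. Publish an immutable copy as the new server list
        allServers = List.copyOf(servers);
    }

    List<String> getAllServers() {
        return allServers;
    }
}
```

Publishing an immutable copy through a volatile field lets request threads keep reading the old list while a refresh is in progress.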
Fetching the available instances from the Nacos server ultimately lands in the NacosServerList class:
public class NacosServerList extends AbstractServerList<NacosServer> {
private NacosDiscoveryProperties discoveryProperties;
private String serviceId;
@Override
public List<NacosServer> getUpdatedListOfServers() {
return getServers();
}
private List<NacosServer> getServers() {
try {
String group = discoveryProperties.getGroup();
// Query the service's instance list (healthy instances only)
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
// Convert List<Instance> into List<NacosServer> and return it
return instancesToServerList(instances);
}
catch (Exception e) {
throw new IllegalStateException(
"Can not get service instances from nacos, serviceId=" + serviceId,
e);
}
}
}
NacosNamingService.selectInstances(serviceName, new ArrayList<String>(), healthyOnly)
- HostReactor.getServiceInfo(): obtains the ServiceInfo object
- Extracts the List<Instance> hosts field from ServiceInfo and returns it.
public class NacosNamingService implements NamingService {
private HostReactor hostReactor;
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
// ServiceInfo wraps the service's cluster and instance information
ServiceInfo serviceInfo;
if (subscribe) {
// Subscribed: get the ServiceInfo object via the local cache
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
// Not subscribed: query the Nacos server directly
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
// Extract the List<Instance> hosts from ServiceInfo and return it
return selectInstances(serviceInfo, healthy);
}
}
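The private selectInstances(serviceInfo, healthy) overload is not shown above. This sketch assumes it behaves as in the Nacos 1.x client: it keeps only instances whose healthy flag matches the requested value, that are enabled, and whose weight is positive. Verify that rule against your Nacos version; the Instance record and InstanceSelector class are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for com.alibaba.nacos.api.naming.pojo.Instance
record Instance(String ip, int port, boolean healthy, boolean enabled, double weight) {}

class InstanceSelector {
    // Assumed rule: health matches, instance enabled, weight > 0.
    // Returns a new list rather than removing from the input.
    static List<Instance> selectInstances(List<Instance> hosts, boolean healthy) {
        List<Instance> result = new ArrayList<>();
        if (hosts == null) {
            return result;
        }
        for (Instance instance : hosts) {
            if (instance.healthy() == healthy && instance.enabled() && instance.weight() > 0) {
                result.add(instance);
            }
        }
        return result;
    }
}
```

Since NacosServerList passes `true`, under this assumption only healthy, enabled, positively weighted instances ever reach Ribbon.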
hostReactor.getServiceInfoDirectlyFromServer
- Fetches the service information directly from the Nacos server
public class HostReactor implements Closeable {
private final NamingProxy serverProxy;
public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)
throws NacosException {
String result = serverProxy.queryList(serviceName, clusters, 0, false);
if (StringUtils.isNotEmpty(result)) {
return JacksonUtils.toObj(result, ServiceInfo.class);
}
return null;
}
}
public class NamingProxy implements Closeable {
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}
}
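queryList ultimately issues an HTTP GET. The sketch below assembles an equivalent request URL using only the JDK, to show what the client actually sends. The server address, the `/nacos/v1/ns` prefix (assumed to be the value of UtilAndComs.nacosUrlBase in Nacos 1.x) and the buildInstanceListUrl helper are assumptions for illustration; reqApi additionally handles server selection and retries, which are not shown.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class InstanceListRequest {
    // Hypothetical helper that sends the same parameters as NamingProxy#queryList
    static String buildInstanceListUrl(String server, String namespaceId, String serviceName,
                                       String clusters, int udpPort, String clientIp, boolean healthyOnly) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps parameter order stable
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", clientIp);
        params.put("healthyOnly", String.valueOf(healthyOnly));
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return "http://" + server + "/nacos/v1/ns/instance/list?" + query;
    }
}
```

getServiceInfoDirectlyFromServer passes `udpPort = 0`, and the server only registers a client for UDP push when `udpPort > 0` (see doSrvIpxt below), so a direct query does not subscribe the client.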
hostReactor.getServiceInfo
- Obtains the ServiceInfo object
public class HostReactor implements Closeable {
private final Map<String, ServiceInfo> serviceInfoMap;
private final Map<String, Object> updatingMap;
private static final long UPDATE_HOLD_INTERVAL = 5000L;
private final FailoverReactor failoverReactor;
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
// If the failover switch is on (e.g. the registry cannot be reached),
// serve the service from FailoverReactor's cached serviceMap
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// Look up the service in the local cache serviceInfoMap
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
// Not cached locally: fetch from the registry now and update the local cache
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// Schedule a background update task whose first run is 1 s from now
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
// A task is already scheduled for this key: nothing to do
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
// Schedule the task to run after 1 s; it reschedules itself after every run
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
}
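scheduleUpdateIfAbsent uses a double-checked idiom: a lock-free read of futureMap first, then a second check inside synchronized (futureMap), so concurrent callers schedule at most one UpdateTask per service key. A minimal sketch with hypothetical names; the `name@@clusters` key format is assumed to match ServiceInfo.getKey when clusters is non-empty.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified counterpart of HostReactor#scheduleUpdateIfAbsent
class UpdateScheduler {
    private static final long DEFAULT_DELAY_MS = 1000L;
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    // Assumed key format: serviceName, plus "@@" + clusters when clusters is non-empty
    static String key(String serviceName, String clusters) {
        return clusters == null || clusters.isEmpty() ? serviceName : serviceName + "@@" + clusters;
    }

    /** Returns true only for the call that actually scheduled a task for this key. */
    boolean scheduleUpdateIfAbsent(String serviceName, String clusters, Runnable updateTask) {
        String key = key(serviceName, clusters);
        if (futureMap.get(key) != null) {      // fast path: already scheduled
            return false;
        }
        synchronized (futureMap) {
            if (futureMap.get(key) != null) {  // re-check under the lock
                return false;
            }
            ScheduledFuture<?> future = executor.schedule(updateTask, DEFAULT_DELAY_MS, TimeUnit.MILLISECONDS);
            futureMap.put(key, future);
            return true;
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

On a ConcurrentHashMap, computeIfAbsent would give the same at-most-once guarantee in one call; the explicit lock simply makes the two checks visible.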
HostReactor#updateServiceNow
- Fetches from the registry and updates the local cache
public class HostReactor implements Closeable {
private void updateServiceNow(String serviceName, String clusters) {
try {
updateService(serviceName, clusters);
} catch (NacosException e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}
}
public void updateService(String serviceName, String clusters) throws NacosException {
// First take the entry currently in the local cache serviceInfoMap
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// Pull the instance list from the registry: /instance/list
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
// Parse the result and update serviceInfoMap
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
}
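getServiceInfo and updateService together form a bounded wait/notify handoff: a reader that finds the service in updatingMap waits on the ServiceInfo object for at most UPDATE_HOLD_INTERVAL (5 s), and updateService calls notifyAll on that same object in its finally block, so waiting readers are released as soon as the refresh finishes, or after the timeout at worst. The sketch below isolates just that handoff with a hypothetical CachedEntry class. Unlike the original, it re-checks an `updating` flag under the lock in a loop, so a notifyAll that fires before the reader starts waiting is not lost.

```java
// Hypothetical stand-in for a cached ServiceInfo that readers may wait on
class CachedEntry {
    private String hosts = "";        // stand-in for the cached host list
    private boolean updating = false; // guarded by this

    synchronized void beginUpdate() {
        updating = true;
    }

    // Writer side: publish the new value and wake every waiting reader
    synchronized void finishUpdate(String newHosts) {
        hosts = newHosts;
        updating = false;
        notifyAll();
    }

    // Reader side: wait at most holdMillis for an in-flight update, then return what is cached
    synchronized String read(long holdMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + holdMillis;
        while (updating) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break; // give up and serve the possibly stale value
            }
            wait(remaining);
        }
        return hosts;
    }
}
```

The timeout is the important design choice: a slow or failed refresh degrades to serving stale data instead of blocking callers indefinitely.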
UpdateTask
- The scheduled update task
public class HostReactor implements Closeable {
private static final long DEFAULT_DELAY = 1000L;
public class UpdateTask implements Runnable {
@Override
public void run() {
long delayTime = DEFAULT_DELAY;
try {
// Get the currently cached service info
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
// null means nothing is cached locally, so fetch it from the server
if (serviceObj == null) {
// Pull the latest instance list and update the cache
updateService(serviceName, clusters);
return;
}
// Stale check: the cached service has not been refreshed since this task
// last recorded lastRefTime, so pull an update
if (serviceObj.getLastRefTime() <= lastRefTime) {
// Call updateService to pull the address list from the server and update the cache
updateService(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// The service was already updated by a push, so the local copy must not be overwritten;
// pushed data differs from pulled data, so just issue a refresh request here
refreshOnly(serviceName, clusters);
}
// Record the service's latest refresh time
lastRefTime = serviceObj.getLastRefTime();
// Subscription cancelled: if the service is no longer subscribed and futureMap no longer holds its key, stop the update loop
if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
delayTime = serviceObj.getCacheMillis();
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
} finally {
// Schedule the next poll: the server-supplied cacheMillis, shifted left by failCount, capped at 60 s
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}
}
}
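The rescheduling line in the finally block is an exponential backoff with a cap: the base delay is the cacheMillis returned by the server (or DEFAULT_DELAY, 1 s, if the run failed before that was read), doubled once per consecutive failure via `<< failCount`, and capped at DEFAULT_DELAY * 60, i.e. 60 s. The sketch isolates just that arithmetic; the 10 000 ms base used in main is an illustrative cacheMillis value, not a documented default.

```java
class UpdateBackoff {
    static final long DEFAULT_DELAY = 1000L;

    // Same expression as UpdateTask#run: Math.min(delayTime << failCount, DEFAULT_DELAY * 60).
    // Like the original, it relies on failCount staying small (shift distances wrap at 64).
    static long nextDelay(long delayTime, int failCount) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }

    public static void main(String[] args) {
        for (int failCount = 0; failCount <= 3; failCount++) {
            System.out.println(failCount + " -> " + nextDelay(10_000L, failCount) + " ms");
        }
        // prints: 0 -> 10000 ms, 1 -> 20000 ms, 2 -> 40000 ms, 3 -> 60000 ms (80000 capped)
    }
}
```

A successful run calls resetFailCount, so the polling interval falls straight back to cacheMillis once the server answers again.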
Server-side handling of service discovery requests
- The /instance/list endpoint
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// Look up the service
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// now try to enable the push
try {
if (udpPort > 0 && pushService.canEnablePush(agent)) {
pushService
.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG
.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}
// The service does not exist: return an empty host list
if (service == null) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
result.put("name", serviceName);
result.put("clusters", clusters);
result.put("cacheMillis", cacheMillis);
result.replace("hosts", JacksonUtils.createEmptyArrayNode());
return result;
}
checkIfDisabled(service);
List<Instance> srvedIPs;
// Collect the instances of the requested clusters from clusterMap
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
// filter ips using selector:
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}
if (CollectionUtils.isEmpty(srvedIPs)) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.set("hosts", JacksonUtils.createEmptyArrayNode());
result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
}
if (isCheck) {
result.put("reachProtectThreshold", false);
}
double threshold = service.getProtectThreshold();
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
if (isCheck) {
result.put("reachProtectThreshold", true);
}
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}
if (isCheck) {
result.put("protectThreshold", service.getProtectThreshold());
result.put("reachLocalSiteCallThreshold", false);
return JacksonUtils.createEmptyJsonNode();
}
ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
if (healthyOnly && !entry.getKey()) {
continue;
}
for (Instance instance : ips) {
// remove disabled instance:
if (!instance.isEnabled()) {
continue;
}
ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
// deprecated since nacos 1.0.0:
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
}
ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);
}
}
result.replace("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
}
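The protect-threshold branch in doSrvIpxt deserves a closer look. If the fraction of healthy instances is at or below the service's protectThreshold, the server moves every unhealthy instance into the healthy bucket, so callers spread load over all instances instead of overwhelming the few healthy ones. A sketch of that partition-and-merge step with a hypothetical Instance record; like doSrvIpxt, it expects a non-empty list (the original returns early when srvedIPs is empty).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an instance with a health flag
record Instance(String ip, boolean healthy) {}

class ProtectThreshold {
    // Returns what doSrvIpxt would place in the "healthy" bucket after applying the threshold
    static List<Instance> servable(List<Instance> srvedIPs, double threshold) {
        Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
        ipMap.put(Boolean.TRUE, new ArrayList<>());
        ipMap.put(Boolean.FALSE, new ArrayList<>());
        for (Instance ip : srvedIPs) {
            ipMap.get(ip.healthy()).add(ip);
        }
        // Same comparison as doSrvIpxt: a healthy ratio at or below the threshold triggers protection
        if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
            ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
            ipMap.get(Boolean.FALSE).clear();
        }
        return ipMap.get(Boolean.TRUE);
    }
}
```

Because doSrvIpxt writes `entry.getKey()` into each host's `healthy` field, instances moved by protection are reported as `healthy: true`, so a client-side healthy filter does not remove them. Note also that with a threshold of 0, protection still triggers when no instance at all is healthy, since 0 <= 0.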
ServiceManager#getService
Looks up a service
@Component
public class ServiceManager implements RecordListener<Service> {
/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
public Service getService(String namespaceId, String serviceName) {
// The namespace does not exist in the registry: return null
if (serviceMap.get(namespaceId) == null) {
return null;
}
// Look up the service by namespaceId and serviceName
return chooseServiceMap(namespaceId).get(serviceName);
}
public Map<String, Service> chooseServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
}
Service.srvIPs
- Collects the instances of the requested clusters from clusterMap
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
private Map<String, Cluster> clusterMap = new HashMap<>();
public List<Instance> srvIPs(List<String> clusters) {
if (CollectionUtils.isEmpty(clusters)) {
clusters = new ArrayList<>();
clusters.addAll(clusterMap.keySet());
}
return allIPs(clusters);
}
public List<Instance> allIPs(List<String> clusters) {
List<Instance> result = new ArrayList<>();
for (String cluster : clusters) {
Cluster clusterObj = clusterMap.get(cluster);
if (clusterObj == null) {
continue;
}
result.addAll(clusterObj.allIPs());
}
return result;
}
}
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
@JsonIgnore
private Set<Instance> persistentInstances = new HashSet<>();
@JsonIgnore
private Set<Instance> ephemeralInstances = new HashSet<>();
// Return all instances, persistent and ephemeral
public List<Instance> allIPs() {
List<Instance> allInstances = new ArrayList<>();
allInstances.addAll(persistentInstances);
allInstances.addAll(ephemeralInstances);
return allInstances;
}
}
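Service.srvIPs treats an empty cluster list as "all clusters", while allIPs silently skips cluster names it does not know. A condensed sketch of that selection logic, assuming instances are plain strings and each cluster's persistent and ephemeral sets are already merged, as Cluster#allIPs does; ClusterSelector is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ClusterSelector {
    // clusterMap: cluster name -> instances of that cluster
    static List<String> srvIPs(Map<String, List<String>> clusterMap, List<String> clusters) {
        List<String> names = (clusters == null || clusters.isEmpty())
                ? new ArrayList<>(clusterMap.keySet()) // no cluster requested: use every cluster
                : clusters;
        List<String> result = new ArrayList<>();
        for (String name : names) {
            List<String> instances = clusterMap.get(name);
            if (instances != null) {                   // unknown cluster names are skipped
                result.addAll(instances);
            }
        }
        return result;
    }
}
```

This is why a client that sends an empty `clusters` parameter receives instances from every cluster of the service.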
This completes the source code analysis of Nacos service discovery.