Nacos Config Source Code

First, what features does a configuration center need to provide?

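  • The client pulls configuration from the server
  • The client caches the configuration it has pulled
  • The client listens for configuration changes (asynchronously)
  • Dynamic injection of @Value fields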

Introduction: how does Spring load externalized configuration?

First, how does Spring Cloud implement externalized configuration loading?

The extension point is org.springframework.cloud.bootstrap.config.PropertySourceLocator. In spring-cloud-starter-alibaba-nacos-config-2.2.6.RELEASE.jar!\META-INF\spring.factories, Nacos registers NacosConfigBootstrapConfiguration. Here is that configuration class:

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {

  @Bean
  @ConditionalOnMissingBean
  public NacosConfigProperties nacosConfigProperties() {
      return new NacosConfigProperties();
  }

  @Bean
  @ConditionalOnMissingBean
  public NacosConfigManager nacosConfigManager(
          NacosConfigProperties nacosConfigProperties) {
      return new NacosConfigManager(nacosConfigProperties);
  }

  @Bean
  public NacosPropertySourceLocator nacosPropertySourceLocator(
          NacosConfigManager nacosConfigManager) {
      return new NacosPropertySourceLocator(nacosConfigManager);
  }

}

The NacosPropertySourceLocator registered here implements PropertySourceLocator. That raises the question: how does Spring Cloud process these PropertySourceLocator instances?

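How PropertySourceLocator Is Loaded

During Spring Boot startup, the prepareContext method calls back every registered ApplicationContextInitializer instance so that it can perform initialization work.

ApplicationContextInitializer is part of the core Spring Framework. Its main purpose is to let us further configure and process a ConfigurableApplicationContext instance (or a subtype) before refresh is called on it.

It is typically used in web applications that need programmatic initialization of the application context, such as registering property sources or activating profiles based on the context environment. A configuration center needs exactly this kind of mechanism.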

PropertySourceBootstrapConfiguration

// ApplicationContextInitializer.initialize is called back before the Spring IoC container is refreshed
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(PropertySourceBootstrapProperties.class)
public class PropertySourceBootstrapConfiguration implements
        ApplicationContextInitializer<ConfigurableApplicationContext>, Ordered {

    /**
     * Bootstrap property source name.
     */
    public static final String BOOTSTRAP_PROPERTY_SOURCE_NAME = "bootstrapProperties";

    // All PropertySourceLocator implementations are injected here; they are registered via BootstrapImportSelector
    @Autowired(required = false)
    private List<PropertySourceLocator> propertySourceLocators = new ArrayList<>();

    @Override
    public void initialize(ConfigurableApplicationContext applicationContext) {
        List<PropertySource<?>> composite = new ArrayList<>();
        // Sort the locators by their order
        AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
        boolean empty = true;
        ConfigurableEnvironment environment = applicationContext.getEnvironment();
        for (PropertySourceLocator locator : this.propertySourceLocators) {
            // Call back every PropertySourceLocator (locateCollection delegates to locate) and collect the results in source.
            Collection<PropertySource<?>> source = locator.locateCollection(environment);
            if (source == null || source.size() == 0) {
                continue;
            }
            List<PropertySource<?>> sourceList = new ArrayList<>();
            for (PropertySource<?> p : source) {
                if (p instanceof EnumerablePropertySource) {
                    EnumerablePropertySource<?> enumerable = (EnumerablePropertySource<?>) p;
                    sourceList.add(new BootstrapPropertySource<>(enumerable));
                }
                else {
                    sourceList.add(new SimpleBootstrapPropertySource(p));
                }
            }
            logger.info("Located property source: " + sourceList);
            composite.addAll(sourceList);
            empty = false;
        }
        // The sources are added to the environment only if at least one property source was found
        if (!empty) {
            MutablePropertySources propertySources = environment.getPropertySources();
            String logConfig = environment.resolvePlaceholders("${logging.config:}");
            LogFile logFile = LogFile.get(environment);
            for (PropertySource<?> p : environment.getPropertySources()) {
                if (p.getName().startsWith(BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
                    propertySources.remove(p.getName());
                }
            }
            insertPropertySources(propertySources, composite);
            reinitializeLoggingSystem(environment, logConfig, logFile);
            setLogLevels(applicationContext, environment);
            handleIncludedProfiles(environment);
        }
    }
}

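Loading Configuration in the Nacos Client

Now that we know the interface Spring Cloud provides, let's turn to Nacos. As the code above shows, Spring Cloud calls locateCollection(org.springframework.core.env.Environment) on each PropertySourceLocator, so that is the method to look at. Note that locateCollection is a default method of the interface; the core logic ultimately lives in locate.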

NacosPropertySourceLocator#locate

public PropertySource<?> locate(Environment env) {
    nacosConfigProperties.setEnvironment(env);
    ConfigService configService = nacosConfigManager.getConfigService();

    if (null == configService) {
        log.warn("no instance of config service found, can't load config from nacos");
        return null;
    }
    long timeout = nacosConfigProperties.getTimeout();
    nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
                                                                timeout);
    String name = nacosConfigProperties.getName();

    String dataIdPrefix = nacosConfigProperties.getPrefix();
    if (StringUtils.isEmpty(dataIdPrefix)) {
        dataIdPrefix = name;
    }
    // Fall back to the application name as the dataId prefix
    if (StringUtils.isEmpty(dataIdPrefix)) {
        dataIdPrefix = env.getProperty("spring.application.name");
    }

    CompositePropertySource composite = new CompositePropertySource(
        NACOS_PROPERTY_SOURCE_NAME);

    // Load shared configuration
    loadSharedConfiguration(composite);
    // Load extension configuration
    loadExtConfiguration(composite);
    // Load application configuration
    loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
    return composite;
}

NacosPropertySourceLocator#loadApplicationConfiguration

private void loadApplicationConfiguration( CompositePropertySource compositePropertySource,
                                          String dataIdPrefix,
                                          NacosConfigProperties properties, 
                                          Environment environment) {
    String fileExtension = properties.getFileExtension(); // file extension: json, yml, properties, etc.
    String nacosGroup = properties.getGroup(); // DEFAULT_GROUP
    // load directly once by default (dataId without a file extension)
    loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup,
                           fileExtension, true);
    // load with suffix, which has a higher priority than the default (dataId with the file extension)
    loadNacosDataIfPresent(compositePropertySource,
                           dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension,
                           true);
    // load with profile, which has a higher priority than the suffix (dataId with each active profile)
    for (String profile : environment.getActiveProfiles()) {
        String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
        loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
                               fileExtension, true);
    }

}
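
The order in which loadApplicationConfiguration tries dataIds can be sketched in plain Java. This is a minimal illustration, not the Nacos implementation: the class name DataIdCandidates is hypothetical, and the values "-" for SEP1 and "." for DOT are assumptions, since the listing above only names the constants.

```java
import java.util.ArrayList;
import java.util.List;

final class DataIdCandidates {
    // Assumed separator values; the listing above refers to them only as SEP1 and DOT.
    static final String SEP1 = "-";
    static final String DOT = ".";

    /** Returns the dataIds in load order; per the source comments, later entries have higher priority. */
    static List<String> candidates(String prefix, String fileExtension, String... activeProfiles) {
        List<String> ids = new ArrayList<>();
        ids.add(prefix);                                   // 1. bare prefix
        ids.add(prefix + DOT + fileExtension);             // 2. prefix plus file extension
        for (String profile : activeProfiles) {            // 3. one dataId per active profile
            ids.add(prefix + SEP1 + profile + DOT + fileExtension);
        }
        return ids;
    }

    public static void main(String[] args) {
        // prints [order-service, order-service.yaml, order-service-dev.yaml]
        System.out.println(candidates("order-service", "yaml", "dev"));
    }
}
```

Each candidate that exists on the server is added to the composite property source, so a profile-specific dataId can override values from the more general ones.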

NacosPropertySourceLocator#loadNacosDataIfPresent

private void loadNacosDataIfPresent(final CompositePropertySource composite,
      final String dataId, final String group, String fileExtension,
      boolean isRefreshable) {
    if (null == dataId || dataId.trim().length() < 1) {
        return;
    }
    if (null == group || group.trim().length() < 1) {
        return;
    }
    NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group,
         fileExtension, isRefreshable);
    // Add the property source to the CompositePropertySource
    this.addFirstPropertySource(composite, propertySource, false);
}

NacosPropertySourceLocator#loadNacosPropertySource

private NacosPropertySource loadNacosPropertySource(final String dataId, final String group, 
                                                    String fileExtension,
                                                    boolean isRefreshable) {
    // Is auto-refresh supported? The first load (refresh count 0) always fetches from the server
    if (NacosContextRefresher.getRefreshCount() != 0) {
        if (!isRefreshable) {
            return NacosPropertySourceRepository.getNacosPropertySource(dataId, group);
        }
    }
    return nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);
}

Without auto-refresh

com.alibaba.cloud.nacos.NacosPropertySourceRepository#getNacosPropertySource

This is simply a local Map whose key is dataId + "," + group.

private final static ConcurrentHashMap<String, NacosPropertySource> NACOS_PROPERTY_SOURCE_REPOSITORY = new ConcurrentHashMap<>();
public static NacosPropertySource getNacosPropertySource(String dataId, String group) {
    return NACOS_PROPERTY_SOURCE_REPOSITORY.get(getMapKey(dataId, group));
}

With auto-refresh

com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder#build

NacosPropertySource build(String dataId, String group, String fileExtension,
                          boolean isRefreshable) {
    // Fetch the data from Nacos over HTTP, then wrap it as a PropertySource
    // Request path: /v1/cs/configs
    List<PropertySource<?>> propertySources = loadNacosData(dataId, group, fileExtension);
    NacosPropertySource nacosPropertySource = new NacosPropertySource(propertySources,
                                                                      group, dataId, 
                                                                      new Date(), 
                                                                      isRefreshable);
    // Store it in the local Map
    NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);
    return nacosPropertySource;
}
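
The interplay between the two paths (serve from the local Map, or fetch remotely and store) can be modeled as follows. This is a simplified sketch under stated assumptions: PropertySourceCacheSketch, remoteLoader, and the refreshCount field are hypothetical stand-ins for NacosPropertySourceRepository, the HTTP fetch, and NacosContextRefresher.getRefreshCount(), and plain strings replace NacosPropertySource.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

final class PropertySourceCacheSketch {
    private final Map<String, String> repository = new ConcurrentHashMap<>();
    // Hypothetical stand-in for the remote fetch; it must not return null in this sketch.
    private final BiFunction<String, String, String> remoteLoader;
    // Stand-in for NacosContextRefresher.getRefreshCount().
    int refreshCount = 0;

    PropertySourceCacheSketch(BiFunction<String, String, String> remoteLoader) {
        this.remoteLoader = remoteLoader;
    }

    static String mapKey(String dataId, String group) {
        return dataId + "," + group;
    }

    String load(String dataId, String group, boolean refreshable) {
        // After the first refresh, non-refreshable entries are served from the local map.
        if (refreshCount != 0 && !refreshable) {
            return repository.get(mapKey(dataId, group));
        }
        // Otherwise fetch remotely and remember the result.
        String content = remoteLoader.apply(dataId, group);
        repository.put(mapKey(dataId, group), content);
        return content;
    }

    public static void main(String[] args) {
        PropertySourceCacheSketch cache =
                new PropertySourceCacheSketch((dataId, group) -> "content of " + dataId);
        System.out.println(cache.load("app.yaml", "DEFAULT_GROUP", false)); // remote fetch
        cache.refreshCount = 1;
        System.out.println(cache.load("app.yaml", "DEFAULT_GROUP", false)); // served from the map
    }
}
```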

NacosPropertySourceBuilder#loadNacosData

The core of loadNacosData is the call configService.getConfig(dataId, group, timeout).

com.alibaba.nacos.client.config.NacosConfigService#getConfigInner

From here on, the code belongs to Nacos itself and has nothing to do with Spring Cloud.

public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
    return getConfigInner(namespace, dataId, group, timeoutMs);
}
// timeoutMs defaults to 3000
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
    // If group is blank, DEFAULT_GROUP is used; otherwise the trimmed value that was passed in
    group = blank2defaultGroup(group);
    ParamUtils.checkKeyParam(dataId, group);
    ConfigResponse cr = new ConfigResponse();

    cr.setDataId(dataId);
    cr.setTenant(tenant);
    cr.setGroup(group);

    // Prefer the local failover configuration
    String content=LocalConfigInfoProcessor.getFailover(agent.getName(),dataId, group,tenant);
    if (content != null) {
        
        cr.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
            .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cr.setEncryptedDataKey(encryptedDataKey);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }

    try {
        // This is where the client talks to the Nacos server
        ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs);
        cr.setContent(response.getContent());
        cr.setEncryptedDataKey(response.getEncryptedDataKey());

        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();

        return content;
    } catch (NacosException ioe) {
        if (NacosException.NO_RIGHT == ioe.getErrCode()) {
            throw ioe;
        }
    }
    // Fall back to the local snapshot
    content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
    cr.setContent(content);
    String encryptedDataKey = LocalEncryptedDataKeyProcessor
        .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
    cr.setEncryptedDataKey(encryptedDataKey);
    // No concrete filter is involved here; this call can be ignored
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
}
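
The lookup order in getConfigInner (failover file, then server, then snapshot) can be reduced to a small sketch. The names below (ConfigLookupSketch, ServerCall) are hypothetical, the suppliers stand in for LocalConfigInfoProcessor and ClientWorker, and SecurityException is used here only as a stand-in for a NacosException with the NO_RIGHT error code.

```java
import java.util.function.Supplier;

final class ConfigLookupSketch {
    /** Hypothetical stand-in for the HTTP call to the server. */
    interface ServerCall {
        String get() throws Exception;
    }

    static String getConfig(Supplier<String> failover, ServerCall server, Supplier<String> snapshot) {
        // 1. A local failover file, if present, wins over everything else.
        String content = failover.get();
        if (content != null) {
            return content;
        }
        // 2. Otherwise ask the server.
        try {
            return server.get();
        } catch (SecurityException e) {
            // Stand-in for NO_RIGHT: permission errors are rethrown, not masked.
            throw e;
        } catch (Exception e) {
            // Any other failure falls through to the snapshot.
        }
        // 3. Last resort: the snapshot written by an earlier successful fetch (may be null).
        return snapshot.get();
    }

    public static void main(String[] args) {
        String value = getConfig(
                () -> null,
                () -> { throw new java.io.IOException("server down"); },
                () -> "snapshot content");
        System.out.println(value); // prints "snapshot content"
    }
}
```

The snapshot makes the client tolerant of a temporarily unreachable server, while the failover file gives operators a way to override the server locally.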

com.alibaba.nacos.client.config.impl.ClientWorker#getServerConfig

public ConfigResponse getServerConfig(String dataId, String group, String tenant, 
                                      long readTimeout)throws NacosException {
    ConfigResponse configResponse = new ConfigResponse();
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }

    HttpRestResult<String> result = null;
    try {
        Map<String, String> params = new HashMap<String, String>(3);
        if (StringUtils.isBlank(tenant)) {
            params.put("dataId", dataId);
            params.put("group", group);
        } else {
            params.put("dataId", dataId);
            params.put("group", group);
            params.put("tenant", tenant);
        }
        // CONFIG_CONTROLLER_PATH = /v1/cs/configs
        // agent is a ServerHttpAgent, which uses CloseableHttpClient by default
        result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, 
                               agent.getEncode(), readTimeout);
    } catch (Exception ex) {
        
        throw new NacosException(NacosException.SERVER_ERROR, ex);
    }

    switch (result.getCode()) {
        case HttpURLConnection.HTTP_OK:
            // Save to the local snapshot, then return
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(),dataId,group,tenant,
                                                  result.getData());
            configResponse.setContent(result.getData());
            String configType;
            if (result.getHeader().getValue(CONFIG_TYPE) != null) {
                configType = result.getHeader().getValue(CONFIG_TYPE);
            } else {
                configType = ConfigType.TEXT.getType();
            }
            configResponse.setConfigType(configType);
            String encryptedDataKey = result.getHeader().getValue(ENCRYPTED_DATA_KEY);
            LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(),dataId, 
                                                                      group,tenant,
                                                                      encryptedDataKey);
            configResponse.setEncryptedDataKey(encryptedDataKey);
            return configResponse;
        case HttpURLConnection.HTTP_NOT_FOUND:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(),dataId,group,tenant, null);
            LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId,
                                                                      group, tenant, null);
            return configResponse;
        case HttpURLConnection.HTTP_CONFLICT: {
            throw new NacosException(NacosException.CONFLICT,
                                     "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }
        case HttpURLConnection.HTTP_FORBIDDEN: {
            throw new NacosException(result.getCode(), result.getMessage());
        }
        default: {
            throw new NacosException(result.getCode(),
                                     "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant="
                                     + tenant);
        }
    }
}

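Handling the Pull Request on the Nacos Server

The server side has to be read from a source checkout, because there is no Maven artifact (GAV) to depend on. The code below is therefore copied from the source.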

com.alibaba.nacos.config.server.controller.ConfigController#getConfig

@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
                      @RequestParam String dataId, @RequestParam String group,
                      @RequestParam(required = false, defaultValue = "") String tenant,
                      @RequestParam(required = false) String tag)
    throws IOException, ServletException, NacosException {
    // check tenant
    ParamUtils.checkTenant(tenant);
    tenant = NamespaceUtil.processNamespaceParameter(tenant);
    // check params
    ParamUtils.checkParam(dataId, group, "datumId", "content");
    ParamUtils.checkParam(tag);

    final String clientIp = RequestUtil.getRemoteIp(request);
    inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}

com.alibaba.nacos.config.server.controller.ConfigServletInner#doGetConfig

public String doGetConfig(HttpServletRequest request, HttpServletResponse response, 
                          String dataId, String group, String tenant, String tag, 
                          String clientIp) throws IOException, ServletException {
    final String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String autoTag = request.getHeader("Vipserver-Tag");
    String requestIpApp = RequestUtil.getAppName(request);
    // Acquire the read lock: lockResult > 0 means success, = 0 means the cache item does not exist, < 0 means the lock could not be acquired
    int lockResult = tryConfigReadLock(groupKey);

    final String requestIp = RequestUtil.getRemoteIp(request);
    boolean isBeta = false;
    if (lockResult > 0) {
        // LockResult > 0 means cacheItem is not null
        // and other thread can`t delete this cacheItem. This branch is long, so it is analyzed separately below.
    } else if (lockResult == 0) {

        // The cache item does not exist yet: return 404
        ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1, 
                                        ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);

        response.setStatus(HttpServletResponse.SC_NOT_FOUND);
        response.getWriter().println("config data not exist");
        return HttpServletResponse.SC_NOT_FOUND + "";

    } else {
        // Another thread is writing: return 409 so that the client retries later
        response.setStatus(HttpServletResponse.SC_CONFLICT);
        response.getWriter().println("requested file is being modified, please try later.");
        return HttpServletResponse.SC_CONFLICT + "";

    }

    return HttpServletResponse.SC_OK + "";
}
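
The three-way result of tryConfigReadLock and its mapping to HTTP status codes can be sketched as below. This is an illustration only: ReadLockSketch is a hypothetical class, and java.util.concurrent's ReentrantReadWriteLock is swapped in for whatever lock the server actually uses per cache item.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class ReadLockSketch {
    // One lock per groupKey; a missing entry models "cache item does not exist".
    private final Map<String, ReentrantReadWriteLock> locks = new ConcurrentHashMap<>();

    void register(String groupKey) {
        locks.putIfAbsent(groupKey, new ReentrantReadWriteLock());
    }

    /** Returns 1 if the read lock was acquired, 0 if there is no cache item, -1 if a writer holds the lock. */
    int tryReadLock(String groupKey) {
        ReentrantReadWriteLock lock = locks.get(groupKey);
        if (lock == null) {
            return 0;
        }
        return lock.readLock().tryLock() ? 1 : -1;
    }

    void releaseReadLock(String groupKey) {
        locks.get(groupKey).readLock().unlock();
    }

    /** Maps the lock result to the status code used in doGetConfig. */
    static int toHttpStatus(int lockResult) {
        if (lockResult > 0) {
            return 200; // read and return the config
        }
        return lockResult == 0 ? 404 : 409; // not found, or being modified
    }

    public static void main(String[] args) {
        ReadLockSketch cache = new ReadLockSketch();
        cache.register("app.yaml+DEFAULT_GROUP");
        System.out.println(toHttpStatus(cache.tryReadLock("app.yaml+DEFAULT_GROUP"))); // 200
        cache.releaseReadLock("app.yaml+DEFAULT_GROUP");
        System.out.println(toHttpStatus(cache.tryReadLock("missing")));                // 404
    }
}
```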

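Lock Acquired: Reading the Configuration

Some logic has been removed here so that we can follow the common path; the full method is very long.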

FileInputStream fis = null;
try {
    String md5 = Constants.NULL;
    long lastModified = 0L;
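    // Looked up in ConcurrentHashMap<String, CacheItem> CACHE: groupKey -> cacheItem.
    CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);
    // Whether this is a beta release (configured in the console)
    if (cacheItem.isBeta() && cacheItem.getIps4Beta().contains(clientIp)) {
        isBeta = true;
    }
    // Convert the config type to the matching enum and set it in the response headers. The enum conversion ensures that
    // YML, Yml, yml, or a value with surrounding whitespace all map to yml (case is ignored and whitespace trimmed), at the cost of some extra complexity.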
    final String configType =
        (null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();
    response.setHeader("Config-Type", configType);
    FileTypeEnum fileTypeEnum = FileTypeEnum.getFileTypeEnumByFileExtensionOrFileType(configType);
    String contentTypeHeader = fileTypeEnum.getContentType();
    response.setHeader(HttpHeaderConsts.CONTENT_TYPE, contentTypeHeader);

    File file = null;
    ConfigInfoBase configInfoBase = null;
    PrintWriter out = null;
    if (isBeta) { // beta release: the isBeta=true header is set in this branch
        // code omitted
    } else {
        if (StringUtils.isBlank(tag)) { // no tag
            if (isUseTag(cacheItem, autoTag)) {
                // code omitted
            } else {
                // MD5 of the cache item; the value changes on every configuration update
                md5 = cacheItem.getMd5();
                lastModified = cacheItem.getLastModifiedTs();
                if (PropertyUtil.isDirectRead()) { // standalone mode: read from the database
                    configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
                } else {
                    // cluster mode: read from the local file, which is faster than the database
                    file = DiskUtil.targetFile(dataId, group, tenant);
                }
                if (configInfoBase == null && fileNotExist(file)) { // check whether anything was found
                    
                    ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
                                                    ConfigTraceService.PULL_EVENT_NOTFOUND, -1,
                                                    requestIp);
                    response.setStatus(HttpServletResponse.SC_NOT_FOUND);
                    response.getWriter().println("config data not exist");
                    return HttpServletResponse.SC_NOT_FOUND + "";
                }
            }
        } else {
            // code omitted; very similar to the branch above
        }
    }

    response.setHeader(Constants.CONTENT_MD5, md5);

    // Disable cache.
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    if (PropertyUtil.isDirectRead()) {
        response.setDateHeader("Last-Modified", lastModified);
    } else {
        fis = new FileInputStream(file);
        response.setDateHeader("Last-Modified", file.lastModified());
    }

    if (PropertyUtil.isDirectRead()) {
        out = response.getWriter();
        out.print(configInfoBase.getContent());
        out.flush();
        out.close();
    } else {
        fis.getChannel().transferTo(0L, fis.getChannel().size(), 
                                    Channels.newChannel(response.getOutputStream()));
    }

    final long delayed = System.currentTimeMillis() - lastModified;

    // TODO distinguish pull-get && push-get
    /*
     Otherwise, delayed cannot be used as the basis of push delay directly,
     because the delayed value of active get requests is very large.
    */
    ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
                                    ConfigTraceService.PULL_EVENT_OK, delayed, requestIp);

} finally {
    releaseConfigReadLock(groupKey);
    IoUtils.closeQuietly(fis);
}
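
The config-type normalization described in the comments above (ignore case, trim whitespace, fall back to text) might look like the following. This is a hedged sketch, not Nacos's FileTypeEnum: the enum name FileTypeSketch, its members, and the content-type strings are all assumptions chosen for illustration.

```java
import java.util.Locale;

enum FileTypeSketch {
    // Content types are illustrative assumptions, not values taken from the Nacos source.
    TEXT("text/plain"),
    JSON("application/json"),
    YAML("application/x-yaml"),
    YML("application/x-yaml");

    final String contentType;

    FileTypeSketch(String contentType) {
        this.contentType = contentType;
    }

    /** Case-insensitive, whitespace-tolerant lookup that falls back to TEXT. */
    static FileTypeSketch of(String raw) {
        if (raw != null) {
            String key = raw.trim().toUpperCase(Locale.ROOT);
            for (FileTypeSketch type : values()) {
                if (type.name().equals(key)) {
                    return type;
                }
            }
        }
        return TEXT;
    }

    public static void main(String[] args) {
        System.out.println(of(" Yml "));          // YML
        System.out.println(of("unknown-format")); // TEXT
    }
}
```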

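Updating the Configuration Cache in the Nacos Client

After the client starts, the configuration is cached in memory. When the configuration changes later, the client updates this cache. The update flow is as follows: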

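  • The client sends a long-polling request
  • When the server receives the request, it first compares the client's data with the data in its own cache
    • If they differ, it responds immediately
    • If they are the same, it schedules the comparison to run again after a 29.5s delay
  • To notify the client promptly when data changes within those 29.5s, the server subscribes to its local data change events. When an event arrives, it triggers a DataChangeTask, which iterates over the ClientLongPolling entries in the allSubs queue and writes the result back to each client. That completes one data push
  • What if the scheduled task in ClientLongPolling starts running after DataChangeTask has already "pushed" the data? The fix is simple: cancel the pending scheduled task before performing the push. This prevents the scheduled task from writing to a response that the push has already written, which would raise an error. For the same reason, the first step of the scheduled task in ClientLongPolling is to remove the subscription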
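
The hold-then-push flow in the list above can be modeled with standard java.util.concurrent classes. This is a minimal sketch under stated assumptions, not the server's code: LongPollingSketch, Pending, poll, and onDataChange are hypothetical names, a CompletableFuture stands in for the held HTTP response, and the "+" group key is just an illustrative string.

```java
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class LongPollingSketch {
    /** One held request: the key it watches, its response, and its timeout task. */
    private static final class Pending {
        final String groupKey;
        final CompletableFuture<List<String>> response = new CompletableFuture<>();
        ScheduledFuture<?> timeoutTask;

        Pending(String groupKey) {
            this.groupKey = groupKey;
        }
    }

    private final Queue<Pending> allSubs = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "long-polling-timeout");
                t.setDaemon(true);
                return t;
            });

    /** Holds the request until the key changes or holdMillis elapses. */
    CompletableFuture<List<String>> poll(String groupKey, long holdMillis) {
        Pending pending = new Pending(groupKey);
        // Schedule the timeout first so timeoutTask is set before the entry becomes visible.
        // (With a near-zero hold time the entry could outlive its timeout; ignored in this sketch.)
        pending.timeoutTask = scheduler.schedule(() -> {
            allSubs.remove(pending);                            // drop the subscription first
            pending.response.complete(Collections.emptyList()); // nothing changed
        }, holdMillis, TimeUnit.MILLISECONDS);
        allSubs.add(pending);
        return pending.response;
    }

    /** Called on a data change: answer every held request that watches this key. */
    void onDataChange(String groupKey) {
        for (Pending pending : allSubs) {
            if (pending.groupKey.equals(groupKey) && allSubs.remove(pending)) {
                pending.timeoutTask.cancel(false);              // cancel the timeout before "pushing"
                pending.response.complete(Collections.singletonList(groupKey));
            }
        }
    }

    public static void main(String[] args) {
        LongPollingSketch server = new LongPollingSketch();
        CompletableFuture<List<String>> held = server.poll("app.yaml+DEFAULT_GROUP", 29_500);
        server.onDataChange("app.yaml+DEFAULT_GROUP");
        System.out.println(held.join());                      // answered by the push
        System.out.println(server.poll("other", 100).join()); // answered by the timeout
    }
}
```

Removing the entry from allSubs before completing the response is what keeps the push and the timeout from both answering the same request.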

Cache updates must happen asynchronously, so let's look for the asynchronous work. It lives in ClientWorker; its initialization is shown here:

com.alibaba.nacos.client.config.impl.ClientWorker#ClientWorker

public ClientWorker(final HttpAgent agent, 
                    final ConfigFilterChainManager configFilterChainManager,
                    final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;
    init(properties);

    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    this.executorService = Executors
        .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
    // Task scheduled with a fixed delay
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo(); // check the configuration
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

com.alibaba.nacos.client.config.impl.ClientWorker#checkConfigInfo

public void checkConfigInfo() {
    // cacheMap = new ConcurrentHashMap<String, CacheData>();  groupKey -> cacheData
    int listenerSize = cacheMap.size();
    // Round up the longingTaskCount. ParamUtil.getPerTaskConfigSize() = 3000 
    // Sharding: one long-polling task per batch of configs
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
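
The sharding arithmetic in checkConfigInfo is easy to check in isolation. The sketch below assumes the per-task size is a double (3000, per the comment above); if it were an int, the division would truncate before Math.ceil ran and the rounding up would have no effect. LongPollingShards and taskCount are hypothetical names.

```java
final class LongPollingShards {
    /** Number of long-polling tasks needed for the given number of cached configs. */
    static int taskCount(int listenerSize, double perTaskConfigSize) {
        // Floating-point division, so ceil actually rounds a partial batch up to a full task.
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    public static void main(String[] args) {
        System.out.println(taskCount(0, 3000));    // 0
        System.out.println(taskCount(1, 3000));    // 1
        System.out.println(taskCount(3000, 3000)); // 1
        System.out.println(taskCount(3001, 3000)); // 2
    }
}
```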

com.alibaba.nacos.client.config.impl.ClientWorker.LongPollingRunnable

LongPollingRunnable implements Runnable, so only the run method matters. The code is easiest to follow by setting breakpoints and stepping through it.

public void run() {

    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    // Compare the in-memory data with the local failover file
                    checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }

        // check server config: compare the possibly changed data with the Nacos server.
        // This sends the long-polling request; the return value is the keys the server reports as changed
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas,inInitializingCacheList);
        
        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                // This is the same config pull request described in the client loading section
                ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(response.getContent()); // update the local cache
                cache.setEncryptedDataKey(response.getEncryptedDataKey());
                if (null != response.getConfigType()) {
                    cache.setType(response.getConfigType());
                }
            } catch (NacosException ioe) {
                // logging removed
            }
        }
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() ||
                inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId,
                                                                       cacheData.group,
                                                                       cacheData.tenant))) {
                
                cacheData.checkListenerMd5();
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();
        // Run again
        executorService.execute(this);

    } catch (Throwable e) {

        // On error, run again after a delay
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}

com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateConfigStr

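Long polling is just an HTTP request. When sending it, the client sets the request header Long-Pulling-Timeout, and also Long-Pulling-Timeout-No-Hangup when newly initializing cache data is included.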

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {

    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    headers.put("Long-Pulling-Timeout", "" + timeout);

    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }

    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }

    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.

        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        HttpRestResult<String> result = agent.httpPost("/v1/cs/configs/listener", headers,
                                                       params,agent.getEncode(),readTimeoutMs);

        if (result.ok()) {
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.getData());
        } else {
            setHealthServer(false);
        }
    } catch (Exception e) {
        setHealthServer(false);
        throw e;
    }
    return Collections.emptyList();
}

Handling Long Polling on the Nacos Server

com.alibaba.nacos.config.server.controller.ConfigController#listener

@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }

    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

    Map<String, String> clientMd5Map;
    try {
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }

    // do long-polling
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

com.alibaba.nacos.config.server.controller.ConfigServletInner#doPollingConfig

public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
                              Map<String, String> clientMd5Map, int probeRequestSize) 
    throws IOException {

    // Long polling. Check whether the current request supports long polling
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map,
                                                probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }

    // Compatible with short polling logic. Compare MD5 values one by one and return the entries that differ
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);

    // Compatible with short polling result.
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);

    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);

    // Before 2.0.4 version, return value is put into header.
    if (versionNum < START_LONG_POLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }

    Loggers.AUTH.info("new content:" + newResult);

    // Disable cache.
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}
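
The MD5 comparison that decides which group keys have changed can be sketched as follows. This is an illustration, not MD5Util itself: Md5CompareSketch and its methods are hypothetical, the server's cache is modeled as a map from group key to content, and the "+"-joined group keys are illustrative strings.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class Md5CompareSketch {
    static String md5Hex(String content) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(content.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }

    /** Returns the group keys whose client-side MD5 differs from the server's current content. */
    static List<String> changedGroupKeys(Map<String, String> clientMd5Map,
                                         Map<String, String> serverContent) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> entry : clientMd5Map.entrySet()) {
            String content = serverContent.get(entry.getKey());
            // A config that does not exist on the server is modeled with an empty MD5.
            String serverMd5 = (content == null) ? "" : md5Hex(content);
            if (!serverMd5.equals(entry.getValue())) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> server = new LinkedHashMap<>();
        server.put("app.yaml+DEFAULT_GROUP", "timeout: 5");
        server.put("db.yaml+DEFAULT_GROUP", "pool: 10");

        Map<String, String> client = new LinkedHashMap<>();
        client.put("app.yaml+DEFAULT_GROUP", md5Hex("timeout: 5")); // up to date
        client.put("db.yaml+DEFAULT_GROUP", md5Hex("pool: 8"));     // stale

        System.out.println(changedGroupKeys(client, server)); // prints [db.yaml+DEFAULT_GROUP]
    }
}
```

Comparing digests instead of full contents keeps the long-polling request small, since the client only sends one MD5 per config it listens to.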

com.alibaba.nacos.config.server.service.LongPollingService#addLongPollingClient

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, 
                                 Map<String, String> clientMd5Map, int probeRequestSize) {

    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

    // and one response is returned 500 ms in advance to avoid client timeout.
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) { // false by default
        timeout = Math.max(10000, getFixedPollingInterval());
    } else {
        long start = System.currentTimeMillis();
        // First check whether anything has changed
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            return;
        }
    }
    String ip = RequestUtil.getRemoteIp(req);

    // Must be called by http thread, or send response. This turns the request into an asynchronous one.
    final AsyncContext asyncContext = req.startAsync();

    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L);
    // Take over the current session, which is now fully controlled by the server, and submit a task to the thread pool.
    ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip,
                                                            probeRequestSize, timeout, appName,
                                                            tag));
}
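The hold time computed in this method is simple arithmetic, shown here as a minimal sketch. `LongPollTimeoutSketch` and `holdTimeMs` are hypothetical names; the example assumes the client header carries 30000 ms, which is consistent with the 29.5 s figure quoted in the comments of this article.

```java
// Hypothetical helper mirroring the timeout arithmetic in addLongPollingClient.
class LongPollTimeoutSketch {

    // The server never holds a request for less than 10 seconds.
    static final long MIN_TIMEOUT_MS = 10_000L;

    // headerValue: the timeout the client asked for, in milliseconds.
    // delayMs: how much earlier the server answers to beat the client timeout.
    static long holdTimeMs(String headerValue, int delayMs) {
        return Math.max(MIN_TIMEOUT_MS, Long.parseLong(headerValue) - delayMs);
    }

    public static void main(String[] args) {
        System.out.println(holdTimeMs("30000", 500)); // prints 29500
        System.out.println(holdTimeMs("5000", 500));  // prints 10000 (the floor applies)
    }
}
```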

com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#run

public void run() {
    // This again submits a task, scheduled to run after 29.5 s (the executor is a newSingleScheduledExecutorService)
    asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
        @Override
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());

                // Delete subscriber's relations.
                allSubs.remove(ClientLongPolling.this);

                if (isFixedPolling()) {
                    
                    List<String> changedGroups = MD5Util
                        .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                    (HttpServletResponse) asyncContext.getResponse(),
                                    clientMd5Map);
                    if (changedGroups.size() > 0) {
                        sendResponse(changedGroups);
                    } else {
                        sendResponse(null);
                    }
                } else {
                    
                    sendResponse(null);
                }
            } catch (Throwable t) {
            }

        }

    }, timeoutTime, TimeUnit.MILLISECONDS);
    // Record the current task
    allSubs.add(this);
}

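There is one problem here, though: if the data changes within those 29.5 s, does the long poll really have to wait until the 29.5 s are up before returning?

Of course not. The client session is stored in allSubs, so all that is needed is to register a listener that reacts to the change event.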
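The "hold the request, answer early on change" idea can be sketched with plain JDK classes. This is not the Nacos implementation: `LongPollHubSketch`, `hold` and `onDataChange` are hypothetical names, and a `CompletableFuture` stands in for the servlet `AsyncContext`.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of long polling: park a request, answer it on timeout
// or as soon as one of the keys it watches changes.
class LongPollHubSketch {

    // One held request: the keys it watches and the future that answers it.
    static final class Pending {
        final List<String> watchedKeys;
        final CompletableFuture<List<String>> response = new CompletableFuture<>();
        volatile ScheduledFuture<?> timeoutTask;

        Pending(List<String> watchedKeys) {
            this.watchedKeys = watchedKeys;
        }
    }

    private final Queue<Pending> allSubs = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Park a request; it completes with an empty list if nothing changes in time.
    CompletableFuture<List<String>> hold(List<String> watchedKeys, long timeoutMs) {
        Pending pending = new Pending(watchedKeys);
        allSubs.add(pending);
        pending.timeoutTask = scheduler.schedule(() -> {
            allSubs.remove(pending);
            pending.response.complete(List.of()); // no-op if already answered
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return pending.response;
    }

    // Called when a key changes: answer every request watching it right away.
    void onDataChange(String changedKey) {
        for (Pending pending : allSubs) {
            if (pending.watchedKeys.contains(changedKey) && allSubs.remove(pending)) {
                ScheduledFuture<?> task = pending.timeoutTask;
                if (task != null) {
                    task.cancel(false); // the timeout answer is no longer needed
                }
                pending.response.complete(List.of(changedKey));
            }
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

A request parked with `hold(List.of("app.yaml"), 29_500)` is answered immediately by `onDataChange("app.yaml")`; otherwise it receives an empty list once the timeout fires.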

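Nacos client listening for configuration changes

I wanted to write something here but am not sure what; there is no code-tracing process to describe for this part either.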

com.alibaba.nacos.config.server.service.LongPollingService#LongPollingService

public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();

    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);

    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);

    // Here a subscriber is registered to listen for LocalDataChangeEvent
    NotifyCenter.registerSubscriber(new Subscriber() {

        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) {
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    // Another task is submitted here
                    ConfigExecutor.executeLongPolling(
                        new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }

        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });

}
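The register-then-dispatch pattern used in this constructor can be illustrated with a tiny in-process event hub. `EventHubSketch` is a hypothetical class, not `NotifyCenter`; it dispatches synchronously to keep the example short, and it matches subscribers on the exact event class.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch: subscribers declare the event type they care about,
// publishers hand events to every subscriber registered for that type.
class EventHubSketch {

    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    // Register a handler for one event type (the role of subscribeType()).
    <E> void subscribe(Class<E> eventType, Consumer<? super E> handler) {
        subscribers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                   .add(event -> handler.accept(eventType.cast(event)));
    }

    // Deliver the event and return how many subscribers received it.
    int publish(Object event) {
        List<Consumer<Object>> handlers =
                subscribers.getOrDefault(event.getClass(), List.of());
        for (Consumer<Object> handler : handlers) {
            handler.accept(event);
        }
        return handlers.size();
    }
}
```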

com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run

public void run() {
    try {
        ConfigCacheService.getContentBetaMd5(groupKey);
        for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
            ClientLongPolling clientSub = iter.next();
            if (clientSub.clientMd5Map.containsKey(groupKey)) {
                // For a beta publish, skip clients whose IP is not in the beta list.
                if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
                    continue;
                }

                // If the published tag does not match the client's tag, skip the client.
                if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                    continue;
                }

                getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                iter.remove(); // Delete subscribers' relationships.
                // This is where the response is sent to the client
                clientSub.sendResponse(Arrays.asList(groupKey));
            }
        }
    } catch (Throwable t) {}
}

com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#sendResponse

asyncContext.complete() is the call that completes the asynchronous request and returns the response.

void sendResponse(List<String> changedGroups) {     
    // Cancel the scheduled 29.5 s timeout task
    if (null != asyncTimeoutFuture) {
        asyncTimeoutFuture.cancel(false);
    }
    generateResponse(changedGroups);
}
void generateResponse(List<String> changedGroups) {
    if (null == changedGroups) { // nothing changed

        // Tell web container to send http response.
        asyncContext.complete();
        return;
    }

    HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();

    try {
        final String respString = MD5Util.compareMd5ResultString(changedGroups);

        // Disable cache.
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        response.getWriter().println(respString);
        asyncContext.complete();
    } catch (Exception ex) {
        PULL_LOG.error(ex.toString(), ex);
        asyncContext.complete();
    }
}

How Spring Cloud keeps its cache in sync

When the Nacos client loads configuration there is one important check, NacosContextRefresher.getRefreshCount() != 0. Where is this refreshCount updated? Take a look at the class definition:

class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>. It listens for ApplicationReadyEvent, so look at its onApplicationEvent method.

NacosContextRefresher#onApplicationEvent

public void onApplicationEvent(ApplicationReadyEvent event) {
    // many Spring context
    if (this.ready.compareAndSet(false, true)) {
        this.registerNacosListenersForApplications();
    }
}
private void registerNacosListenersForApplications() {
    // Checks the property spring.cloud.nacos.config.refresh-enabled, which defaults to true
    if (isRefreshEnabled()) {
        for (NacosPropertySource propertySource : NacosPropertySourceRepository.getAll()) {
            // propertySource.isRefreshable() defaults to true
            if (!propertySource.isRefreshable()) {
                continue;
            }
            String dataId = propertySource.getDataId();
            registerNacosListener(propertySource.getGroup(), dataId);
        }
    }
}
private void registerNacosListener(final String groupKey, final String dataKey) {
    String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    Listener listener = listenerMap.computeIfAbsent(key, lst -> new AbstractSharedListener() {
        @Override
        public void innerReceive(String dataId, String group,
                                 String configInfo) {
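            // Increment refreshCount by 1
            refreshCountIncrement();
            // Record the refresh history for Spring Actuator
            nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
            // Publish the event that refreshes the Nacos configuration. RefreshEvent is provided by Spring Cloud and refreshes
            // 1. the Environment 2. the properties annotated with @RefreshScope and their corresponding beans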
            applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh config"));
        }
    });
    try {
        configService.addListener(dataKey, groupKey, listener);
    } catch (NacosException e) {
        
    }
}
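Two details of `registerNacosListener` are worth isolating: `computeIfAbsent` guarantees a single listener per dataId/group pair, and every callback bumps the refresh counter before triggering the refresh. The sketch below shows just that, with hypothetical names (`ListenerRegistrySketch`, `register`) and an assumed key format of `dataId,group`; a `Consumer<String>` stands in for the Nacos `Listener`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch: one listener per (dataId, group), plus a refresh counter.
class ListenerRegistrySketch {

    private final Map<String, Consumer<String>> listenerMap = new ConcurrentHashMap<>();
    private final AtomicLong refreshCount = new AtomicLong();

    // Returns the listener for the key, creating it only on first registration.
    Consumer<String> register(String dataId, String group, Runnable onRefresh) {
        String key = dataId + "," + group; // key format is an assumption of this sketch
        return listenerMap.computeIfAbsent(key, k -> configInfo -> {
            refreshCount.incrementAndGet(); // mirrors refreshCountIncrement()
            onRefresh.run();                // stands in for publishing RefreshEvent
        });
    }

    long getRefreshCount() {
        return refreshCount.get();
    }

    int listenerCount() {
        return listenerMap.size();
    }
}
```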

org.springframework.cloud.endpoint.event.RefreshEventListener

To keep the pasted code short, just look at the onApplicationEvent method. It calls handle(RefreshEvent event), which in turn calls this.refresh.refresh(), that is, org.springframework.cloud.context.refresh.ContextRefresher#refresh

org.springframework.cloud.context.refresh.ContextRefresher#refresh

public synchronized Set<String> refresh() {
    Set<String> keys = refreshEnvironment();
    this.scope.refreshAll();
    return keys;
}
// Update the Environment
public synchronized Set<String> refreshEnvironment() {
    Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
    addConfigFilesToEnvironment();
    Map<String, Object> after = extract(this.context.getEnvironment().getPropertySources());
    Set<String> keys = changes(before, after).keySet();
    // org.springframework.cloud.context.properties.ConfigurationPropertiesRebinder listens for this event
    this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
    return keys;
}
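The before/after comparison in `refreshEnvironment` boils down to a map diff. Here is a minimal sketch of that idea, assuming flat string keys; `EnvironmentDiffSketch` and `changedKeys` are hypothetical names and this is not the Spring Cloud implementation.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: compute which property keys differ between two snapshots.
class EnvironmentDiffSketch {

    // Keys that were added, removed, or whose value differs between the snapshots.
    static Set<String> changedKeys(Map<String, Object> before, Map<String, Object> after) {
        Set<String> changed = new TreeSet<>();
        for (String key : before.keySet()) {
            if (!after.containsKey(key) || !Objects.equals(before.get(key), after.get(key))) {
                changed.add(key); // removed or modified
            }
        }
        for (String key : after.keySet()) {
            if (!before.containsKey(key)) {
                changed.add(key); // newly added
            }
        }
        return changed;
    }
}
```

The resulting key set is what an `EnvironmentChangeEvent` would carry, so listeners only need to rebind the properties that actually changed.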

org.springframework.cloud.context.scope.refresh.RefreshScope#refreshAll

public void refreshAll() {
    // Destroy the cached beans
    super.destroy();
    // No listener for this event was found here
    this.context.publishEvent(new RefreshScopeRefreshedEvent());
}
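The effect of `refreshAll()` is easiest to see with a toy scope: cached instances are dropped, and the next lookup rebuilds them from whatever the settings are at that moment. `RefreshScopeSketch` below is a hypothetical stand-in, not Spring's `RefreshScope`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of a refreshable scope: a cache of lazily built instances
// that can be cleared so later lookups rebuild them.
class RefreshScopeSketch {

    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    // Return the cached instance, building it from the factory on first access.
    @SuppressWarnings("unchecked")
    <T> T get(String name, Supplier<T> factory) {
        return (T) cache.computeIfAbsent(name, k -> factory.get());
    }

    // Drop every cached instance; the next get() rebuilds from current settings.
    void refreshAll() {
        cache.clear();
    }
}
```

Until `refreshAll()` is called, a bean keeps the value it was built with, even if the underlying setting has already changed.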

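Data synchronization across the Nacos cluster

The endpoint that publishes a configuration is handled in com.alibaba.nacos.config.server.controller.ConfigController#publishConfig. The full logic is in that method; the code is not pasted here, and we jump straight to the place where data synchronization starts.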

ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));

com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService

public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;

    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);

    // Register A Subscriber to subscribe ConfigDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {

        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                // All member nodes of the cluster
                Collection<Member> ipList = memberManager.allMembers();

                // In fact, any type of queue here can be
                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                for (Member member : ipList) {
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, 
                                                   member.getAddress(), evt.isBeta));
                }
                // Submit yet another asynchronous task
                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
            }
        }

        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}

com.alibaba.nacos.config.server.service.notify.AsyncNotifyService.AsyncTask#run

public void run() {
    executeAsyncInvoke();
}
private void executeAsyncInvoke() {
    while (!queue.isEmpty()) {
        NotifySingleTask task = queue.poll();
        String targetIp = task.getTargetIP();
        if (memberManager.hasMember(targetIp)) {
            // Check whether the node is healthy; if it is not, run the task later with a delay
            boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
            if (unHealthNeedDelay) {
                // target ip is unhealthy, then put it in the notification list
                ConfigTraceService.logNotifyEvent(task.getDataId(),task.getGroup(),
                                                  task.getTenant(),null,task.getLastModified(),
                                                  InetUtils.getSelfIP(), 
                                                  ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                                                  0, task.target);
                // get delay time and set fail count to the task
                asyncTaskExecute(task);
            } else {
                Header header = Header.newInstance();
                header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
                                String.valueOf(task.getLastModified()));
                header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP,
                                InetUtils.getSelfIP());
                if (task.isBeta) {
                    header.addParam("isBeta", "true");
                }
                AuthHeaderUtil.addIdentityToHeader(header);
                // In the end this is still an HTTP request; url = /v1/cs/communication/dataChange
                restTemplate.get(task.url, header, Query.EMPTY, String.class, 
                                 new AsyncNotifyCallBack(task));
            }
        }
    }
}
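The queue-draining logic above can be reduced to a short sketch: healthy members are notified at once, unhealthy ones are set aside for a delayed retry. `NotifyFanOutSketch` and `drain` are hypothetical names; the health check and the notifier are passed in as plain functions instead of `memberManager` and `restTemplate`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of the fan-out in AsyncTask: drain the queue, notify
// healthy targets immediately and collect the rest for a later retry.
class NotifyFanOutSketch {

    static List<String> drain(Queue<String> targets,
                              Predicate<String> isHealthy,
                              Consumer<String> notifier) {
        List<String> retryLater = new ArrayList<>();
        while (!targets.isEmpty()) {
            String target = targets.poll();
            if (isHealthy.test(target)) {
                notifier.accept(target);   // in Nacos this is the HTTP call
            } else {
                retryLater.add(target);    // in Nacos this is rescheduled with a delay
            }
        }
        return retryLater;
    }
}
```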

com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo

@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam String dataId,
                                @RequestParam String group,
                                @RequestParam(required = false, defaultValue = "") String tenant,
                                @RequestParam(required = false) String tag) {
    dataId = dataId.trim();
    group = group.trim();
    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    String isBetaStr = request.getHeader("isBeta");
    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
        dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    } else {
        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    }
    return true;
}

com.alibaba.nacos.config.server.service.dump.DumpService#dump

public void dump(String dataId, String group, String tenant, String tag, long lastModified, 
                 String handleIp, boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
}

com.alibaba.nacos.config.server.manager.TaskManager#addTask

public void addTask(Object key, AbstractDelayTask newTask) {
    // The following block is the inlined body of super.addTask(key, newTask)
    lock.lock();
    try {
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            newTask.merge(existTask);
        }
        // Put the task into a Map and let a scheduled job poll and process it
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
    // End of the body of super.addTask(key, newTask)
    // Metrics monitoring
    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}

com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#processTasks

protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // Re-add the task if processing failed; this is where the actual processing happens
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}
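Taken together, `addTask` and `processTasks` form a small "merge by key, retry on failure" engine. The sketch below is a simplified illustration with hypothetical names (`DelayTaskEngineSketch`, `Task`); the merge rule that keeps the newest timestamp is an assumption made for the example, not the behaviour of `DumpTask`.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical sketch of a delay-task engine: tasks with the same key are
// merged, and a task whose processing fails is put back for the next round.
class DelayTaskEngineSketch {

    // A task that carries the newest modification timestamp seen for its key.
    static final class Task {
        final long lastModified;

        Task(long lastModified) {
            this.lastModified = lastModified;
        }

        // Illustrative merge rule: keep the larger timestamp.
        Task merge(Task older) {
            return new Task(Math.max(lastModified, older.lastModified));
        }
    }

    private final Map<String, Task> tasks = new ConcurrentHashMap<>();

    void addTask(String key, Task newTask) {
        tasks.merge(key, newTask, (existing, incoming) -> incoming.merge(existing));
    }

    // One polling round: run each task, re-add it if the processor reports failure.
    void processTasks(Predicate<Task> processor) {
        for (String key : new ArrayList<>(tasks.keySet())) {
            Task task = tasks.remove(key);
            if (task == null) {
                continue;
            }
            boolean succeeded;
            try {
                succeeded = processor.test(task);
            } catch (RuntimeException e) {
                succeeded = false;
            }
            if (!succeeded) {
                addTask(key, task);
            }
        }
    }

    Task peek(String key) {
        return tasks.get(key);
    }

    int size() {
        return tasks.size();
    }
}
```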

com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process

The code in this method is long, so it is not pasted here; a rough understanding of the flow is enough.

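Summary

How the disk cache relates to the in-memory cache

They belong to two separate ecosystems: the disk cache belongs to Nacos itself, while the in-memory cache belongs to Spring Cloud. Nacos can be used independently of Spring Cloud.

Location of the in-memory cache:

com.alibaba.cloud.nacos.NacosPropertySourceRepository#NACOS_PROPERTY_SOURCE_REPOSITORY

Location of the disk cache: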

com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor#getFailover

C:\Users\lilg\nacos\config\fixed-42.193.97.198_8848_nacos\data\config-data\DEFAULT_GROUP\nacos-config

ClientWorker initialization path

To summarize, the call chain starts at com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration#nacosConfigManager,

goes through com.alibaba.cloud.nacos.NacosConfigManager#createConfigService, and then reaches com.alibaba.nacos.api.config.ConfigFactory#createConfigService. That method uses reflection to invoke the constructor com.alibaba.nacos.client.config.NacosConfigService#NacosConfigService, which in turn instantiates ClientWorker.

// The NacosConfigManager constructor is called from NacosConfigBootstrapConfiguration
// com.alibaba.cloud.nacos.NacosConfigManager#NacosConfigManager
public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
    this.nacosConfigProperties = nacosConfigProperties;
    // Compatible with older code in NacosConfigProperties,It will be deleted in the future.
    createConfigService(nacosConfigProperties);
}
static ConfigService createConfigService(NacosConfigProperties nacosConfigProperties) {
    if (Objects.isNull(service)) {
        synchronized (NacosConfigManager.class) {
            try {
                if (Objects.isNull(service)) {
                    service = NacosFactory.createConfigService(
                        nacosConfigProperties.assembleConfigServiceProperties());
                }
            }
            catch (NacosException e) {
                log.error(e.getMessage());
                throw new NacosConnectionFailureException(
                    nacosConfigProperties.getServerAddr(), e.getMessage(), e);
            }
        }
    }
    return service;
}
// com.alibaba.nacos.api.config.ConfigFactory#createConfigService(java.util.Properties)
public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
// The constructor invoked by the reflection call above
public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = Constants.ENCODE;
    } else {
        this.encode = encodeTmp.trim();
    }
    initNamespace(properties);
    this.configFilterChainManager = new ConfigFilterChainManager(properties);

    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start();
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
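The two techniques in this chain, a double-checked singleton and a reflective constructor call, can be reproduced with JDK classes only. Everything in the sketch is hypothetical (`ReflectiveFactorySketch`, `Service`, `DefaultService`), and the default address is an illustrative value; it only mirrors the shape of `NacosConfigManager#createConfigService` and `ConfigFactory#createConfigService`.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical sketch: create a service through reflection and cache it
// with double-checked locking.
class ReflectiveFactorySketch {

    interface Service {
        String serverAddr();
    }

    // Stand-in for the implementation class that is loaded by name.
    static final class DefaultService implements Service {
        private final String serverAddr;

        public DefaultService(Properties properties) {
            this.serverAddr = properties.getProperty("serverAddr", "localhost:8848");
        }

        @Override
        public String serverAddr() {
            return serverAddr;
        }
    }

    // volatile is required for double-checked locking to be safe.
    private static volatile Service service;

    // Look up the class by name and call its (Properties) constructor.
    static Service create(String implClassName, Properties properties)
            throws ReflectiveOperationException {
        Class<?> implClass = Class.forName(implClassName);
        Constructor<?> constructor = implClass.getConstructor(Properties.class);
        return (Service) constructor.newInstance(properties);
    }

    // Create the service once; later calls return the same instance.
    static Service getOrCreate(Properties properties) {
        if (service == null) {
            synchronized (ReflectiveFactorySketch.class) {
                if (service == null) {
                    try {
                        service = create(DefaultService.class.getName(), properties);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("cannot create service", e);
                    }
                }
            }
        }
        return service;
    }
}
```

Loading the implementation by class name keeps the API module free of a compile-time dependency on the client implementation, which is a plausible reason for the reflective call in `ConfigFactory`.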