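First, what features does a configuration center need to provide?
- The client pulls configuration from the server
- The client caches the configuration it has pulled
- The client listens for configuration changes (asynchronously)
- Dynamic injection of @Value fields
Lead-in: how does Spring load externalized configuration?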
How does Spring Cloud load externalized configuration? Through the org.springframework.cloud.bootstrap.config.PropertySourceLocator interface. Nacos registers NacosConfigBootstrapConfiguration in spring-cloud-starter-alibaba-nacos-config-2.2.6.RELEASE.jar!\META-INF\spring.factories. Here is that configuration class:
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public NacosConfigProperties nacosConfigProperties() {
        return new NacosConfigProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    public NacosConfigManager nacosConfigManager(
            NacosConfigProperties nacosConfigProperties) {
        return new NacosConfigManager(nacosConfigProperties);
    }

    @Bean
    public NacosPropertySourceLocator nacosPropertySourceLocator(
            NacosConfigManager nacosConfigManager) {
        return new NacosPropertySourceLocator(nacosConfigManager);
    }
}
The NacosPropertySourceLocator declared here implements the PropertySourceLocator interface. That raises the question: how does Spring Cloud handle these PropertySourceLocator instances?
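How PropertySourceLocator is loaded
When a Spring Boot application starts, the prepareContext method calls back every registered ApplicationContextInitializer instance to do initialization work.
ApplicationContextInitializer is part of the core Spring Framework. It lets us further configure a ConfigurableApplicationContext (or a subtype) before that ApplicationContext is refreshed.
It is useful in web applications that need programmatic initialization of the application context, for example registering property sources or activating profiles based on the environment. A configuration center needs exactly this kind of hook.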
PropertySourceBootstrapConfiguration
// ApplicationContextInitializer.initialize is called back before the Spring IoC container is refreshed
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(PropertySourceBootstrapProperties.class)
public class PropertySourceBootstrapConfiguration implements
ApplicationContextInitializer<ConfigurableApplicationContext>, Ordered {
/**
* Bootstrap property source name.
*/
public static final String BOOTSTRAP_PROPERTY_SOURCE_NAME = "bootstrapProperties";
// Every PropertySourceLocator implementation is injected here; they are imported by BootstrapImportSelector
@Autowired(required = false)
private List<PropertySourceLocator> propertySourceLocators = new ArrayList<>();
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
List<PropertySource<?>> composite = new ArrayList<>();
// Sort the locators by their order
AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
boolean empty = true;
ConfigurableEnvironment environment = applicationContext.getEnvironment();
for (PropertySourceLocator locator : this.propertySourceLocators) {
// Call locateCollection on every PropertySourceLocator instance and collect the results in source.
Collection<PropertySource<?>> source = locator.locateCollection(environment);
if (source == null || source.size() == 0) {
continue;
}
List<PropertySource<?>> sourceList = new ArrayList<>();
for (PropertySource<?> p : source) {
if (p instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerable = (EnumerablePropertySource<?>) p;
sourceList.add(new BootstrapPropertySource<>(enumerable));
}
else {
sourceList.add(new SimpleBootstrapPropertySource(p));
}
}
logger.info("Located property source: " + sourceList);
composite.addAll(sourceList);
empty = false;
}
// The property sources are added to the environment only if at least one was located
if (!empty) {
MutablePropertySources propertySources = environment.getPropertySources();
String logConfig = environment.resolvePlaceholders("${logging.config:}");
LogFile logFile = LogFile.get(environment);
for (PropertySource<?> p : environment.getPropertySources()) {
if (p.getName().startsWith(BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
propertySources.remove(p.getName());
}
}
insertPropertySources(propertySources, composite);
reinitializeLoggingSystem(environment, logConfig, logFile);
setLogLevels(applicationContext, environment);
handleIncludedProfiles(environment);
}
}
}
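The flow of initialize can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: BootstrapSketch, Locator, and resolve are hypothetical names, not Spring classes, and it assumes the default override settings, under which the located sources are placed in front of the existing ones and therefore win on lookup.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Simplified model of the bootstrap step. These are hypothetical types, not Spring classes. */
final class BootstrapSketch {

    /** Stand-in for a PropertySourceLocator: an order plus the properties it would locate. */
    static final class Locator {
        final int order;
        final Map<String, String> located;

        Locator(int order, Map<String, String> located) {
            this.order = order;
            this.located = located;
        }
    }

    /** Sorts the locators, skips empty results, and puts the located sources in front of the existing ones. */
    static List<Map<String, String>> initialize(List<Locator> locators,
                                                List<Map<String, String>> existing) {
        List<Locator> sorted = new ArrayList<>(locators);
        sorted.sort(Comparator.comparingInt((Locator l) -> l.order));
        List<Map<String, String>> result = new ArrayList<>();
        for (Locator locator : sorted) {
            if (locator.located == null || locator.located.isEmpty()) {
                continue; // mirrors the "source == null || source.size() == 0" check
            }
            result.add(locator.located);
        }
        result.addAll(existing); // assumption: located sources take precedence
        return result;
    }

    /** The first source that contains the key wins. */
    static String resolve(List<Map<String, String>> sources, String key) {
        for (Map<String, String> source : sources) {
            if (source.containsKey(key)) {
                return source.get(key);
            }
        }
        return null;
    }
}
```

With a local source containing timeout=1000 and a locator that returns timeout=3000, resolve returns the located value, which is why remote configuration can override local files.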
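How the Nacos client loads configuration
Now that we know the interface Spring Cloud provides, let's turn to Nacos. As the code above shows, Spring Cloud calls locateCollection(org.springframework.core.env.Environment) on every PropertySourceLocator, so that is the method to look at. locateCollection is a default method of the interface; the real work is done in locate.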
NacosPropertySourceLocator#locate
public PropertySource<?> locate(Environment env) {
nacosConfigProperties.setEnvironment(env);
ConfigService configService = nacosConfigManager.getConfigService();
if (null == configService) {
log.warn("no instance of config service found, can't load config from nacos");
return null;
}
long timeout = nacosConfigProperties.getTimeout();
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
timeout);
String name = nacosConfigProperties.getName();
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
// As a last resort, the dataId prefix falls back to the application name
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
CompositePropertySource composite = new CompositePropertySource(
NACOS_PROPERTY_SOURCE_NAME);
// Load the shared configuration
loadSharedConfiguration(composite);
// Load the extension configuration
loadExtConfiguration(composite);
// Load the application's own configuration
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
return composite;
}
NacosPropertySourceLocator#loadApplicationConfiguration
private void loadApplicationConfiguration( CompositePropertySource compositePropertySource,
String dataIdPrefix,
NacosConfigProperties properties,
Environment environment) {
String fileExtension = properties.getFileExtension(); // file extension: json, yml, properties, etc.
String nacosGroup = properties.getGroup(); // DEFAULT_GROUP by default
// load directly once by default (dataId without a suffix)
loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup,
fileExtension, true);
// load with the suffix, which has a higher priority than the default
loadNacosDataIfPresent(compositePropertySource,
dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension,
true);
// load with each active profile, which has a higher priority than the suffix
for (String profile : environment.getActiveProfiles()) {
String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
fileExtension, true);
}
}
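To make the naming rule concrete, here is a small sketch that lists the dataIds in the order loadApplicationConfiguration requests them. It assumes DOT is "." and SEP1 is "-"; DataIdSketch and candidateDataIds are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the dataId naming rule; not part of Nacos. */
final class DataIdSketch {

    /** Returns the dataIds in loading order. Later entries have higher priority. */
    static List<String> candidateDataIds(String prefix, String fileExtension,
                                         String... activeProfiles) {
        List<String> ids = new ArrayList<>();
        ids.add(prefix);                           // 1. plain prefix
        ids.add(prefix + "." + fileExtension);     // 2. prefix plus file extension
        for (String profile : activeProfiles) {    // 3. one entry per active profile
            ids.add(prefix + "-" + profile + "." + fileExtension);
        }
        return ids;
    }
}
```

For the prefix order-service, the extension yaml, and the active profile dev, the result is order-service, order-service.yaml, order-service-dev.yaml. Because each property source is added to the front of the composite, a key defined in order-service-dev.yaml overrides the same key in order-service.yaml.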
NacosPropertySourceLocator#loadNacosDataIfPresent
private void loadNacosDataIfPresent(final CompositePropertySource composite,
final String dataId, final String group, String fileExtension,
boolean isRefreshable) {
if (null == dataId || dataId.trim().length() < 1) {
return;
}
if (null == group || group.trim().length() < 1) {
return;
}
NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group,
fileExtension, isRefreshable);
// Add the property source to the front of the composite property source
this.addFirstPropertySource(composite, propertySource, false);
}
NacosPropertySourceLocator#loadNacosPropertySource
private NacosPropertySource loadNacosPropertySource(final String dataId, final String group,
String fileExtension,
boolean isRefreshable) {
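// The first load (refreshCount == 0) always goes to the server; afterwards, non-refreshable sources come from the local repository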
if (NacosContextRefresher.getRefreshCount() != 0) {
if (!isRefreshable) {
return NacosPropertySourceRepository.getNacosPropertySource(dataId, group);
}
}
return nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);
}
Not auto-refreshable
com.alibaba.cloud.nacos.NacosPropertySourceRepository#getNacosPropertySource
This is simply a local Map whose key is dataId + "," + group.
private final static ConcurrentHashMap<String, NacosPropertySource> NACOS_PROPERTY_SOURCE_REPOSITORY = new ConcurrentHashMap<>();
public static NacosPropertySource getNacosPropertySource(String dataId, String group) {
return NACOS_PROPERTY_SOURCE_REPOSITORY.get(getMapKey(dataId, group));
}
Auto-refreshable
com.alibaba.cloud.nacos.client.NacosPropertySourceBuilder#build
NacosPropertySource build(String dataId, String group, String fileExtension,
boolean isRefreshable) {
// Fetch the data from Nacos over HTTP, then wrap it in a PropertySource
// The request path is /v1/cs/configs
List<PropertySource<?>> propertySources = loadNacosData(dataId, group, fileExtension);
NacosPropertySource nacosPropertySource = new NacosPropertySource(propertySources,
group, dataId,
new Date(),
isRefreshable);
// Store it in the local Map
NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);
return nacosPropertySource;
}
NacosPropertySourceBuilder#loadNacosData
The core of loadNacosData is the call configService.getConfig(dataId, group, timeout).
com.alibaba.nacos.client.config.NacosConfigService#getConfigInner
From here on the code belongs to Nacos itself and no longer has anything to do with Spring Cloud.
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
return getConfigInner(namespace, dataId, group, timeoutMs);
}
// timeoutMs defaults to 3000
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
// A blank group becomes DEFAULT_GROUP; otherwise the trimmed input is used
group = blank2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// Prefer the local failover configuration if it exists
String content=LocalConfigInfoProcessor.getFailover(agent.getName(),dataId, group,tenant);
if (content != null) {
cr.setContent(content);
String encryptedDataKey = LocalEncryptedDataKeyProcessor
.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
try {
// This is where the client talks to the Nacos server
ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(response.getContent());
cr.setEncryptedDataKey(response.getEncryptedDataKey());
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
}
// Fall back to the local snapshot
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
String encryptedDataKey = LocalEncryptedDataKeyProcessor
.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
cr.setEncryptedDataKey(encryptedDataKey);
// There is no concrete filter implementation here, so this call can be ignored
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
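The lookup order in getConfigInner boils down to three steps: failover file, server, snapshot. The sketch below models only that order. ConfigLookupSketch and ServerUnavailableException are hypothetical names, the three sources are passed in as suppliers, and the special case in which a NO_RIGHT error is rethrown is deliberately left out.

```java
import java.util.function.Supplier;

/** Sketch of the failover -> server -> snapshot lookup order; not the Nacos implementation. */
final class ConfigLookupSketch {

    /** Hypothetical exception standing in for a failed server call. */
    static final class ServerUnavailableException extends RuntimeException {
    }

    static String getConfig(Supplier<String> failoverFile,
                            Supplier<String> server,
                            Supplier<String> snapshot) {
        String content = failoverFile.get();
        if (content != null) {
            return content;          // 1. a local failover file always wins
        }
        try {
            return server.get();     // 2. otherwise ask the server
        } catch (ServerUnavailableException e) {
            // the server call failed: fall through to the snapshot
        }
        return snapshot.get();       // 3. last resort: the snapshot saved by an earlier pull
    }
}
```

The snapshot is what keeps an application able to start when the server is temporarily unreachable, provided it has pulled the configuration at least once before.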
com.alibaba.nacos.client.config.impl.ClientWorker#getServerConfig
public ConfigResponse getServerConfig(String dataId, String group, String tenant,
long readTimeout)throws NacosException {
ConfigResponse configResponse = new ConfigResponse();
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpRestResult<String> result = null;
try {
Map<String, String> params = new HashMap<String, String>(3);
if (StringUtils.isBlank(tenant)) {
params.put("dataId", dataId);
params.put("group", group);
} else {
params.put("dataId", dataId);
params.put("group", group);
params.put("tenant", tenant);
}
// CONFIG_CONTROLLER_PATH = /v1/cs/configs
// agent is a ServerHttpAgent, which uses CloseableHttpClient by default
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params,
agent.getEncode(), readTimeout);
} catch (Exception ex) {
throw new NacosException(NacosException.SERVER_ERROR, ex);
}
switch (result.getCode()) {
case HttpURLConnection.HTTP_OK:
// Save the content to the local snapshot, then return
LocalConfigInfoProcessor.saveSnapshot(agent.getName(),dataId,group,tenant,
result.getData());
configResponse.setContent(result.getData());
String configType;
if (result.getHeader().getValue(CONFIG_TYPE) != null) {
configType = result.getHeader().getValue(CONFIG_TYPE);
} else {
configType = ConfigType.TEXT.getType();
}
configResponse.setConfigType(configType);
String encryptedDataKey = result.getHeader().getValue(ENCRYPTED_DATA_KEY);
LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(),dataId,
group,tenant,
encryptedDataKey);
configResponse.setEncryptedDataKey(encryptedDataKey);
return configResponse;
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(),dataId,group,tenant, null);
LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId,
group, tenant, null);
return configResponse;
case HttpURLConnection.HTTP_CONFLICT: {
throw new NacosException(NacosException.CONFLICT,
"data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
case HttpURLConnection.HTTP_FORBIDDEN: {
throw new NacosException(result.getCode(), result.getMessage());
}
default: {
throw new NacosException(result.getCode(),
"http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant="
+ tenant);
}
}
}
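How the Nacos server handles a pull request
The server side has to be read from a checkout of the source code, because there is no Maven artifact (GAV) that can simply be added as a dependency. The code below is therefore pasted from the source.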
com.alibaba.nacos.config.server.controller.ConfigController#getConfig
@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam String dataId, @RequestParam String group,
@RequestParam(required = false, defaultValue = "") String tenant,
@RequestParam(required = false) String tag)
throws IOException, ServletException, NacosException {
// check tenant
ParamUtils.checkTenant(tenant);
tenant = NamespaceUtil.processNamespaceParameter(tenant);
// check params
ParamUtils.checkParam(dataId, group, "datumId", "content");
ParamUtils.checkParam(tag);
final String clientIp = RequestUtil.getRemoteIp(request);
inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}
com.alibaba.nacos.config.server.controller.ConfigServletInner#doGetConfig
public String doGetConfig(HttpServletRequest request, HttpServletResponse response,
String dataId, String group, String tenant, String tag,
String clientIp) throws IOException, ServletException {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
String autoTag = request.getHeader("Vipserver-Tag");
String requestIpApp = RequestUtil.getAppName(request);
// Try to take the read lock: lockResult > 0 means success, == 0 means the cache item does not exist, < 0 means locking failed
int lockResult = tryConfigReadLock(groupKey);
final String requestIp = RequestUtil.getRemoteIp(request);
boolean isBeta = false;
if (lockResult > 0) {
// LockResult > 0 means cacheItem is not null
// and other threads can't delete this cacheItem. This branch is long, so it is analyzed separately below.
} else if (lockResult == 0) {
// The cache item does not exist: return 404
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
} else {
// Another thread is writing: return 409 so that the client retries later
response.setStatus(HttpServletResponse.SC_CONFLICT);
response.getWriter().println("requested file is being modified, please try later.");
return HttpServletResponse.SC_CONFLICT + "";
}
return HttpServletResponse.SC_OK + "";
}
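The three outcomes of tryConfigReadLock map directly to HTTP status codes. The following sketch shows that mapping with a minimal try-only lock. ReadLockSketch and TryLock are hypothetical names, and any retrying the real method may do before giving up is not modelled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of the lock-result convention used by doGetConfig; not the Nacos implementation. */
final class ReadLockSketch {

    /** Minimal try-only lock: status > 0 counts readers, -1 marks a writer, 0 is free. */
    static final class TryLock {
        private int status = 0;

        synchronized boolean tryReadLock() {
            if (status < 0) {
                return false; // a writer holds the lock
            }
            status++;
            return true;
        }

        synchronized void releaseReadLock() {
            status--;
        }

        synchronized boolean tryWriteLock() {
            if (status != 0) {
                return false; // readers or another writer are active
            }
            status = -1;
            return true;
        }

        synchronized void releaseWriteLock() {
            status = 0;
        }
    }

    private final Map<String, TryLock> cache = new ConcurrentHashMap<>();

    void put(String groupKey) {
        cache.putIfAbsent(groupKey, new TryLock());
    }

    TryLock lockOf(String groupKey) {
        return cache.get(groupKey);
    }

    /** Returns 1 if the read lock was taken, 0 if no cache item exists, -1 if a writer holds the lock. */
    int tryConfigReadLock(String groupKey) {
        TryLock lock = cache.get(groupKey);
        if (lock == null) {
            return 0;
        }
        return lock.tryReadLock() ? 1 : -1;
    }

    /** Maps the lock result to the status code returned by doGetConfig. */
    static int toHttpStatus(int lockResult) {
        if (lockResult > 0) {
            return 200;
        }
        return lockResult == 0 ? 404 : 409;
    }
}
```

A 409 is a retryable answer: the configuration exists but is being rewritten, so the caller should try again shortly.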
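Lock acquired, ready to read
Some branches have been removed because the method is very long; only the path for an ordinary configuration is kept.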
FileInputStream fis = null;
try {
String md5 = Constants.NULL;
long lastModified = 0L;
// The value comes from ConcurrentHashMap<String, CacheItem> CACHE, which maps groupKey -> cacheItem.
CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);
// Whether this is a beta release, which is configured in the console
if (cacheItem.isBeta() && cacheItem.getIps4Beta().contains(clientIp)) {
isBeta = true;
}
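// Convert the configuration type to its enum and set it in the response headers. Going through the enum normalizes
// values such as YML, Yml, yml, or ones with surrounding spaces to yml (case-insensitive and trimmed), although it is a roundabout way to do so.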
final String configType =
(null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();
response.setHeader("Config-Type", configType);
FileTypeEnum fileTypeEnum = FileTypeEnum.getFileTypeEnumByFileExtensionOrFileType(configType);
String contentTypeHeader = fileTypeEnum.getContentType();
response.setHeader(HttpHeaderConsts.CONTENT_TYPE, contentTypeHeader);
File file = null;
ConfigInfoBase configInfoBase = null;
PrintWriter out = null;
if (isBeta) { // beta release: the response header gets isBeta=true
// code omitted
} else {
if (StringUtils.isBlank(tag)) { // no tag was requested
if (isUseTag(cacheItem, autoTag)) {
// code omitted
} else {
// Read the cache item's MD5, which changes on every configuration update
md5 = cacheItem.getMd5();
lastModified = cacheItem.getLastModifiedTs();
if (PropertyUtil.isDirectRead()) { // in standalone mode, read from the database
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
// In cluster mode, read from the local disk file, which is faster than the database
file = DiskUtil.targetFile(dataId, group, tenant);
}
if (configInfoBase == null && fileNotExist(file)) { // nothing was found
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
ConfigTraceService.PULL_EVENT_NOTFOUND, -1,
requestIp);
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.getWriter().println("config data not exist");
return HttpServletResponse.SC_NOT_FOUND + "";
}
}
} else {
// code omitted; it is much the same as the branch above
}
}
response.setHeader(Constants.CONTENT_MD5, md5);
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
if (PropertyUtil.isDirectRead()) {
response.setDateHeader("Last-Modified", lastModified);
} else {
fis = new FileInputStream(file);
response.setDateHeader("Last-Modified", file.lastModified());
}
if (PropertyUtil.isDirectRead()) {
out = response.getWriter();
out.print(configInfoBase.getContent());
out.flush();
out.close();
} else {
fis.getChannel().transferTo(0L, fis.getChannel().size(),
Channels.newChannel(response.getOutputStream()));
}
final long delayed = System.currentTimeMillis() - lastModified;
// TODO distinguish pull-get && push-get
/*
Otherwise, delayed cannot be used as the basis of push delay directly,
because the delayed value of active get requests is very large.
*/
ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
ConfigTraceService.PULL_EVENT_OK, delayed, requestIp);
} finally {
releaseConfigReadLock(groupKey);
IoUtils.closeQuietly(fis);
}
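How the Nacos client updates its configuration cache
Once the client has started, the configuration is cached in memory. When the configuration changes later, the client updates that cache. The flow is as follows:
- The client sends a long-polling request
- On receiving the request, the server first compares the client's data with the data in its own cache
- If they differ, it returns immediately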
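- If they are the same, it holds the request and schedules a task to run after a 29.5s delay. In fixed-polling mode that task compares again; otherwise it simply ends the request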
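- To notify the client promptly when data changes within those 29.5s, the server subscribes to its own local data change event. When the event arrives, it triggers a DataChangeTask, which iterates over the ClientLongPolling entries in the allSubs queue and writes the result back to the client. That completes one "push" of data
- What if the scheduled task in ClientLongPolling starts to run after DataChangeTask has already "pushed" the data? The answer is simple: cancel the pending scheduled task before doing the push. Otherwise the scheduled task would write a second response after the push has already written one, which is bound to fail. For the same reason, the first step of the scheduled task in ClientLongPolling is to remove its own subscription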
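The steps above can be sketched with a scheduled executor and a queue of held requests. This is a minimal model, not the Nacos implementation: LongPollingSketch, Waiter, poll, and onDataChange are hypothetical names, and a CompletableFuture stands in for the held HTTP response.

```java
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch of a server-side long-polling hold; not the Nacos implementation. */
final class LongPollingSketch {

    /** One held request: the keys the client watches plus the response it is waiting for. */
    private static final class Waiter {
        final Set<String> watchedKeys;
        final CompletableFuture<List<String>> response = new CompletableFuture<>();
        volatile ScheduledFuture<?> timeoutTask;

        Waiter(Set<String> watchedKeys) {
            this.watchedKeys = watchedKeys;
        }
    }

    private final Queue<Waiter> allSubs = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "long-polling-sketch");
                t.setDaemon(true);
                return t;
            });

    /** Holds the request. It completes with an empty list if nothing changes within holdMillis. */
    CompletableFuture<List<String>> poll(Set<String> watchedKeys, long holdMillis) {
        Waiter waiter = new Waiter(watchedKeys);
        waiter.timeoutTask = scheduler.schedule(() -> {
            allSubs.remove(waiter);               // drop the subscription first
            waiter.response.complete(List.of());  // then answer "no change"
        }, holdMillis, TimeUnit.MILLISECONDS);
        allSubs.add(waiter);
        return waiter.response;
    }

    /** Called when a key changes: every waiter that watches it is answered right away. */
    void onDataChange(String groupKey) {
        for (Waiter waiter : allSubs) {
            if (waiter.watchedKeys.contains(groupKey) && allSubs.remove(waiter)) {
                waiter.timeoutTask.cancel(false); // cancel the pending timeout before responding
                waiter.response.complete(List.of(groupKey));
            }
        }
    }
}
```

The client sees either a quick answer that names the changed key or, after the hold time, an empty answer, and in both cases it immediately polls again.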
The cache update must happen asynchronously, so let's look for the asynchronous work. It lives in ClientWorker; here is its constructor:
com.alibaba.nacos.client.config.impl.ClientWorker#ClientWorker
public ClientWorker(final HttpAgent agent,
final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
init(properties);
this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executorService = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
// Run the check repeatedly with a fixed delay
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo(); // check the configuration
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
com.alibaba.nacos.client.config.impl.ClientWorker#checkConfigInfo
public void checkConfigInfo() {
// cacheMap = new ConcurrentHashMap<String, CacheData>(); groupKey -> cacheData
int listenerSize = cacheMap.size();
// Round up the longingTaskCount. ParamUtil.getPerTaskConfigSize() = 3000
// Shard the listeners: one long-polling task per 3000 configurations
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// The task list is no order.So it maybe has issues when changing.
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
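The sharding arithmetic is easy to check in isolation. In this sketch, ShardingSketch and longPollingTaskCount are hypothetical names, and the per-task size is taken as a double on purpose: with two int operands the division would truncate before Math.ceil is applied, so 3001 listeners would still yield a single task.

```java
/** Sketch of the sharding arithmetic in checkConfigInfo; not the Nacos implementation. */
final class ShardingSketch {

    /** Number of long-polling tasks needed for the given number of listened configurations. */
    static int longPollingTaskCount(int listenerSize, double perTaskConfigSize) {
        // Floating-point division, then round up.
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }
}
```

With a per-task size of 3000, up to 3000 listened configurations share one LongPollingRunnable, and the 3001st starts a second one.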
com.alibaba.nacos.client.config.impl.ClientWorker.LongPollingRunnable
LongPollingRunnable implements Runnable, so only the run method matters. The most practical way to follow this code is to step through it with breakpoints.
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
// Compare the in-memory data with the local configuration file
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
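// check server config: compare the data that may have changed with the Nacos server
// This sends the long-polling request. The return value is the list of keys the server reports as changed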
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas,inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
// This is the same pull request described in the section on how the Nacos client loads configuration
ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(response.getContent()); // update the local cache
cache.setEncryptedDataKey(response.getEncryptedDataKey());
if (null != response.getConfigType()) {
cache.setType(response.getConfigType());
}
} catch (NacosException ioe) {
// logging omitted
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() ||
inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId,
cacheData.group,
cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
// Run again immediately
executorService.execute(this);
} catch (Throwable e) {
// On error, run again after a penalty delay
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateConfigStr
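Long polling is implemented as a plain HTTP request. When the request is sent, its headers carry Long-Pulling-Timeout and, while new cache data is still initializing, Long-Pulling-Timeout-No-Hangup.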
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
Map<String, String> params = new HashMap<String, String>(2);
params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
Map<String, String> headers = new HashMap<String, String>(2);
headers.put("Long-Pulling-Timeout", "" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.put("Long-Pulling-Timeout-No-Hangup", "true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
// In order to prevent the server from handling the delay of the client's long task,
// increase the client's read timeout to avoid this problem.
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
HttpRestResult<String> result = agent.httpPost("/v1/cs/configs/listener", headers,
params,agent.getEncode(),readTimeoutMs);
if (result.ok()) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.getData());
} else {
setHealthServer(false);
}
} catch (Exception e) {
setHealthServer(false);
throw e;
}
return Collections.emptyList();
}
How the Nacos server handles long polling
com.alibaba.nacos.config.server.controller.ConfigController#listener
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
com.alibaba.nacos.config.server.controller.ConfigServletInner#doPollingConfig
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize)
throws IOException {
// Long polling: check whether the current request supports it
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map,
probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// Compatible with short polling logic: compare the MD5 values one by one and return the keys that differ
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// Compatible with short polling result.
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);
// Before 2.0.4 version, return value is put into header.
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}
Loggers.AUTH.info("new content:" + newResult);
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
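The MD5 comparison at the heart of both polling modes can be sketched as follows. This is a simplified model: Md5CompareSketch, md5Hex, and changedKeys are hypothetical names, and the server content is passed in as a plain map instead of being read from the server cache.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch of an MD5-based change check; not the Nacos implementation. */
final class Md5CompareSketch {

    /** Lower-case hex MD5 of the content, encoded as UTF-8. */
    static String md5Hex(String content) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java platform must provide MD5
        }
    }

    /** Returns the keys whose client-side MD5 no longer matches the server-side content. */
    static List<String> changedKeys(Map<String, String> clientMd5Map,
                                    Map<String, String> serverContent) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> entry : clientMd5Map.entrySet()) {
            String content = serverContent.get(entry.getKey());
            String serverMd5 = (content == null) ? "" : md5Hex(content);
            if (!serverMd5.equals(entry.getValue())) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }
}
```

Only the keys are returned, not the content. The client fetches the new content with a separate pull request afterwards.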
com.alibaba.nacos.config.server.service.LongPollingService#addLongPollingClient
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp,
Map<String, String> clientMd5Map, int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
// and one response is returned 500 ms in advance to avoid client timeout.
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) { // false by default
timeout = Math.max(10000, getFixedPollingInterval());
} else {
long start = System.currentTimeMillis();
// First check whether anything has already changed
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// Must be called by http thread, or send response. This switches the request to asynchronous mode
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is incorrect, Control by oneself
asyncContext.setTimeout(0L);
// From here on the server is in full control of this session; submit a task to the thread pool
ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip,
probeRequestSize, timeout, appName,
tag));
}
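The 29.5s figure follows from the timeout arithmetic on both sides. The sketch below reproduces the two formulas from checkUpdateConfigStr and addLongPollingClient. LongPollingTimingSketch and its method names are hypothetical, and the 30000 ms client timeout used in the example is an assumption about the default setting.

```java
/** Sketch of the long-polling timeout arithmetic; not the Nacos implementation. */
final class LongPollingTimingSketch {

    /** Client side: the HTTP read timeout is 1.5 times the long-polling timeout. */
    static long clientReadTimeout(long timeout) {
        return timeout + (timeout >> 1);
    }

    /** Server side: answer delayTime ms early, but never hold for less than 10 seconds. */
    static long serverHoldTime(long clientTimeout, long delayTime) {
        return Math.max(10000, clientTimeout - delayTime);
    }
}
```

Assuming a client timeout of 30000 ms and the delay of 500 ms shown above, the server holds the request for 29500 ms while the client waits up to 45000 ms for the response, so the server always answers before the client gives up.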
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#run
public void run() {
// Again a task is submitted, this time to run after 29.5s on a single-threaded scheduled executor (newSingleScheduledExecutorService)
asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
@Override
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
// Delete subscriber's relations.
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
List<String> changedGroups = MD5Util
.compareMd5((HttpServletRequest) asyncContext.getRequest(),
(HttpServletResponse) asyncContext.getResponse(),
clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
sendResponse(null);
}
} catch (Throwable t) {
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
// Record this long-polling session
allSubs.add(this);
}
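One question remains: if the data changes within those 29.5s, does the long poll really have to wait until the 29.5s are over before it returns?
Of course not. The client sessions are stored in allSubs, so it is enough to register a listener that responds to the change event.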
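Nacos client listening for configuration changes
There is no search trail to show for this part. The code that answers the listening client sits on the server side, in the constructor of LongPollingService.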
com.alibaba.nacos.config.server.service.LongPollingService#LongPollingService
public LongPollingService() {
allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
// Register LocalDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
// A subscriber for LocalDataChangeEvent is registered here
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// Ignore.
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
// Another task is submitted here
ConfigExecutor.executeLongPolling(
new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return LocalDataChangeEvent.class;
}
});
}
com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run
public void run() {
try {
ConfigCacheService.getContentBetaMd5(groupKey);
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// If published tag is not in the beta list, then it skipped.
if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
continue;
}
// If published tag is not in the tag list, then it skipped.
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // Delete subscribers' relationships.
// This is where the response is sent to the client
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {}
}
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#sendResponse
asyncContext.complete() is the call that finishes the asynchronous request so that the response is returned.
void sendResponse(List<String> changedGroups) {
// Cancel the 29.5s timeout task
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
generateResponse(changedGroups);
}
void generateResponse(List<String> changedGroups) {
if (null == changedGroups) { // nothing has changed
// Tell web container to send http response.
asyncContext.complete();
return;
}
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
try {
final String respString = MD5Util.compareMd5ResultString(changedGroups);
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(respString);
asyncContext.complete();
} catch (Exception ex) {
PULL_LOG.error(ex.toString(), ex);
asyncContext.complete();
}
}
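How Spring Cloud updates its cache in sync
The section on how the Nacos client loads configuration contains an important check: NacosContextRefresher.getRefreshCount() != 0. Where is refreshCount updated? Look at the class declaration:
class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>
The class listens for ApplicationReadyEvent, so let's look at its onApplicationEvent method.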
NacosContextRefresher#onApplicationEvent
public void onApplicationEvent(ApplicationReadyEvent event) {
// many Spring context
if (this.ready.compareAndSet(false, true)) {
this.registerNacosListenersForApplications();
}
}
private void registerNacosListenersForApplications() {
// Check the spring.cloud.nacos.config.refresh-enabled setting, which defaults to true
if (isRefreshEnabled()) {
for (NacosPropertySource propertySource : NacosPropertySourceRepository.getAll()) {
// propertySource.isRefreshable() defaults to true
if (!propertySource.isRefreshable()) {
continue;
}
String dataId = propertySource.getDataId();
registerNacosListener(propertySource.getGroup(), dataId);
}
}
}
private void registerNacosListener(final String groupKey, final String dataKey) {
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
Listener listener = listenerMap.computeIfAbsent(key, lst -> new AbstractSharedListener() {
@Override
public void innerReceive(String dataId, String group,
String configInfo) {
// Increment refreshCount by 1
refreshCountIncrement();
// Record the refresh history for use by Spring Boot Actuator
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
// Publish the refresh event provided by Spring Cloud. It refreshes:
// 1. the Environment 2. the properties and beans annotated with @RefreshScope
applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh config"));
}
});
try {
configService.addListener(dataKey, groupKey, listener);
} catch (NacosException e) {
}
}
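The registration code above combines three small ideas: run the registration only once (compareAndSet), keep a single listener per dataId/group key (computeIfAbsent), and bump a counter on every change notification. The following is a minimal sketch of that pattern only. ListenerRegistry and ChangeListener are hypothetical names invented for illustration, not classes from Spring Cloud Alibaba, and the key format is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a config change listener.
interface ChangeListener {
    void onChange(String dataId, String group, String content);
}

// Hypothetical sketch of the registration pattern, not the real NacosContextRefresher.
class ListenerRegistry {
    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final AtomicLong refreshCount = new AtomicLong(0);
    private final Map<String, ChangeListener> listeners = new ConcurrentHashMap<>();

    // Returns true only for the first caller, like ready.compareAndSet(false, true).
    boolean markReady() {
        return ready.compareAndSet(false, true);
    }

    // Creates at most one listener per (dataId, group) key (key format is assumed).
    ChangeListener register(String dataId, String group) {
        String key = dataId + "," + group;
        return listeners.computeIfAbsent(key,
                k -> (d, g, content) -> refreshCount.incrementAndGet());
    }

    long getRefreshCount() {
        return refreshCount.get();
    }

    int listenerCount() {
        return listeners.size();
    }
}
```

Registering the same dataId and group twice returns the same listener instance, so a change is counted once per notification rather than once per registration attempt.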
org.springframework.cloud.endpoint.event.RefreshEventListener
To keep the listing short, only the call chain is described here. The onApplicationEvent method calls handle(RefreshEvent event), which in turn calls this.refresh.refresh(), that is, org.springframework.cloud.context.refresh.ContextRefresher#refresh.
org.springframework.cloud.context.refresh.ContextRefresher#refresh
public synchronized Set<String> refresh() {
Set<String> keys = refreshEnvironment();
this.scope.refreshAll();
return keys;
}
// Update the Environment
public synchronized Set<String> refreshEnvironment() {
Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
addConfigFilesToEnvironment();
Map<String, Object> after = extract(this.context.getEnvironment().getPropertySources());
// changes(...) returns a map of the changed entries; only the keys are needed
Set<String> keys = changes(before, after).keySet();
// org.springframework.cloud.context.properties.ConfigurationPropertiesRebinder listens for this event
this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
return keys;
}
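The core of refreshEnvironment is a diff between two snapshots of the property sources. A minimal sketch of such a diff follows, assuming that a key counts as changed when it was removed, added, or its value differs. EnvDiff and changedKeys are hypothetical names for illustration and are not the Spring Cloud implementation.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper illustrating the before/after comparison.
class EnvDiff {
    static Set<String> changedKeys(Map<String, Object> before, Map<String, Object> after) {
        Set<String> keys = new TreeSet<>();
        for (Map.Entry<String, Object> e : before.entrySet()) {
            String key = e.getKey();
            // Removed keys and keys whose value differs both count as changes.
            if (!after.containsKey(key) || !Objects.equals(e.getValue(), after.get(key))) {
                keys.add(key);
            }
        }
        for (String key : after.keySet()) {
            // Keys that exist only in the new snapshot were added.
            if (!before.containsKey(key)) {
                keys.add(key);
            }
        }
        return keys;
    }
}
```

The resulting key set is what an event such as EnvironmentChangeEvent would carry, so listeners can rebind only the properties that actually changed.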
org.springframework.cloud.context.scope.refresh.RefreshScope#refreshAll
public void refreshAll() {
// Destroy the cached beans
super.destroy();
// I did not find a listener for this event while reading this code path
this.context.publishEvent(new RefreshScopeRefreshedEvent());
}
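The effect of refreshAll can be pictured as a cache of bean instances that is emptied on refresh and lazily repopulated on the next lookup. The sketch below shows only that idea. SimpleRefreshScope is a hypothetical class written for illustration; the real RefreshScope works through scoped proxies and destruction callbacks that are not modeled here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of a refreshable scope: cached instances are dropped on
// refreshAll() and rebuilt by the factory on the next lookup.
class SimpleRefreshScope {
    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    // Returns the cached instance, creating it with the factory on first access.
    Object get(String name, Supplier<Object> factory) {
        return cache.computeIfAbsent(name, n -> factory.get());
    }

    // Drops every cached instance so the next get() sees fresh configuration.
    void refreshAll() {
        cache.clear();
    }
}
```

Until refreshAll is called, lookups keep returning the instance built from the old configuration, which is why a bean without a refreshable scope keeps its old @Value fields after a config change.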
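Data synchronization across the Nacos cluster
Nacos handles config publishing in com.alibaba.nacos.config.server.controller.ConfigController#publishConfig. The method body is omitted here; the statement that kicks off data synchronization is: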
ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
// Register ConfigDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
// Register A Subscriber to subscribe ConfigDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
// All member nodes of the cluster
Collection<Member> ipList = memberManager.allMembers();
// In fact, any type of queue here can be
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (Member member : ipList) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs,
member.getAddress(), evt.isBeta));
}
// Submit another asynchronous task
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate,queue));
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService.AsyncTask#run
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
while (!queue.isEmpty()) {
NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (memberManager.hasMember(targetIp)) {
// Check whether the node is healthy; if not, delay the notification
boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
if (unHealthNeedDelay) {
// target ip is unhealthy, then put it in the notification list
ConfigTraceService.logNotifyEvent(task.getDataId(),task.getGroup(),
task.getTenant(),null,task.getLastModified(),
InetUtils.getSelfIP(),
ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
0, task.target);
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
Header header = Header.newInstance();
header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
String.valueOf(task.getLastModified()));
header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP,
InetUtils.getSelfIP());
if (task.isBeta) {
header.addParam("isBeta", "true");
}
AuthHeaderUtil.addIdentityToHeader(header);
// Under the hood this is still an HTTP request, url=/v1/cs/communication/dataChange
restTemplate.get(task.url, header, Query.EMPTY, String.class,
new AsyncNotifyCallBack(task));
}
}
}
}
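Stripped of logging and headers, executeAsyncInvoke drains a queue of per-member tasks and makes one decision per task: notify a healthy peer right away, or postpone an unhealthy one. A minimal sketch of that control flow follows. PeerNotifier is a hypothetical class, peers are represented as plain address strings, and the health check is passed in as a predicate; none of this is the Nacos API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical sketch: drain a queue of peer addresses, notify healthy peers now
// and set unhealthy ones aside for a delayed retry.
class PeerNotifier {
    final List<String> notified = new ArrayList<>();
    final List<String> delayed = new ArrayList<>();

    void drain(Queue<String> peers, Predicate<String> isHealthy) {
        while (!peers.isEmpty()) {
            String peer = peers.poll();
            if (isHealthy.test(peer)) {
                // The listing above issues an asynchronous HTTP GET at this point.
                notified.add(peer);
            } else {
                // The listing above reschedules the task with a delay at this point.
                delayed.add(peer);
            }
        }
    }
}
```

Because each member gets its own task, one slow or unreachable node does not block notifications to the rest of the cluster.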
com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam String dataId,
@RequestParam String group,
@RequestParam(required=false,defaultValue ="") String tenant,
@RequestParam(required = false) String tag) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs =StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
return true;
}
com.alibaba.nacos.config.server.service.dump.DumpService#dump
public void dump(String dataId, String group, String tenant, String tag, long lastModified,
String handleIp, boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
}
com.alibaba.nacos.config.server.manager.TaskManager#addTask
public void addTask(Object key, AbstractDelayTask newTask) {
// The following block is the body of super.addTask(key, newTask)
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
// Put the task into a map; a scheduled job polls the map and processes it
tasks.put(key, newTask);
} finally {
lock.unlock();
}
// End of the super.addTask(key, newTask) body
// Metrics monitoring
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}
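The point of addTask is that tasks are keyed, so a burst of changes to the same config collapses into a single pending task instead of piling up. The sketch below illustrates merge-on-add under a lock. SketchTask and SketchTaskManager are hypothetical names, and the merge rule shown (keep the newest timestamp) is an assumption chosen for illustration, not what DumpTask does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical task; the merge rule (keep the newest timestamp) is an assumption.
class SketchTask {
    long lastModified;

    SketchTask(long lastModified) {
        this.lastModified = lastModified;
    }

    void merge(SketchTask older) {
        this.lastModified = Math.max(this.lastModified, older.lastModified);
    }
}

// Hypothetical manager: one pending task per key, guarded by a lock.
class SketchTaskManager {
    private final Map<String, SketchTask> tasks = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    void addTask(String key, SketchTask newTask) {
        lock.lock();
        try {
            SketchTask existing = tasks.get(key);
            if (existing != null) {
                // Fold the pending task into the new one before replacing it.
                newTask.merge(existing);
            }
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }

    SketchTask get(String key) {
        lock.lock();
        try {
            return tasks.get(key);
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return tasks.size();
        } finally {
            lock.unlock();
        }
    }
}
```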
com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#processTasks
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// Re-add the task if processing failed; process() holds the actual handling logic
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
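One polling pass of processTasks removes each pending task, hands it to a processor, and puts it back if the processor reports failure or throws. A minimal single-threaded sketch of that retry loop is shown below. SketchDelayEngine is a hypothetical class, tasks are plain strings, and the processor is a predicate that returns true on success; the real engine also handles delays and per-key processors, which are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, single-threaded sketch of one polling pass with retry.
class SketchDelayEngine {
    private final Map<String, String> tasks = new LinkedHashMap<>();

    void addTask(String key, String payload) {
        tasks.put(key, payload);
    }

    // Remove every task, run it, and re-add the ones that fail or throw.
    void processTasks(Predicate<String> processor) {
        // Iterate over a snapshot of the keys so tasks can be re-added safely.
        List<String> keys = new ArrayList<>(tasks.keySet());
        for (String key : keys) {
            String task = tasks.remove(key);
            if (task == null) {
                continue;
            }
            boolean ok;
            try {
                ok = processor.test(task);
            } catch (RuntimeException e) {
                ok = false;
            }
            if (!ok) {
                // Leave the task pending so the next pass retries it.
                tasks.put(key, task);
            }
        }
    }

    int pending() {
        return tasks.size();
    }
}
```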
com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process
The code is long and is not reproduced here; the overall flow described above is enough to follow what happens.
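Summary
How the disk cache and the in-memory cache relate
They belong to two separate layers: the disk cache is part of Nacos itself, while the in-memory cache belongs to Spring Cloud. Nacos can be used independently of Spring Cloud.
Location of the in-memory cache:
com.alibaba.cloud.nacos.NacosPropertySourceRepository#NACOS_PROPERTY_SOURCE_REPOSITORY
Location of the disk cache: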
com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor#getFailover
C:\Users\lilg\nacos\config\fixed-42.193.97.198_8848_nacos\data\config-data\DEFAULT_GROUP\nacos-config
ClientWorker initialization path
To summarize, the call chain runs from
com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration#nacosConfigManager
to
com.alibaba.cloud.nacos.NacosConfigManager#createConfigService
and then, through NacosFactory.createConfigService, to com.alibaba.nacos.api.config.ConfigFactory#createConfigService.
That method reflectively invokes the constructor com.alibaba.nacos.client.config.NacosConfigService#NacosConfigService, which in turn instantiates ClientWorker.
// The NacosConfigManager constructor is called from NacosConfigBootstrapConfiguration
// com.alibaba.cloud.nacos.NacosConfigManager#NacosConfigManager
public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
this.nacosConfigProperties = nacosConfigProperties;
// Compatible with older code in NacosConfigProperties,It will be deleted in the future.
createConfigService(nacosConfigProperties);
}
static ConfigService createConfigService(NacosConfigProperties nacosConfigProperties) {
if (Objects.isNull(service)) {
synchronized (NacosConfigManager.class) {
try {
if (Objects.isNull(service)) {
service = NacosFactory.createConfigService(
nacosConfigProperties.assembleConfigServiceProperties());
}
}
catch (NacosException e) {
log.error(e.getMessage());
throw new NacosConnectionFailureException(
nacosConfigProperties.getServerAddr(), e.getMessage(), e);
}
}
}
return service;
}
// com.alibaba.nacos.api.config.ConfigFactory#createConfigService(java.util.Properties)
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
// The constructor invoked by the reflection call above
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
this.encode = Constants.ENCODE;
} else {
this.encode = encodeTmp.trim();
}
initNamespace(properties);
this.configFilterChainManager = new ConfigFilterChainManager(properties);
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
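The initialization path above relies on two techniques: a lazily created singleton guarded by double-checked locking, and reflective construction through a constructor that takes Properties. The sketch below combines both in a small form. SketchService and SketchFactory are hypothetical classes, the "serverAddr" key is used only as sample data, and the volatile field is a choice made for this sketch, not a statement about the Nacos source.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical service with a Properties constructor, standing in for NacosConfigService.
class SketchService {
    final String serverAddr;

    public SketchService(Properties props) {
        this.serverAddr = props.getProperty("serverAddr", "");
    }
}

// Hypothetical factory: reflective creation plus a double-checked singleton.
class SketchFactory {
    // volatile makes the double-checked locking below safe under the Java memory model.
    private static volatile Object service;

    static Object create(String className, Properties props) {
        try {
            Class<?> impl = Class.forName(className);
            Constructor<?> ctor = impl.getConstructor(Properties.class);
            return ctor.newInstance(props);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + className, e);
        }
    }

    static Object getOrCreate(String className, Properties props) {
        if (service == null) {
            synchronized (SketchFactory.class) {
                if (service == null) {
                    service = create(className, props);
                }
            }
        }
        return service;
    }
}
```

Loading the implementation by class name keeps the API module free of a compile-time dependency on the client implementation, at the cost of turning a missing class into a runtime error.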