- 配置优先级
bootstrap.yml
application.yml
分为启动时获取配置,和客户端获取发布后获取配置.
while (retryTimes < rpcClientConfig.retryTimes() && System.currentTimeMillis() < timeoutMills + start) {
boolean waitReconnect = false;
try {
if (this.currentConnection == null || !isRunning()) {
waitReconnect = true;
throw new NacosException(NacosException.CLIENT_DISCONNECT,
"Client not connected, current status:" + rpcClientStatus.get());
}
response = this.currentConnection.request(request, timeoutMills);
if (response == null) {
throw new NacosException(SERVER_ERROR, "Unknown Exception.");
}
if (response instanceof ErrorResponse) {
if (response.getErrorCode() == NacosException.UN_REGISTER) {
synchronized (this) {
waitReconnect = true;
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
LoggerUtils.printIfErrorEnabled(LOGGER,
"Connection is unregistered, switch server, connectionId = {}, request = {}",
currentConnection.getConnectionId(), request.getClass().getSimpleName());
switchServerAsync();
}
}
}
throw new NacosException(response.getErrorCode(), response.getMessage());
}
// return response.
lastActiveTimeStamp = System.currentTimeMillis();
return response;
}
While循环+轮训,轮训不到数据改变重试的间隔.
并且将数据同步给其他的服务节点.
https://www.processon.com/view/link/62d678c31e08531cf8db16ef
配置中西源码图.
本地无->从nacos config server中获取,拉去远程文件,保存到本地。比较文件md5值,配置更改的时候会有NacosConfigChangeEvent事件.
服务启动会dump 从数据库load配置到本地.
refreshscope会清空bean缓存,导致定时任务失效.
比较md5值,如果不同则认为更新了文件,保存到磁盘中.
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
String type, String encryptedDataKey) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
CacheItem ci = makeSure(groupKey, encryptedDataKey, false);
ci.setType(type);
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);
if (lockResult < 0) {
DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
if (lastModifiedTs < ConfigCacheService.getLastModifiedTs(groupKey)) {
DUMP_LOG.warn("[dump-ignore] the content is old. groupKey={}, md5={}, lastModifiedOld={}, "
+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
lastModifiedTs);
return true;
}
if (md5.equals(ConfigCacheService.getContentMd5(groupKey)) && DiskUtil.targetFile(dataId, group, tenant).exists()) {
DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
lastModifiedTs);
} else if (!PropertyUtil.isDirectRead()) {
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);
return true;
} catch (IOException ioe) {
DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe);
if (ioe.getMessage() != null) {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
.contains(DISK_QUATA_EN)) {
// Protect from disk full.
FATAL_LOG.error("磁盘满自杀退出", ioe);
System.exit(0);
}
}
return false;
} finally {
releaseWriteLock(groupKey);
}
- 接下来更新Event事件
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey, false, null, tag));
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());
push(rpcPushRetryTask);
-- 推送
connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
@Override
public Executor getExecutor() {
return executor;
}
@Override
public void onResponse(Response response) {
if (response.isSuccess()) {
requestCallBack.onSuccess();
} else {
requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
}
}
@Override
public void onException(Throwable e) {
requestCallBack.onFail(e);
}
});
grpc推送过去的。
会放入到客户端的queue中,客户端会自己去轮训.
放入客户端的阻塞队列,客户端会从队列中获取任务并且执行.
public void addListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
group = blank2defaultGroup(group);
CacheData cache = addCacheDataIfAbsent(dataId, group);
synchronized (cache) {
for (Listener listener : listeners) {
cache.addListener(listener);
}
cache.setDiscard(false);
cache.setSyncWithServer(false);
//listenExecutebell.offer(bellItem); 放入到阻塞队列中,
agent.notifyListenConfig();
// 间隔Interval去获取
/**
* public void startInternal() {
* executor.schedule(() -> {
* while (!executor.isShutdown() && !executor.isTerminated()) {
* try {
* listenExecutebell.poll(5L, TimeUnit.SECONDS);
* if (executor.isShutdown() || executor.isTerminated()) {
* continue;
* }
* executeConfigListen();
* } catch (Throwable e) {
* LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
* }
* }
* }, 0L, TimeUnit.MILLISECONDS);
*
* }
*/
}
}
更改配置 比较md5值,从而确定文件内容是否发生了改变,放入rpc阻塞队列 通知其他节点,持久化。publisher发送事件,事件中通过grpc connection往client push数据,client端的addListener接受到事件,放入到queue阻塞队列中,客户端interval间隔的去取数据,然后更新缓存》