Nacos配置中心

  • 配置优先级

bootstrap.yml

application.yml

分为启动时获取配置,和客户端获取发布后获取配置.

  while (retryTimes < rpcClientConfig.retryTimes() && System.currentTimeMillis() < timeoutMills + start) {
            boolean waitReconnect = false;
            try {
                if (this.currentConnection == null || !isRunning()) {
                    waitReconnect = true;
                    throw new NacosException(NacosException.CLIENT_DISCONNECT,
                            "Client not connected, current status:" + rpcClientStatus.get());
                }
                response = this.currentConnection.request(request, timeoutMills);
                if (response == null) {
                    throw new NacosException(SERVER_ERROR, "Unknown Exception.");
                }
                if (response instanceof ErrorResponse) {
                    if (response.getErrorCode() == NacosException.UN_REGISTER) {
                        synchronized (this) {
                            waitReconnect = true;
                            if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                                LoggerUtils.printIfErrorEnabled(LOGGER,
                                        "Connection is unregistered, switch server, connectionId = {}, request = {}",
                                        currentConnection.getConnectionId(), request.getClass().getSimpleName());
                                switchServerAsync();
                            }
                        }
                        
                    }
                    throw new NacosException(response.getErrorCode(), response.getMessage());
                }
                // return response.
                lastActiveTimeStamp = System.currentTimeMillis();
                return response;
                
            }

While循环+轮训,轮训不到数据改变重试的间隔.
并且将数据同步给其他的服务节点.

https://www.processon.com/view/link/62d678c31e08531cf8db16ef
配置中西源码图.

本地无->从nacos config server中获取,拉去远程文件,保存到本地。比较文件md5值,配置更改的时候会有NacosConfigChangeEvent事件.

服务启动会dump 从数据库load配置到本地.

refreshscope会清空bean缓存,导致定时任务失效.

比较md5值,如果不同则认为更新了文件,保存到磁盘中.

 public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
            String type, String encryptedDataKey) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        CacheItem ci = makeSure(groupKey, encryptedDataKey, false);
        ci.setType(type);
        final int lockResult = tryWriteLock(groupKey);
        assert (lockResult != 0);
        
        if (lockResult < 0) {
            DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
            return false;
        }
        
        try {
            final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
            if (lastModifiedTs < ConfigCacheService.getLastModifiedTs(groupKey)) {
                DUMP_LOG.warn("[dump-ignore] the content is old. groupKey={}, md5={}, lastModifiedOld={}, "
                                + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                        lastModifiedTs);
                return true;
            }
            if (md5.equals(ConfigCacheService.getContentMd5(groupKey)) && DiskUtil.targetFile(dataId, group, tenant).exists()) {
                DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                                + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                        lastModifiedTs);
            } else if (!PropertyUtil.isDirectRead()) {
                DiskUtil.saveToDisk(dataId, group, tenant, content);
            }
            updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);
            return true;
        } catch (IOException ioe) {
            DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe);
            if (ioe.getMessage() != null) {
                String errMsg = ioe.getMessage();
                if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
                        .contains(DISK_QUATA_EN)) {
                    // Protect from disk full.
                    FATAL_LOG.error("磁盘满自杀退出", ioe);
                    System.exit(0);
                }
            }
            return false;
        } finally {
            releaseWriteLock(groupKey);
        }
  • 接下来更新Event事件
        NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey, false, null, tag));
      RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());
            push(rpcPushRetryTask);

-- 推送

  connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
                    
                    @Override
                    public Executor getExecutor() {
                        return executor;
                    }
                    
                    @Override
                    public void onResponse(Response response) {
                        if (response.isSuccess()) {
                            requestCallBack.onSuccess();
                        } else {
                            requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
                        }
                    }
                    
                    @Override
                    public void onException(Throwable e) {
                        requestCallBack.onFail(e);
                    }
                });

grpc推送过去的。

会放入到客户端的queue中,客户端会自己去轮训.

放入客户端的阻塞队列,客户端会从队列中获取任务并且执行.

public void addListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
        group = blank2defaultGroup(group);
        CacheData cache = addCacheDataIfAbsent(dataId, group);
        synchronized (cache) {
            
            for (Listener listener : listeners) {
                cache.addListener(listener);
            }
            cache.setDiscard(false);
            cache.setSyncWithServer(false);
            //listenExecutebell.offer(bellItem); 放入到阻塞队列中,
            agent.notifyListenConfig();
            // 间隔Interval去获取
            /**
             *  public void startInternal() {
             *             executor.schedule(() -> {
             *                 while (!executor.isShutdown() && !executor.isTerminated()) {
             *                     try {
             *                         listenExecutebell.poll(5L, TimeUnit.SECONDS);
             *                         if (executor.isShutdown() || executor.isTerminated()) {
             *                             continue;
             *                         }
             *                         executeConfigListen();
             *                     } catch (Throwable e) {
             *                         LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
             *                     }
             *                 }
             *             }, 0L, TimeUnit.MILLISECONDS);
             *
             *         }
             */
        }
    }

更改配置 比较md5值,从而确定文件内容是否发生了改变,放入rpc阻塞队列 通知其他节点,持久化。publisher发送事件,事件中通过grpc connection往client push数据,client端的addListener接受到事件,放入到queue阻塞队列中,客户端interval间隔的去取数据,然后更新缓存》

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容