SpringCloud Alibaba Nacos 配置中心源码导读

Nacos配置中心

基于版本1.0.0

Nacos Client

使用Client的Demo代码

public class ClientDemo {
    public static void main(String[] args) {
        try {
            String serverAddr = "127.0.0.1:8848";
            String dataId = "nacos-demo.properties";
            String group = "DEFAULT_GROUP";
            Properties properties = new Properties();
            properties.put("serverAddr", serverAddr);
            ConfigService configService = NacosFactory.createConfigService(properties);
            //没有添加listener 直接获取服务器配置
            String content = configService.getConfig(dataId, group, 5000);
            System.out.println(content);
        } catch (NacosException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

整个过程创建了ConfigService,查看NacosFactory.createConfigService(properties);

public class NacosFactory {
    public static ConfigService createConfigService(Properties properties) 
        throws NacosException {
        //查看方法
        return ConfigFactory.createConfigService(properties);
    }
//.....
}

可以得知通过ConfigFactory工厂类创建ConfigService,继续查看,通过反射创建实例

public class ConfigFactory {
    public static ConfigService createConfigService(Properties properties) 
        throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            //反射创建实例方法
            ConfigService vendorImpl = 
                (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(-400, e.getMessage());
        }
    }
}

通过反射形式创建了NacosConfigService实例。ConfigService是个接口,其构造如下

public interface ConfigService {
    //获取配置信息
    String getConfig(String dataId, String group, long timeoutMs) 
        throws NacosException;
    //添加配置更新的监听器
    void addListener(String dataId, String group, Listener listener) 
        throws NacosException;
   //发布配置信息
    boolean publishConfig(String dataId, String group, String content) 
        throws NacosException;
   //删除配置
    boolean removeConfig(String dataId, String group) 
        throws NacosException;
    //移除监听器
    void removeListener(String dataId, String group, Listener listener);
    String getServerStatus();
}

NacosConfigService实现如下:

public class NacosConfigService implements ConfigService {
    private final long POST_TIMEOUT = 3000L;
    private static final String EMPTY = "";
    private HttpAgent agent;
    private ClientWorker worker;// 负责执行长轮询任务
    private String namespace;
    private String encode;
    private ConfigFilterChainManager configFilterChainManager = 
        new ConfigFilterChainManager();
    public NacosConfigService(Properties properties) throws NacosException {
        String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
        if (StringUtils.isBlank(encodeTmp)) {
            encode = Constants.ENCODE;
        } else {
            encode = encodeTmp.trim();
        }
        initNamespace(properties);
        //服务信息,用于鉴权、服务地址等的配置信息设置
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        //启动了ServerHttpAgent,作用是监听服务器列表是否变化。同步方法。具体查看阅读源码了解
        agent.start();
        //长轮询任务实例,监听配置更新的关键
        worker = new ClientWorker(agent, configFilterChainManager);
    }
    //...
}

ClientWorker源码

    public ClientWorker(final HttpAgent agent, 
                        final ConfigFilterChainManager configFilterChainManager) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;
        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        executorService = Executors.newCachedThreadPool(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        executor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    //检查配置是否变化,通过cacheData(linstener)记录配置文件md5信息比较
                    checkConfigInfo();
                } catch (Throwable e) {
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);//每10毫秒执行一次
    }
    public void checkConfigInfo() {
        // cacheMap保存的 groupKey -> cacheData。groupkey以dataId+group定义
        int listenerSize = cacheMap.get().size();
        int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
                // 执行配置信息更新线程
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

LongPollingRunnable结构

    class LongPollingRunnable implements Runnable {
        private int taskId;
        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }
        public void run() {
            try {
                List<CacheData> cacheDatas = new ArrayList<CacheData>();
                for (CacheData cacheData : cacheMap.get().values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            // 本地缓存检查
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) {
                                // 更新linstener记录的md5值
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                        }
                    }
                }
                List<String> inInitializingCacheList = new ArrayList<String>();
                //获取配置中心变化的配置信息,对inInitializingCacheList初始化
                List<String> changedGroupKeys = 
                    checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                // 配置发生变化则获取服务器最新配置
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {
                        //获取服务器配置信息
              String content = getServerConfig(dataId, group, tenant, 3000L);
              CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(content);//设置listener变化配置内容
                    } catch (NacosException ioe) {
                    }
                }
                for (CacheData cacheData : cacheDatas) {
                    if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        //判断监听器md5是否相同,不同更新最新md5  
                        //有变化这发起listener的receiveConfigInfo通知
                        cacheData.checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList.clear();
            } catch (Throwable e) {
            } finally {
                //再次执行任务,实现循环
                executorService.execute(this);
            }
        }
    }

checkUpdateDataIds代码中调用 checkUpdateConfigStr


    /**
     * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
     */
//probeUpdateString以dataId group md5 tenant命名
List<String> checkUpdateConfigStr(String probeUpdateString, 
                                  boolean isInitializingCacheList) {
    List<String> params = 
        Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    long timeout = TimeUnit.SECONDS.toMillis(30L);// 30秒超时
    List<String> headers = new ArrayList<String>(2);
    headers.add("Long-Pulling-Timeout");
    headers.add("" + timeout);
    // told server do not hang me up if new initializing cacheData added in
    // 初始化时直接返回不等待,具体看服务端实现
    if (isInitializingCacheList) {
        headers.add("Long-Pulling-Timeout-No-Hangup");
        headers.add("true");
    }
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }
    try {
      //调用nacos Api Constants.CONFIG_CONTROLLER_PATH + "/listener 获取服务器配置更新信息
        HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
            agent.getEncode(), timeout);
        if (HttpURLConnection.HTTP_OK == result.code) {
            setHealthServer(true);
            // 返回有跟新的groupkey
            return parseUpdateDataIdResponse(result.content);
        } else {
            setHealthServer(false);
        }
    } catch (IOException e) {
    }
    return Collections.emptyList();
}

checkLocalConfig方法

    private void checkLocalConfig(CacheData cacheData) {
        final String dataId = cacheData.dataId;
        final String group = cacheData.group;
        final String tenant = cacheData.tenant;
//获取本地缓存的文件   
//LOCAL_SNAPSHOT_PATH = //System.getProperty("JM.SNAPSHOT.PATH",System.getProperty("user.home")) + //File.separator+ "nacos" + File.separator + "config/data"下是否有缓存;
        File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
        if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
            String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
            String md5 = MD5.getInstance().getMD5String(content);
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);
            return;
        }
        // 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
        if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
            cacheData.setUseLocalConfigInfo(false);
            return;
        }
        // 有变更
        if (cacheData.isUseLocalConfigInfo() && path.exists()
            && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
            String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
            String md5 = MD5.getInstance().getMD5String(content);
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);
            return;
        }
    }

服务端代码导读

API接口/nacos/v1/cs/configs/listener,接口代码

    @RequestMapping(value = "/listener", method = RequestMethod.POST)
    public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        // client传过来的groupkey
        String probeModify = request.getParameter("Listening-Configs");
        if (StringUtils.isBlank(probeModify)) {
            throw new IllegalArgumentException("invalid probeModify");
        }
        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
        Map<String, String> clientMd5Map;
        try {
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            throw new IllegalArgumentException("invalid probeModify");
        }
        // do long-polling
        //inner为ConfigServletInner
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }

ConfigServletInner实现

public class ConfigServletInner {
    @Autowired
    private LongPollingService longPollingService;
    @Autowired
    private PersistService persistService;
    private static final int TRY_GET_LOCK_TIMES = 9;
    private static final int START_LONGPOLLING_VERSION_NUM = 204;
    /**
     * 轮询接口
     */
    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize)
        throws IOException, ServletException {
        // 长轮询
        if (LongPollingService.isSupportLongPolling(request)) {
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }
       // ...省略短轮询代码
        return HttpServletResponse.SC_OK + "";
    }

longPollingService.addLongPollingClient

    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        /**
         * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance
         */
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
        } else {
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        // 一定要由HTTP线程调用,否则离开后容器会立即发送响应
        final AsyncContext asyncContext = req.startAsync();
        // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
        asyncContext.setTimeout(0L);
       // 延迟30s-delayTime执行,再看ClientLongPolling代码
        scheduler.execute(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }

ClientLongPolling代码

ClientLongPolling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize,long timeoutTime, String appName, String tag) {
    this.asyncContext = ac;
    this.clientMd5Map = clientMd5Map;
    this.probeRequestSize = probeRequestSize;
    this.createTime = System.currentTimeMillis();
    this.ip = ip;
    this.timeoutTime = timeoutTime;
    this.appName = appName;
    this.tag = tag;
}        
/**
*其中final Queue<ClientLongPolling> allSubs; 是一个队列
* 这个队列维持客户端的请求任务,当调用配置修改api时会触发入队
* 
*/
public void run() {
    asyncTimeoutFuture = scheduler.schedule(new Runnable() {
        public void run() {
            try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                //删除订阅关系
                allSubs.remove(ClientLongPolling.this);
                if (isFixedPolling()) {
       // 通过比较md5值获取配置变更groupkey
       List<String> changedGroups = MD5Util.compareMd5(
       (HttpServletRequest)asyncContext.getRequest(),
       (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                    if (changedGroups.size() > 0) {
                        sendResponse(changedGroups);
                    } else {
                        sendResponse(null);
                    }
                } else {
                    sendResponse(null);
                }
            } catch (Throwable t) {
            }
        }
    }, timeoutTime, TimeUnit.MILLISECONDS);
    allSubs.add(this);
}

配置修改API

    @RequestMapping(method = RequestMethod.POST)
    @ResponseBody
    public Boolean publishConfig(...)
        throws NacosException {
       //...
        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                // 关键查看这个方法
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }

EventDispatcher.fireEvent

    static public void fireEvent(Event event) {
        if (null == event) {
            throw new IllegalArgumentException();
        }
//从linstener中获取变更事件触发实例
        for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }
    }

而listeners维护在Entry中,Entry重LISTENER_HUB中获取,而LISTENER_HUB则通过addEventListener添加linstener

// AbstractEventListener的构造方法会触发   EventDispatcher.addEventListener。 因此查看
//AbstractEventListener的实现
static public abstract class AbstractEventListener {
    public AbstractEventListener() {
        EventDispatcher.addEventListener(this);
    }
}

查看LongPollingService,当客户端添加了linstener时allSub维护的订阅关系,而配置变更时会触发onEvent事件,onEvent执行DataChangeTask任务。

    @Override
    public void onEvent(Event event) {
        if (isFixedPolling()) {
            // ignore
        } else {
            if (event instanceof LocalDataChangeEvent) {
                LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
            }
        }
    }

DataChangeTask 实现

        public void run() {
            try {
                ConfigService.getContentBetaMd5(groupKey);
                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPolling clientSub = iter.next();
                    //找到客户端订阅的groupkey任务
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {
                        // ...
                        getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                        iter.remove(); // 删除订阅关系
                        //通过长轮询,返回变更消息通知客户端
                        clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
            } catch (Throwable t) {
            }
        }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容