微服务专题|Naocs 源码设计的精髓就在这了,给你一个手撕面试官的机会

Nacos 如何扛住高并发读写?

最近经常阅读源码,发现大部分框架在解决并发读写的时候,都会使用COW的思想来解决;
nacos也不例外。

解决方案

假设我们创建一个map来存储并发数据,我们先看下在并发场景下,从这个map中进行读写会出现什么问题:

在这里插入图片描述

针对超大的map进行写操作会很耗时,导致其他线程对这个map的读写操作会等待很久;

那么在nacos中,是如何进行解决的呢?

其实nacos处理的思路很简单,我简要概括下,然后跟踪下源码,带大家看看大佬是如何写代码的:

  1. 首先naocs 将内存中的注册列表map 复制一份当到map1
  2. 然后将客户端同步过来的注册key添加到map1中
  3. 处理完所有的key之后,将map1重新复制给内存中的注册列表map中
在这里插入图片描述

源码跟踪

通过阅读源码,我找到了nacos进行更新注册列表的方法:
com.alibaba.nacos.naming.core.Cluster.updateIPs()

  public void updateIPs(List<Instance> ips, boolean ephemeral) {
// 首先判断是需要更新临时注册列表还是持久化的注册列表(这个会在后面讲解ap/cp提到)
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
// 创建一个map,来保存内存中的注册列表
        HashMap<String, Instance> oldIPMap = new HashMap<>(toUpdateInstances.size());
// 遍历注册列表,依次添加到副本中
        for (Instance ip : toUpdateInstances) {
            oldIPMap.put(ip.getDatumKey(), ip);
        }

// 省略处理key的过程
        toUpdateInstances = new HashSet<>(ips);
// 将更新后的注册列表 重新复制到内存注册列表中
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

作为注册中心的Eureka是怎么实现高并发读写?

在eureka中,使用多级缓存结构来解决高并发读写的问题。
eureka会创建一个只读注册列表和一个读写注册列表:
如果客户端发起注册或退出的时候,eureka会先把最新的注册列表内容更新到读写注册列表中,同时在eureka启动时会创建一个定时任务,定时把读写的注册列表的内容同步到只读注册列表中,客户端在进行服务发现的时候,是从只读注册列表中获取可用的服务列表。

在这里插入图片描述

Nacos的ap和cp又是怎么回事

在学习分布式相关框架的时候,我们都离不开CAP理论,这里就不过多介绍CAP理论了;
令开发者疑惑的是为什么nacos既能支持ap又能支持cp呢,这在面试过程中经常会被问到。相信大家在看完这篇文章后,应该就可以手撕面试官了。

前言

在nacos中,ap和cp主要体现在集群中如何同步注册信息到其它集群节点的实现方式上;
nacos通过ephemeral 字段值来决定是使用ap方式同步还是cp方式同步,默认使用的的ap方式同步注册信息。
通过阅读源码,我们可以找到这段代码,关于如何找到这段代码,后面会在nacos源码解读的文章中讲解:
com.alibaba.nacos.naming.core.ServiceManager.addInstance()

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        // 生成服务的key
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // 获取服务
        Service service = getService(namespaceId, serviceName);
        // 使用同步锁处理
        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // 调用consistencyService.put 处理同步过来的服务
            consistencyService.put(key, instances);
        }
    }

我们在进入到consistencyService.put方法中

在这里插入图片描述

点击put方法时,会看到有三个实现类,根据上下文(或者debug方式),可以推断出这里引用的是DelegateConsistencyServiceImpl实现类

    @Override
    public void put(String key, Record value) throws NacosException {
        // 进入到这个put方法后,就可以知道应该使用ap方式同步还是cp方式同步
        mapConsistencyService(key).put(key, value);
    }

从下面的方法中 可以判断通过key来判断使用ap还是cp来同步注册信息,其中key是由ephemeral字段组成;

   private ConsistencyService mapConsistencyService(String key) {
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }

AP 方式同步的流程(ephemeralConsistencyService)

本地服务器处理注册信息&将注册信息同步到其它节点

    @Override
    public void put(String key, Record value) throws NacosException {
        // 处理本地注册列表
        onPut(key, value);
        // 添加阻塞任务,同步信息到其他集群节点
        taskDispatcher.addTask(key);
    }

处理本地注册节点

nacos将key做为一个task,添加到notifer中阻塞队列tasks中,并且使用单线程执行,其中notifer是初始化的时候,作为一个线程被放到线程池中(线程池只设置了一个核心线程);

这里有一个点需要告诉大家:在大多数分布式框架,都会采用单线程的阻塞队列来处理耗时的任务,一方面解决并发问题,另一方面能够解决并发带来的写写冲突问题。

线程中的主要处理逻辑就是,循环读取阻塞队列中的内容,然后处理注册信息,更新到内存注册列表中。

同步注册信息到其他集群节点

nacos同样也是把注册key作为一个task存放到 TaskDispatcher 中的taskShedule阻塞队列中,然后开启线程循环读取阻塞队列:

       @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);
                    // 省略判断代码
                    // 添加同步的key
                    keys.add(key);
                    // 计数
                    dataSize++;
                    // 判断同步的key大小是否等于 批量同步设置的限量 或者 判断据上次同步时间 是否大于 配置的间隔周期,如果满足任意一个,则开始同步
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                        // 遍历所有集群节点,直接调用http进行同步
                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }

                            dataSyncer.submit(syncTask, 0);
                        }
                        // 记录本次同步时间
                        lastDispatchTime = System.currentTimeMillis();
                        // 计数清零
                        dataSize = 0;
                    }
            }
        }
    }

使用ap方式作同步的过程很简单,但是这里面有两种设计思路来解决单个key同步的问题:
如果有新的key推送上来,nacos就发起一次同步,这会造成网络资源浪费,因为每次同步的就只有一个key或者几个key;

同步少量的key解决方案:
  1. 只有积累到指定数量的key,才发起批量同步
  2. 距离上次同步时间超过配置的限制时间,则忽略key数量,直接发起同步

CP 方式同步的流程(RaftConsistencyServiceImpl)

cp模式追求的是数据一致性,为了数据一致性,那么肯定得选出一个leader,由leader首先同步,然后再由leader通知follower前来获取最新的注册节点(或者主动推送给follower)

nacos使用raft协议来进行选举leader,来实现cp模式。

同样进入到 RaftConsistencyServiceImpl的put方法

    @Override
    public void put(String key, Record value) throws NacosException {
        try {
            raftCore.signalPublish(key, value);
        } catch (Exception e) {
            Loggers.RAFT.error("Raft put failed.", e);
            throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
        }
    }

进入到raftCore.signalPublish方法中,我提取几个关键的代码

// 首先判断当前nacos节点是否是leader,如果不是leader,则获取leader节点的ip,然后将请求转发到leader处理,否则往下走
if (!isLeader()) {
            JSONObject params = new JSONObject();
            params.put("key", key);
            params.put("value", value);
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);

            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
            return;
        }

// 同样采用同样队列的方式,去处理本地注册列表

onPublish(datum, peers.local());

public void onPublish(Datum datum, RaftPeer source) throws Exception {
       
        // 添加同步key任务到阻塞队列中
        notifier.addTask(datum.key, ApplyAction.CHANGE);

        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }

遍历所有集群节点,发送http同步请求

 for (final String server : peers.allServersIncludeMyself()) {
                // 如果是leader,则不进行同步
                if (isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                // 组装url 发送同步请求到其它集群节点
                final String url = buildURL(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                datum.key, server, response.getStatusCode());
                            return 1;
                        }
                        latch.countDown();
                        return 0;
                    }

                    @Override
                    public STATE onContentWriteCompleted() {
                        return STATE.CONTINUE;
                    }
                });

            }

各个集群节点处理同步请求这里就不过多介绍了,大家可以自行去看哈

微信搜一搜【乐哉开讲】关注帅气的我,回复【干货领取】,将会有大量面试资料和架构师必看书籍等你挑选,包括java基础、java并发、微服务、中间件等更多资料等你来取哦。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,204评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,091评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,548评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,657评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,689评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,554评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,302评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,216评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,661评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,851评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,977评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,697评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,306评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,898评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,019评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,138评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,927评论 2 355

推荐阅读更多精彩内容