你想了解的Nacos原理全在这里

1 什么是Nacos,Nacos可以干什么?

Nacos是微服务架构中的注册中心和配置中心,其他服务的服务信息(ip,端口等信息)可以注册到nacos服务端。nacos又为客户端提供了服务发现的功能。客户端会开启一个定时任务,定时向服务端获取最新的服务列表,加载到客户端本地缓存。客户端同时又开启一个定时心跳发送的任务,用于告知服务端,当前服务的健康状态。服务端启动的时候同样也会开启一个健康检查的定时任务,扫描服务列表,将长时间未与服务端发送心跳的服务的健康状态改为false,达到某个时间,会踢出该服务。
Nacos好处就是服务不需要记录其他服务的ip信息,通过nacos可以实时获取其他服务列表。仅仅只需从本地缓存中根据服务名找到服务列表,利用负载均衡算法从列表中拉取一个ip进行调用(比如Ribbon)。

2 服务心跳与服务注册原理

我们知道springboot项目启动的时候,会去扫描所有jar包下的META-INF下的spring.factories文件,并获取文件中key为EnableAutoConfiguration的value值(一般是javaConfig配置文件,java文件的包路径),并解析这些配置文件。
spring-cloud-alibaba-nacos-discovery这个jar包下的spring.factories文件引入了
一个关键的配置文件NacosDiscoveryAutoConfiguration。此配置文件往spring容器中注入了一个bean,如下:

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }

通过查看NacosAutoServiceRegistration的继承图,可以发现它实现了ApplicationListener接口。


image.png

在spring中如果某个bean实现了ApplicationListener<WebServerInitializedEvent>接口,那么它就是一个监听器,监听WebServerInitializedEvent事件(容器启动事件),在spring容器启动的时候,会调用其重写的onApplicationEvent方法。因此我们进入此方法。

    @Override
    @SuppressWarnings("deprecation")
    public void onApplicationEvent(WebServerInitializedEvent event) {
        bind(event);
    }

    @Deprecated
    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                    .getServerNamespace())) {
                return;
            }
        }
        this.port.compareAndSet(0, event.getWebServer().getPort());
        //关键代码
        this.start();
    }

    public void start() {
        if (!isEnabled()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
            }
            return;
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get()) {
            this.context.publishEvent(
                    new InstancePreRegisteredEvent(this, getRegistration()));
            //开始注册
            register();
            if (shouldRegisterManagement()) {
                registerManagement();
            }
            this.context.publishEvent(
                    new InstanceRegisteredEvent<>(this, getConfiguration()));
            this.running.compareAndSet(false, true);
        }

    }

关键代码是register方法,开始注册服务。进入此方法

    @Override
    public void register(Registration registration) {

        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }
        //获取服务标识
        String serviceId = registration.getServiceId();
        //获取服务所在的group
        String group = nacosDiscoveryProperties.getGroup();
        //获取服务的instance,就是把配置文件中配置的服务信息(服务名、IP、Port等等)封装到instance对象中
        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            //真正的开始注册
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                    instance.getIp(), instance.getPort());
        }
        catch (Exception e) {
            log.error("nacos registry, {} register failed...{},", serviceId,
                    registration.toString(), e);
        }
    }
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        //判断是否是临时实例。一般来说为了性能,都用的临时实例
        if (instance.isEphemeral()) {
            //以下就是将instance中的信息封装到beatInfo中去
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            long instanceInterval = instance.getInstanceHeartBeatInterval();
            beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
            //发送心跳
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        //注册服务
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

因此,在spring容器启动的时候,nacos客户端会进行两步操作。
1 向nacos服务端发送心跳
2 向nacos服务端注册当前服务

2.1 服务心跳
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        // key的格式:服务名#ip#port
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        //向线程池中提交发送心跳的任务,实现异步发送
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

将心跳的信息包装为一个BeatTask任务,放到线程池中异步执行。我们主要看BeatTask的run方法。

class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            // 发送心跳
            long result = serverProxy.sendBeat(beatInfo);
            // 获取返回的下一次执行的间隔时间。0的话,就取默认的,默认是5s
            long nextTime = result > 0 ? result : beatInfo.getPeriod();
            // 循环提交心跳任务。默认就是5s发送一次心跳。
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
    public long sendBeat(BeatInfo beatInfo) {
        try {
            if (NAMING_LOGGER.isDebugEnabled()) {
                NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
            }
            //以下就是我们常见的调用Http接口。下面是进行接口参数封装。
            Map<String, String> params = new HashMap<String, String>(4);
            params.put("beat", JSON.toJSONString(beatInfo));
            params.put(CommonParams.NAMESPACE_ID, namespaceId);
            params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
            // 发起心跳接口Http调用。心跳接口url是/nacos/v1/ns/instance/beat
            String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
            JSONObject jsonObject = JSON.parseObject(result);

            if (jsonObject != null) {
                return jsonObject.getLong("clientBeatInterval");
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: " + JSON.toJSONString(beatInfo), e);
        }
        return 0L;
    }

我们可以看到,客户端默认会每隔5s发送一次心跳。心跳发送过程就是简单的Http接口调用。
接下来进入服务端的心跳接口代码。也就是查找url为/nacos/v1/ns/instance/beat的接口。我们在naming工程下的InstanceController中找到接口方法。

    @CanDistro
    @PutMapping("/beat")
    public JSONObject beat(HttpServletRequest request) throws Exception {

        JSONObject result = new JSONObject();
        //以下是解析请求参数,获取一些信息,包括namespaceId、serviceName等待
        result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
            Constants.DEFAULT_NAMESPACE_ID);
        String beat = WebUtils.required(request, "beat");
        RsInfo clientBeat = JSON.parseObject(beat, RsInfo.class);

        if (!switchDomain.isDefaultInstanceEphemeral() && !clientBeat.isEphemeral()) {
            return result;
        }
        //判断传过来的服务有没有指定cluster名字,如果没有就设置默认值DEFAULT
        if (StringUtils.isBlank(clientBeat.getCluster())) {
            clientBeat.setCluster(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        }

        String clusterName = clientBeat.getCluster();

        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
        }
        // 获取服务对应的instance。第一次注册,这里肯定是null
        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clientBeat.getCluster(),
            clientBeat.getIp(),
            clientBeat.getPort());

        if (instance == null) {
            // 创建instance实例。将客户端传过来的服务信息设置到instance中
            instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());
            // 注册服务。如果是第一次,那么会在发送心跳的时候完成服务注册。这里先跳过,到服务注册的接口探究。
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }

        Service service = serviceManager.getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
        }
        // 真正处理心跳请求
        service.processClientBeat(clientBeat);
        result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
        return result;
    }
    public void processClientBeat(final RsInfo rsInfo) {
        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
        clientBeatProcessor.setService(this);
        clientBeatProcessor.setRsInfo(rsInfo);
        //向健康检测的线程池中提交一个处理心跳请求的任务,并且立刻执行。
        HealthCheckReactor.scheduleNow(clientBeatProcessor);
    }

    public static ScheduledFuture<?> scheduleNow(Runnable task) {
        //delay值是0,因此立刻执行。
        return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
    }

我们主要看ClientBeatProcessor的run方法,查看处理心跳的逻辑。

    @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        // 获取当前cluster下的所有服务列表
        List<Instance> instances = cluster.allIPs(true);
        // 遍历服务列表
        for (Instance instance : instances) {
            // 从服务列表中找客户端发送心跳的那个服务
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                // 找到了,说明之前注册过了,那就这里更新下这个服务的最近心跳时间。后面健康检测就是根据这个心跳时间来的。
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    if (!instance.isHealthy()) {
                        // 将服务的健康状态置为true
                        instance.setHealthy(true);
                        Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                            cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                        // 服务发生变化,发布推送事件,触发服务端向客户端的推送任务
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }

这里我们可以看出,心跳的目的就是更新这个服务的最后心跳时间。而服务端判定这个服务是否掉线,就是根据这个时间来判定的,如果最后心跳时间与当前时间差超过15s就会设置为false,也就是掉线。时间差超过30s就会将此服务踢出服务列表。当然这个服务健康检查到后面再细说。

服务心跳过程总结:

客户端在启动的时候,会开启一个心跳线程,每隔5s调用一次服务端的心跳接口(Http调用),服务端将心跳请求封装成一个task,放到线程池中。由服务端的线程池执行task,更新对应服务的最后心跳时间。

2.2 服务注册

回到2.1上面,客户端发送服务注册接口那块

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);
        //封装服务注册的接口参数
        final Map<String, String> params = new HashMap<String, String>(9);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));
        //发起Http接口调用。接口url是/nacos/v1/ns/instance
        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

    }

服务注册就是客户端启动的时候,发起Http接口调用。
接下来进入服务端的服务注册接口代码。也就是查找url为/nacos/v1/ns/instance的接口。我们在naming工程下的InstanceController中找到接口方法。

    @CanDistro
    @PostMapping
    public String register(HttpServletRequest request) throws Exception {
        // 获取服务名和服务所在的namespaceId
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        // 服务注册
        serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
        return "ok";
    }
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // 因为是第一次注册,需要创建一个空的service
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // 获取刚刚创建的service
        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // 真正的注册服务
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

这里有两步,创建service,然后是注册服务到service中。

2.2.1 先看创建service
    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }

    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
        // 获取service。从serviceMap中获取。
        Service service = getService(namespaceId, serviceName);
        // 如果是第一次注册,那么这里肯定获取不到,会进入下面if分支
        if (service == null) {
            
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            // 创建一个service并设置一些属性
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            // 将service放到serviceMap中,并初始化service
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

    //此方法就是从serviceMap这个存放所有服务信息的map中获取service
    public Service getService(String namespaceId, String serviceName) {
        // 获取当前服务对应的namespaceId
        if (serviceMap.get(namespaceId) == null) {
            return null;
        }
        // 获取到了,就从中取出service
        return chooseServiceMap(namespaceId).get(serviceName);
    }


    private void putServiceAndInit(Service service) throws NacosException {
        // 把服务放到serviceMap中
        putService(service);
        // 服务初始化。主要是开启服务的健康检测。
        service.init();
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
    }

    public void putService(Service service) {
        //将一个空的服务放到serviceMap中。这里使用了双重校验加同步锁。
        //因为在判断此服务不存在之后,放入的时候可能恰好其他线程放入了一样的service。
        //所以需要加锁,并再次校验。类似单例模式创建的双重检测加锁。
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
                }
            }
        }
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }

    public void init() {
        // 开启服务健康检测。向线程池中提交健康检测任务。
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

所以创建service主要干了两件事情。
1 将service放到serviceMap中
2 开启当前服务的健康检查

2.2.2 真正注册服务
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        // key格式:com.alibaba.nacos.naming.iplist.ephemera.namespaceId##serviceName
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // 获取之前创建的service
        Service service = getService(namespaceId, serviceName);

        synchronized (service) {
            // 获取当前service下的所有ip列表。因为如果客户端做集群,这里就是集群中的多个服务ip信息
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            // 将当前服务的所有instance放到Instances中
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // 服务注册
            consistencyService.put(key, instances);
        }
    }
    @Override
    public void put(String key, Record value) throws NacosException {
        //将服务注册的通知放到阻塞队列中。
        //服务端有一个线程专门从阻塞队列中获取通知。处理通知。
        onPut(key, value);
        //向nacos服务端集群中的其他节点同步服务。这里先不看,下面服务同步再深入。
        taskDispatcher.addTask(key);
    }

    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            // Instances信息放到dataStore中
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }
        // 发出服务列表修改的通知
        notifier.addTask(key, ApplyAction.CHANGE);
    }

notifier是一个实现Runnable接口的类,内部有一个阻塞队列,addTask方法就是往阻塞队列里添加服务变更的通知。
服务端有个类DistroConsistencyServiceImpl,里面定义了一个init方法,方法中开启了一个线程,并提交了一个notifier任务。也就是服务端启动的时候,就会开启一个线程执行notifier任务。我们来看notifier的run方法

public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

        public void addTask(String datumKey, ApplyAction action) {

            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
                return;
            }
            if (action == ApplyAction.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.add(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            //死循环,循环从阻塞队列中获取服务变更的通知。
            while (true) {
                try {
                    //从阻塞队列中获取服务变更的通知
                    Pair pair = tasks.take();

                    if (pair == null) {
                        continue;
                    }

                    String datumKey = (String) pair.getValue0();
                    //获取服务变更类型,有更新和下线删除两种
                    ApplyAction action = (ApplyAction) pair.getValue1();

                    services.remove(datumKey);

                    int count = 0;

                    if (!listeners.containsKey(datumKey)) {
                        continue;
                    }

                    for (RecordListener listener : listeners.get(datumKey)) {

                        count++;

                        try {
                            // 服务列表变更。因为我们这里是注册,相当于更新服务列表。因此走这个if分支。
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }

                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }

                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }
    @Override
    public void onChange(String key, Instances value) throws Exception {

        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        
        for (Instance instance : value.getInstanceList()) {

            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }

            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }

            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        // 真正的更新服务注册列表
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

        recalculateChecksum();
    }

    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        // 创建一个临时的服务对应的map。
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        // 遍历instance列表
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }
                // 设置cluster名称
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
                // 创建一个空的Cluster,设置到clusterMap中
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                        instance.getClusterName(), instance.toJSON());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }
                // 将instance添加到ipMap中此instance对应的clustername位置上
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }
        // 遍历ipMap。将instance对应的cluster更新到clusterMap中。
        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            // 真正的更新服务列表
            clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral);
        }

        setLastModifiedMillis(System.currentTimeMillis());
        // 服务列表更新了,发生变化,发布推送事件,触发服务端向客户端的推送任务
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();

        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
        }

        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}",
            getNamespaceId(), getName(), stringBuilder.toString());

    }
    public void updateIPs(List<Instance> ips, boolean ephemeral) {
        //此处省略。主要是合并老的服务列表和新的服务列表,合并为一个最新的服务列表。
        .....
        .....
        .....
        // 因为是临时实例。更新最新的服务列表到ephemeralInstances
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

服务注册的主要过程就是,将服务列表更新的通知放入阻塞队列。
服务端启动时候会开启一个线程,专门从这个阻塞队列中获取通知,拿到最新的服务列表,并更新到service中的clusterMap中去。也就是更新最底层Cluster中的ephemeralInstances变量,此变量就是存放当前cluster下的所有服务列表。
至此,服务注册完成。

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

这里需要提一下服务端存储临时实例的结构。
本质上就是serverMap。它的结构是:
serverMap:
key是namespace
value是Map<group::serviceName, Service>
而Map<group::serviceName, Service>:
key是组名::服务名
value是service对象
而service中存了一个clusterMap,是一个hashmap。
Map<String, Cluster> clusterMap = new HashMap<>();
key是clustername
value是cluster对象
而cluster存了两个set集合:
一个存放临时服务列表(private Set<Instance> ephemeralInstances = new HashSet<>();)
一个存放持久服务列表(private Set<Instance> persistentInstances = new HashSet<>();)
服务注册最后更新的就是这两个set集合。

服务注册过程总结:

客户端启动的时候,向服务端发起Http接口调用,调用服务注册的接口。服务端收到注册请求,将新的注册信息和老的服务列表封装为一个Pair对象,并放入阻塞队列。服务端在启动的时候会创建一个线程池,并提交一个任务,这个任务就是循环从阻塞队列里拿Pair对象,对其解析,更新服务所在的service内部的clusterMap中的服务列表。

3 服务健康检查

我们从上面服务注册的时候,在进行创建空service的时候,会向服务健康检查线程池中,提交一个健康检查任务,此任务就是针对当前service下所有服务列表进行健康检查。

    public void init() {
        // 开启服务健康检测。向线程池中提交健康检测任务。
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

我们下面看ClientBeatCheckTask的run方法。

    @Override
    public void run() {
        try {
            // 判断当前服务器是否需要对此服务执行健康检查
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            // 获取当前service下的所有服务列表
            List<Instance> instances = service.allIPs(true);

            // 第一次检查
            for (Instance instance : instances) {
                // 判断最后一次心跳时间与当前时间差是否超过了15s。超过了就设置健康状态为false,表示服务已掉线
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                                UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            // 服务发生变化,发布推送事件,触发服务端向客户端的推送任务
                            getPushService().serviceChanged(service);
                            SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // 第二次检查
            for (Instance instance : instances) {

                if (instance.isMarked()) {
                    continue;
                }
                // 判断最后一次心跳时间与当前时间差是否超过了30s。超过了就踢出该服务
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
                    // 踢出服务
                    deleteIP(instance);
                }
            }

        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }

    }

    public boolean responsible(String serviceName) {
        if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
            return true;
        }

        if (CollectionUtils.isEmpty(healthyList)) {
            // means distro config is not ready yet
            return false;
        }

        int index = healthyList.indexOf(NetUtils.localServer());
        int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
        if (lastIndex < 0 || index < 0) {
            return true;
        }
        // 计算服务的hash值与集群长度取模
        int target = distroHash(serviceName) % healthyList.size();
        // 如果是当前服务器在集群中的index,就返回false,也就是执行下面的健康检查
        return target >= index && target <= lastIndex;
    }

检查的方法就是:

如果最后心跳时间与当前时间差超过15s就会设置为false,也就是掉线。时间差超过30s就会将此服务踢出服务列表。

这里有个性能问题,如果服务列表很多很多,那么单台nacos服务器需要一次检查很多个服务。因此在使用nacos集群的时候,会对每个服务计算一个hash值,然后对集群列表的长度取模,如果等于当前nacos服务器在集群中的index,那么就执行对当前service的健康检查,如果不是,就不检查。这样的好处是,多台服务器可以分担健康检查的压力,类似分片的思想。

踢出服务列表就是执行 deleteIP(instance);我们下面看里面做了什么。

    private void deleteIP(Instance instance) {

        try {
            NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip", instance.getIp())
                .appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true")
                .appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName())
                .appendParam("namespaceId", service.getNamespaceId());

            String url = "http://127.0.0.1:" + RunningConfig.getServerPort() + RunningConfig.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();

            // 还是发起Http接口调用。url是http://127.0.0.1:port/nacos/v1/ns/instance.是delete请求
            HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
                @Override
                public Object onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                            instance.toJSON(), response.getResponseBody(), response.getStatusCode());
                    }
                    return null;
                }
            });

        } catch (Exception e) {
            Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJSON(), e);
        }
    }

同样的,这里踢出服务列表,就是调用删除服务的接口。进入接口代码。

    @CanDistro
    @DeleteMapping
    public String deregister(HttpServletRequest request) throws Exception {
        Instance instance = getIPAddress(request);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
            Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        // 获取服务所在的service
        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {
            Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
            return "ok";
        }
        // 开始删除服务
        serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);

        return "ok";
    }
    public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        // 对当前service加锁,防止并发问题。
        synchronized (service) {
            //剔除服务
            removeInstance(namespaceId, serviceName, ephemeral, service, ips);
        }
    }

    public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // 获取删除当前服务后的服务列表
        List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // 更新服务列表并同步数据到其他节点
        consistencyService.put(key, instances);
    }

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        taskDispatcher.addTask(key);
    }

    public void onPut(String key, Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            // Instances信息放到dataStore中
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }
        // 发出服务列表修改的通知
        notifier.addTask(key, ApplyAction.CHANGE);
    }

删除服务实例的接口最终调用的地方和服务注册时一模一样。就是将需要删除的服务和老的服务列表合并为一个新的服务列表,封装到dataStore里,提交服务改变的通知到阻塞队列。由服务端的线程从阻塞队列获取服务列表改变的通知,从dataStore拿出最新的服务列表,更新Cluster中的存放临时服务列表的set集合。

我们可以发现,删除服务和注册服务接口逻辑类似。都是将更新后的服务列表封装起来,然后添加一个通知,让其他线程去获取通知,将新的服务列表更新进去。

4 服务同步

nacos中临时实例集群同步采用的是AP模式,而持久实例采用的是CP模式。这里对临时实例的服务同步进行分析。
通过上面服务同步和服务删除的接口,我们可以看到本质上都是更新服务列表。更新完之后调用taskDispatcher.addTask(key);向其他节点发起服务同步请求。

    @Override
    public void put(String key, Record value) throws NacosException {
        //更新服务列表
        onPut(key, value);
        //服务同步
        taskDispatcher.addTask(key);
    }
    public void addTask(String key) {
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }


        public void addTask(String key) {
            // queue是一个阻塞队列。向阻塞队列中添加一个服务同步的通知。
            queue.offer(key);
        }

可以发现服务同步也是向阻塞队列中添加一个服务同步的通知。当然这个阻塞队列和上面的注册服务的阻塞队列不是同一个。我们可以猜想,肯定是服务端某个类初始化的时候,开启了一个线程,线程执行的任务就是循环从阻塞队列中获取通知。

@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();
    //初始化方法
    @PostConstruct
    public void init() {
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            // 向线程池中提交一个TaskScheduler任务
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }
......
......
......省略

果然TaskDispatcher这个类初始化的时候,开启了线程,执行TaskScheduler任务,我们来看run方法。

        @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {

                try {
                    // 从阻塞队列中取出要同步的key
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }
                    // 如果是单机,就不需要同步
                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }
                    //key放到keys集合中。
                    keys.add(key);
                    //对keys计数
                    dataSize++;
                    // 当数量满足批量同步的最小数量时,对存放的所有key,开始同步操作
                    // 或者上一次同步时间与当前时间差超过了指定时间,也进行同步
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                        // 获取所有服务器,遍历依次发送同步请求
                        for (Server member : dataSyncer.getServers()) {
                            // 如果是当前服务器,那么就不同步。
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            // 将同步的key封装到SyncTask任务对象中
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }
                            // 提交服务同步的任务
                            dataSyncer.submit(syncTask, 0);
                        }
                        // 记录最后同步的时间
                        lastDispatchTime = System.currentTimeMillis();
                        // 重置计数器
                        dataSize = 0;
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }

我们可以看到同步的时候并不是每次有服务列表变更,就执行同步,这样很耗资源,影响性能。而是将同步的服务累积起来,等到某个值进行批量同步。或者超时了也执行同步(防止服务达不到批量同步的阈值而一直不同步)。

    public void submit(SyncTask task, long delay) {

        // If it's a new task:
        // 下面是将所有的key放到taskMap中管理
        if (task.getRetryCount() == 0) {
            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                    // associated key already exist:
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync already in process, key: {}", key);
                    }
                    iterator.remove();
                }
            }
        }

        if (task.getKeys().isEmpty()) {
            // all keys are removed:
            return;
        }
        // 真正的提交同步请求的任务
        GlobalExecutor.submitDataSync(() -> {
            // 检查是否是单机
            if (getServers() == null || getServers().isEmpty()) {
                Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                return;
            }

            List<String> keys = task.getKeys();

            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
            }
            // 2. get the datums by keys and check the datum is empty or not
            // 获取keys的所有对应的服务
            Map<String, Datum> datumMap = dataStore.batchGet(keys);
            if (datumMap == null || datumMap.isEmpty()) {
                // clear all flags of this task:
                for (String key : keys) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
                return;
            }
            // 序列化
            byte[] data = serializer.serialize(datumMap);
    
            long timestamp = System.currentTimeMillis();
            // 发起同步请求。里面就是一个Http接口调用。调用同步服务接口。
            // 接口url是/nacos/ns/distro/datum
            boolean success = NamingProxy.syncData(data, task.getTargetServer());
            if (!success) {
                SyncTask syncTask = new SyncTask();
                syncTask.setKeys(task.getKeys());
                syncTask.setRetryCount(task.getRetryCount() + 1);
                syncTask.setLastExecuteTime(timestamp);
                syncTask.setTargetServer(task.getTargetServer());
                // 如果不成功,重试
                retrySync(syncTask);
            } else {
                // clear all flags of this task:
                for (String key : task.getKeys()) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
            }
        }, delay);
    }

    public void retrySync(SyncTask syncTask) {

        Server server = new Server();
        server.setIp(syncTask.getTargetServer().split(":")[0]);
        server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
        if (!getServers().contains(server)) {
            // if server is no longer in healthy server list, ignore this task:
            //fix #1665 remove existing tasks
            if (syncTask.getKeys() != null) {
                for (String key : syncTask.getKeys()) {
                    taskMap.remove(buildKey(key, syncTask.getTargetServer()));
                }
            }
            return;
        }

        // TODO may choose other retry policy.
        // 又提交了一次同步服务的任务
        submit(syncTask, partitionConfig.getSyncRetryDelay());
    }

服务同步本质上也就是将更新的服务序列化,调用同步服务的接口,传递给集群中其他节点。
不过这里有个小问题。就是如果其他节点出现问题,比如挂了。那么这里肯定同步不成功,那么它会不断的重试,不断的执行同步调用,又不断的失败。这样其实是无意义的,作者在这里也标注了,之后会更新,使用其他重试机制。比如,重试的时间延长,失败次数如果达到某个次数,就停止同步。

下面我们看服务端的接受同步请求的接口。

    //同步服务的接口
    @PutMapping("/datum")
    public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
        //校验参数
        if (dataMap.isEmpty()) {
            Loggers.DISTRO.error("[onSync] receive empty entity!");
            throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
        }
        // 遍历获取key
        for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
            if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
                String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                String serviceName = KeyBuilder.getServiceName(entry.getKey());
                if (!serviceManager.containService(namespaceId, serviceName)
                    && switchDomain.isDefaultInstanceEphemeral()) {
                    serviceManager.createEmptyService(namespaceId, serviceName, true);
                }
                // 又是熟悉的调用服务列表更新的方法
                consistencyService.onPut(entry.getKey(), entry.getValue().value);
            }
        }
        return ResponseEntity.ok("ok");
    }

    public void onPut(String key, Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            // Instances信息放到dataStore中
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }
        // 发出服务列表修改的通知
        notifier.addTask(key, ApplyAction.CHANGE);
    }

同步服务的接口和之前的服务注册、删除过程一毛一样。同样的获取接口参数,拿出要同步的服务信息,将服务封装起来,添加一个服务变更通知到阻塞队列。让其他线程去阻塞队列取数据,然后完成服务列表的更新。

临时节点同步模式是AP模式的原因是,集群中无主节点,第一次收到数据的节点,向其他节点发起同步请求,其他节点就算没有同步到服务,整个集群也能正常对外客户端提供服务。因此满足了可用性,所以是AP模式。

5 服务发现

服务发现大致过程:
客户端定时从服务端拉取最新的服务列表数据,将服务列表加载到本地缓存。同样服务端也会定时向客户端推送服务列表数据。

5.1 客户端拉取服务列表

先从客户端入手。在使用Ribbon的情况下,我们获取服务列表,需要定义一个namingService,从spring容器中自动装配。然后手动调用namingService.getAllInstances

我们进入namingService的子类NacosNamingServer这个类,有个getAllInstances方法。

    @Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        // 判断是否是订阅模式
        if (subscribe) {
            // 定时任务,定时调用服务发现接口,获取服务列表
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            // 调用一次服务发现接口,获取服务列表
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }
    public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters) throws NacosException {
        // 获取服务列表
        String result = serverProxy.queryList(serviceName, clusters, 0, false);
        if (StringUtils.isNotEmpty(result)) {
            return JSON.parseObject(result, ServiceInfo.class);
        }
        return null;
    }

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {

        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        // Http接口调用,调用服务获取列表。url是/nacos/v1/instance/list
        return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
    }

如果客户端不是订阅模式,我们可以看出只是简单的发了一次Http接口调用。拉取一次服务列表。
如果是订阅模式,下面进入定时拉取服务信息的代码。这块还是比较复杂的。

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        // 从本地缓存serviceInfoMap获取服务列表
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        // 获取不到,发起一次服务列表拉取
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);

            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            // 设置到updatingMap,表示当前针对当前服务名,正在向服务端拉取服务列表数据
            updatingMap.put(serviceName, new Object());
            // 拉取结束。唤醒所有因等待当前线程拉取数据而阻塞的线程
            updateServiceNow(serviceName, clusters);
            // 此服务不在拉取数据了。表示其他线程可以进行接口调用拉取数据了
            updatingMap.remove(serviceName);

        } else if (updatingMap.containsKey(serviceName)) {
            // 表示当前有线程正在从服务端拉取服务列表
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        // 阻塞,等待某个线程拉取完数据。
                        // 这样确保了能获取到最新拉取的数据,也就是确保了数据的实时性
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // 提交获取服务端服务列表的任务。里面会定时调用接口,拉取最新的服务列表。
        scheduleUpdateIfAbsent(serviceName, clusters);

        return serviceInfoMap.get(serviceObj.getKey());
    }

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            // 添加一个拉取数据的任务
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

从先从本地缓存中获取,获取不到就立马调用一次服务发现接口(这样做是确保最后能返回数据)。如果缓存中获取到了,会判断当前是否正在拉取数据,在拉取数据的话会阻塞等待数据拉取完,好处是确保我这一次返回正在拉取的最新的服务列表。但是不管缓存中有没有,最后都会提交一个更新本地服务列表缓存的UpdateTask任务到线程池。我们进入它的run方法。

        @Override
        public void run() {
            try {
                // 从缓存中获取数据
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

                if (serviceObj == null) {
                    // 从缓存中获取不到,那么就立即调用接口拉取服务列表
                    updateServiceNow(serviceName, clusters);
                    // 循环提交当前任务。延迟10s执行
                    executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                    return;
                }
                // 当前service更新时间小于等于最后一次拉取的时间。
                // 说明服务端暂时没有推数据过来。那就自己拉取一次最新数据
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    //立即调用接口拉取服务列表
                    updateServiceNow(serviceName, clusters);
                    // 获取刚刚拉取的服务列表
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // 说明服务端推数据过来了。那就说明有可能服务列表发生了变化,或者正在发生变化。
                    // 所以这里调用一次接口,强制让服务端来推最新数据。
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }

                executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
                // 更新最后拉取时间
                lastRefTime = serviceObj.getLastRefTime();
            } catch (Throwable e) {
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            }

        }

该定时任务主要作用就是,正常情况,每隔一段时间自己拉取一次。但是如果发现数据被修改掉了,那就说明服务端自己推送最新的数据过来了,这也表明服务端可能收到了该service下的其他客户端的更新列表的数据。那么这里就调用一下服务发现接口,让服务端再推送一次数据过来。这样提高了客户端服务列表的实时性。

下面看服务发现的接口。

    @GetMapping("/list")
    public JSONObject list(HttpServletRequest request) throws Exception {
        // 下面是解析请求,获取客户端信息
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
            Constants.DEFAULT_NAMESPACE_ID);

        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String agent = WebUtils.getUserAgent(request);
        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
        Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

        String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

        String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

        boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
        // 获取服务列表
        return doSrvIPXT(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
    }


    public JSONObject doSrvIPXT(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
                                int udpPort,
                                String env, boolean isCheck, String app, String tid, boolean healthyOnly)
        throws Exception {

        ClientInfo clientInfo = new ClientInfo(agent);
        JSONObject result = new JSONObject();
        // 获取客户端服务所在的service实例
        Service service = serviceManager.getService(namespaceId, serviceName);
        // 非空校验
        if (service == null) {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.put("name", serviceName);
            result.put("clusters", clusters);
            result.put("hosts", new JSONArray());
            return result;
        }
        // 校验是否可用
        checkIfDisabled(service);

        long cacheMillis = switchDomain.getDefaultCacheMillis();

        // 尝试开启向客户端推送服务列表的功能
        try {
             // 推送时的通信方式采用的是UDP,校验客户端是否开启UDP端口,以及是否满足推送的要求
            if (udpPort > 0 && pushService.canEnablePush(agent)) {
                // 将客户端添加到要推送的列表中
                pushService.addClient(namespaceId, serviceName,
                    clusters,
                    agent,
                    new InetSocketAddress(clientIP, udpPort),
                    pushDataSource,
                    tid,
                    app);
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-API] failed to added push client", e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }
        // 创建存放服务列表的集合
        List<Instance> srvedIPs;
        // 获取服务列表
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

        // 如果客户端设置了过滤器,这里需要根据客户端过滤器规则过滤一些服务
        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
            srvedIPs = service.getSelector().select(clientIP, srvedIPs);
        }
        // 服务列表为空,直接返回
        if (CollectionUtils.isEmpty(srvedIPs)) {

            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }

            if (clientInfo.type == ClientInfo.ClientType.JAVA &&
                clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                result.put("dom", serviceName);
            } else {
                result.put("dom", NamingUtils.getServiceName(serviceName));
            }

            result.put("hosts", new JSONArray());
            result.put("name", serviceName);
            result.put("cacheMillis", cacheMillis);
            result.put("lastRefTime", System.currentTimeMillis());
            result.put("checksum", service.getChecksum());
            result.put("useSpecifiedURL", false);
            result.put("clusters", clusters);
            result.put("env", env);
            result.put("metadata", service.getMetadata());
            return result;
        }

        Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
        // 设置两个key,一个存放健康服务列表,一个存放不健康服务列表
        ipMap.put(Boolean.TRUE, new ArrayList<>());
        ipMap.put(Boolean.FALSE, new ArrayList<>());
        // 将不同健康状态的服务分别放到ipMap中
        for (Instance ip : srvedIPs) {
            ipMap.get(ip.isHealthy()).add(ip);
        }

        if (isCheck) {
            result.put("reachProtectThreshold", false);
        }

        double threshold = service.getProtectThreshold();
        // 大概是服务列表中健康的比率小于某个阈值时候,将所有不健康的添加到健康的服务列表里。
        // 这里看不懂这步骚操作的意义。因为下面返回的时候,还是会剔除不健康的服务
        if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

            Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
            if (isCheck) {
                result.put("reachProtectThreshold", true);
            }

            ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
            ipMap.get(Boolean.FALSE).clear();
        }

        if (isCheck) {
            result.put("protectThreshold", service.getProtectThreshold());
            result.put("reachLocalSiteCallThreshold", false);

            return new JSONObject();
        }

        JSONArray hosts = new JSONArray();

        for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
            List<Instance> ips = entry.getValue();
            // 过滤不健康的
            if (healthyOnly && !entry.getKey()) {
                continue;
            }
            // 将所有instance添加到json数组中
            for (Instance instance : ips) {

                // remove disabled instance:
                if (!instance.isEnabled()) {
                    continue;
                }

                JSONObject ipObj = new JSONObject();

                ipObj.put("ip", instance.getIp());
                ipObj.put("port", instance.getPort());
                // deprecated since nacos 1.0.0:
                ipObj.put("valid", entry.getKey());
                ipObj.put("healthy", entry.getKey());
                ipObj.put("marked", instance.isMarked());
                ipObj.put("instanceId", instance.getInstanceId());
                ipObj.put("metadata", instance.getMetadata());
                ipObj.put("enabled", instance.isEnabled());
                ipObj.put("weight", instance.getWeight());
                ipObj.put("clusterName", instance.getClusterName());
                if (clientInfo.type == ClientInfo.ClientType.JAVA &&
                    clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                    ipObj.put("serviceName", instance.getServiceName());
                } else {
                    ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
                }

                ipObj.put("ephemeral", instance.isEphemeral());
                hosts.add(ipObj);

            }
        }
        // 将服务列表的json数组放到result返回值中
        result.put("hosts", hosts);
        if (clientInfo.type == ClientInfo.ClientType.JAVA &&
            clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.put("metadata", service.getMetadata());
        return result;
    }

下面是获取服务列表的代码

    public List<Instance> srvIPs(List<String> clusters) {
        if (CollectionUtils.isEmpty(clusters)) {
            clusters = new ArrayList<>();
            clusters.addAll(clusterMap.keySet());
        }
        // 获取service下所有cluster下的服务列表
        // 因为一个service下clusterMap,也就是可以存放多个cluster,每个cluster下就是一组服务列表
        // 这也印证了,客户端可以获取其他cluster的服务,也就可以跨clster调用
        return allIPs(clusters);
    }

    public List<Instance> allIPs(List<String> clusters) {
        List<Instance> allIPs = new ArrayList<>();
        // 遍历cluster
        for (String cluster : clusters) {
            Cluster clusterObj = clusterMap.get(cluster);
            if (clusterObj == null) {
                continue;
            }
            // 从cluster中获取所有的对应的instance的set集合。包括临时的和持久的
            allIPs.addAll(clusterObj.allIPs());
        }

        return allIPs;
    }

    public List<Instance> allIPs() {
        List<Instance> allInstances = new ArrayList<>();
        // 添加临时的
        allInstances.addAll(persistentInstances);
        // 添加持久的
        allInstances.addAll(ephemeralInstances);
        return allInstances;
    }

服务发现总结:

客户端发起服务获取的请求。服务端获取客户端服务所在的service下的所有cluster下的服务列表,并剔除其中不健康的,将所有健康状态的服务列表返回给客户端。

5.2 服务端推送服务列表

下面再来研究下服务端向客户端推送服务列表的原理,采用的是UDP推送。
从上面的 pushService.addClient代码进去。

    public void addClient(String namespaceId,
                          String serviceName,
                          String clusters,
                          String agent,
                          InetSocketAddress socketAddr,
                          DataSource dataSource,
                          String tenant,
                          String app) {

        PushClient client = new PushClient(namespaceId,
            serviceName,
            clusters,
            agent,
            socketAddr,
            dataSource,
            tenant,
            app);
        // 添加要推送的客户端
        addClient(client);
    }

    public static void addClient(PushClient client) {
        // client is stored by key 'serviceName' because notify event is driven by serviceName change
        String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
        // 从客户端列表clientMap中获取当前service下的所有client
        ConcurrentMap<String, PushClient> clients =
            clientMap.get(serviceKey);
        // 不存在就创建
        if (clients == null) {
            clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<String, PushClient>(1024));
            clients = clientMap.get(serviceKey);
        }
        // 判断当前客户端有没有放入过
        PushClient oldClient = clients.get(client.toString());
        if (oldClient != null) {
            oldClient.refresh();
        } else {
            // 没放入,就将当前客户端放入clientMap中
            PushClient res = clients.putIfAbsent(client.toString(), client);
            if (res != null) {
                Loggers.PUSH.warn("client: {} already associated with key {}", res.getAddrStr(), res.toString());
            }
            Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(), client.getServiceName());
        }
    }

我们发现上面的代码仅仅是将要推送的客户端放入到clientMap中。那么肯定是有地方开启线程,对clientMap中的客户端完成服务列表的推送。
我们查看下PushService的类继承图


image.png

我们发现PushService实现了ApplicationListener接口(spring监听器)。

public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> 

再查看下PushService 的内部有个方法serviceChanged,方法作用就是发布推送服务列表的事件,如果某个地方调用了serviceChanged方法,发布ServiceChangeEvent事件,那么会触发PushService的onApplicationEvent方法。

    public void serviceChanged(Service service) {
        // merge some change events to reduce the push frequency:
        if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
            return;
        }
        // 发布推送服务列表的事件
        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
    }

从上面的服务注册、心跳、健康检查的代码中,我们都能看到,每次服务列表发生变更或者是服务的健康状态发生变化,都会调用serviceChanged方法,进而触发了onApplicationEvent事件。因此我们看下onApplicationEvent方法干了什么?


    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        // 获取发生变化的service的信息
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();
        // 向线程池中提交一个推送服务列表的任务
        Future future = udpSender.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                    // 获取service下所有客户端。当然是已经添加到clientMap中的。
                    // 也就是已经向服务端调用过服务发现接口的client客户端。因为调用过接口,才会将client加入到clientMap中
                    ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                    if (MapUtils.isEmpty(clients)) {
                        return;
                    }

                    Map<String, Object> cache = new HashMap<>(16);
                    long lastRefTime = System.nanoTime();
                    for (PushClient client : clients.values()) {
                        if (client.zombie()) {
                            Loggers.PUSH.debug("client is zombie: " + client.toString());
                            clients.remove(client.toString());
                            Loggers.PUSH.debug("client is zombie: " + client.toString());
                            continue;
                        }
                        // 定义一个AckEntry,里面封装udp数据包
                        Receiver.AckEntry ackEntry;
                        Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                        String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                        byte[] compressData = null;
                        Map<String, Object> data = null;
                        // 从缓存中取服务列表数据,Pair里面封装的就是服务列表。
                        if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                            org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                            compressData = (byte[]) (pair.getValue0());
                            data = (Map<String, Object>) pair.getValue1();

                            Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                        }
                        // 缓存没命中,自己去获取服务列表
                        if (compressData != null) {
                            ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                        } else {
                            // prepareHostsData(client)就是获取要推送的服务列表数据
                            // prepareAckEntry就是将数据封装为udp数据包
                            ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                            if (ackEntry != null) {
                                cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                            }
                        }

                        Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));
                        // 开始udp推送
                        udpPush(ackEntry);
                    }
                } catch (Exception e) {
                    Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

                } finally {
                    futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                }

            }
        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }


    //真正的udp推送
    private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }

        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;
            return ackEntry;
        }

        try {
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush++;
            }
            ackMap.put(ackEntry.key, ackEntry);
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            // 基于udp推送数据
            udpSocket.send(ackEntry.origin);

            ackEntry.increaseRetryTime();
            // 循环提交udp推送任务,也就是每隔10s进行一次服务列表推送到客户端
            executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),
                TimeUnit.MILLISECONDS);

            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}",
                ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;

            return null;
        }
    }

下面,我们看下是如何获取服务列表数据的。先从缓存中拿,拿不到,再自己到serviceMap中获取。下面是到serviceMap中获取的代码。

    private static Map<String, Object> prepareHostsData(PushClient client) throws Exception {
        Map<String, Object> cmd = new HashMap<String, Object>(2);
        cmd.put("type", "dom");
        // getData是真正的获取服务列表
        cmd.put("data", client.getDataSource().getData(client));

        return cmd;
    }

    private DataSource pushDataSource = new DataSource() {

        @Override
        public String getData(PushService.PushClient client) {

            JSONObject result = new JSONObject();
            try {
                // doSrvIPXT方法就是熟悉的服务发现接口中获取当前service下的所有服务列表
                result = doSrvIPXT(client.getNamespaceId(), client.getServiceName(), client.getAgent(),
                    client.getClusters(), client.getSocketAddr().getAddress().getHostAddress(), 0, StringUtils.EMPTY,
                    false, StringUtils.EMPTY, StringUtils.EMPTY, false);
            } catch (Exception e) {
                Loggers.SRV_LOG.warn("PUSH-SERVICE: service is not modified", e);
            }

            // overdrive the cache millis to push mode
            result.put("cacheMillis", switchDomain.getPushCacheMillis(client.getServiceName()));

            return result.toJSONString();
        }
    };

服务端推送总结:

客户端向服务端拉取数据的同时,会将客户端信息注册到clientMap中。等到下一次发生心跳、服务列表数据变更、健康状态发生变化等,都会触发推送事件。在推送事件方法中,服务端将此客户端对应的service下的所有服务列表基于UDP推送给客户端,并开启一个定时任务,每隔10s定时推送数据到客户端。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342