1 什么是Nacos,Nacos可以干什么?
Nacos是微服务架构中的注册中心和配置中心,其他服务的服务信息(ip,端口等信息)可以注册到nacos服务端。nacos又为客户端提供了服务发现的功能。客户端会开启一个定时任务,定时向服务端获取最新的服务列表,加载到客户端本地缓存。客户端同时又开启一个定时心跳发送的任务,用于告知服务端,当前服务的健康状态。服务端启动的时候同样也会开启一个健康检查的定时任务,扫描服务列表,将长时间未与服务端发送心跳的服务的健康状态改为false,达到某个时间,会踢出该服务。
Nacos好处就是服务不需要记录其他服务的ip信息,通过nacos可以实时获取其他服务列表。仅仅只需从本地缓存中根据服务名找到服务列表,利用负载均衡算法从列表中拉取一个ip进行调用(比如Ribbon)。
2 服务心跳与服务注册原理
我们知道springboot项目启动的时候,会去扫描所有jar包下的META-INF下的spring.factories文件,并获取文件中key为EnableAutoConfiguration的value值(一般是javaConfig配置文件,java文件的包路径),并解析这些配置文件。
spring-cloud-alibaba-nacos-discovery这个jar包下的spring.factories文件引入了
一个关键的配置文件NacosDiscoveryAutoConfiguration。此配置文件往spring容器中注入了一个bean,如下:
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
通过查看NacosAutoServiceRegistration的继承图,可以发现它实现了ApplicationListener接口。
在spring中如果某个bean实现了ApplicationListener<WebServerInitializedEvent>接口,那么它就是一个监听器,监听WebServerInitializedEvent事件(容器启动事件),在spring容器启动的时候,会调用其重写的onApplicationEvent方法。因此我们进入此方法。
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
//关键代码
this.start();
}
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
//开始注册
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
关键代码是register方法,开始注册服务。进入此方法
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
//获取服务标识
String serviceId = registration.getServiceId();
//获取服务所在的group
String group = nacosDiscoveryProperties.getGroup();
//获取服务的instance,就是把配置文件中配置的服务信息(服务名、IP、Port等等)封装到instance对象中
Instance instance = getNacosInstanceFromRegistration(registration);
try {
//真正的开始注册
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
}
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
//判断是否是临时实例。一般来说为了性能,都用的临时实例
if (instance.isEphemeral()) {
//以下就是将instance中的信息封装到beatInfo中去
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
//发送心跳
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
//注册服务
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
因此,在spring容器启动的时候,nacos客户端会进行两步操作。
1 向nacos服务端发送心跳
2 向nacos服务端注册当前服务
2.1 服务心跳
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
// key的格式:服务名#ip#port
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
//向线程池中提交发送心跳的任务,实现异步发送
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
将心跳的信息包装为一个BeatTask任务,放到线程池中异步执行。我们主要看BeatTask的run方法。
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
// 发送心跳
long result = serverProxy.sendBeat(beatInfo);
// 获取返回的下一次执行的间隔时间。0的话,就取默认的,默认是5s
long nextTime = result > 0 ? result : beatInfo.getPeriod();
// 循环提交心跳任务。默认就是5s发送一次心跳。
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
public long sendBeat(BeatInfo beatInfo) {
try {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
//以下就是我们常见的调用Http接口。下面是进行接口参数封装。
Map<String, String> params = new HashMap<String, String>(4);
params.put("beat", JSON.toJSONString(beatInfo));
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
// 发起心跳接口Http调用。心跳接口url是/nacos/v1/ns/instance/beat
String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject != null) {
return jsonObject.getLong("clientBeatInterval");
}
} catch (Exception e) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: " + JSON.toJSONString(beatInfo), e);
}
return 0L;
}
我们可以看到,客户端默认会每隔5s发送一次心跳。心跳发送过程就是简单的Http接口调用。
接下来进入服务端的心跳接口代码。也就是查找url为/nacos/v1/ns/instance/beat的接口。我们在naming工程下的InstanceController中找到接口方法。
@CanDistro
@PutMapping("/beat")
public JSONObject beat(HttpServletRequest request) throws Exception {
JSONObject result = new JSONObject();
//以下是解析请求参数,获取一些信息,包括namespaceId、serviceName等待
result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
Constants.DEFAULT_NAMESPACE_ID);
String beat = WebUtils.required(request, "beat");
RsInfo clientBeat = JSON.parseObject(beat, RsInfo.class);
if (!switchDomain.isDefaultInstanceEphemeral() && !clientBeat.isEphemeral()) {
return result;
}
//判断传过来的服务有没有指定cluster名字,如果没有就设置默认值DEFAULT
if (StringUtils.isBlank(clientBeat.getCluster())) {
clientBeat.setCluster(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
String clusterName = clientBeat.getCluster();
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
}
// 获取服务对应的instance。第一次注册,这里肯定是null
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clientBeat.getCluster(),
clientBeat.getIp(),
clientBeat.getPort());
if (instance == null) {
// 创建instance实例。将客户端传过来的服务信息设置到instance中
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
// 注册服务。如果是第一次,那么会在发送心跳的时候完成服务注册。这里先跳过,到服务注册的接口探究。
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
// 真正处理心跳请求
service.processClientBeat(clientBeat);
result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
return result;
}
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
//向健康检测的线程池中提交一个处理心跳请求的任务,并且立刻执行。
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
public static ScheduledFuture<?> scheduleNow(Runnable task) {
//delay值是0,因此立刻执行。
return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
}
我们主要看ClientBeatProcessor的run方法,查看处理心跳的逻辑。
@Override
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
// 获取当前cluster下的所有服务列表
List<Instance> instances = cluster.allIPs(true);
// 遍历服务列表
for (Instance instance : instances) {
// 从服务列表中找客户端发送心跳的那个服务
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
// 找到了,说明之前注册过了,那就这里更新下这个服务的最近心跳时间。后面健康检测就是根据这个心跳时间来的。
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
if (!instance.isHealthy()) {
// 将服务的健康状态置为true
instance.setHealthy(true);
Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
// 服务发生变化,发布推送事件,触发服务端向客户端的推送任务
getPushService().serviceChanged(service);
}
}
}
}
}
这里我们可以看出,心跳的目的就是更新这个服务的最后心跳时间。而服务端判定这个服务是否掉线,就是根据这个时间来判定的,如果最后心跳时间与当前时间差超过15s就会设置为false,也就是掉线。时间差超过30s就会将此服务踢出服务列表。当然这个服务健康检查到后面再细说。
服务心跳过程总结:
客户端在启动的时候,会开启一个心跳线程,每隔5s调用一次服务端的心跳接口(Http调用),服务端将心跳请求封装成一个task,放到线程池中。由服务端的线程池执行task,更新对应服务的最后心跳时间。
2.2 服务注册
回到2.1上面,客户端发送服务注册接口那块
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
//封装服务注册的接口参数
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
//发起Http接口调用。接口url是/nacos/v1/ns/instance
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
服务注册就是客户端启动的时候,发起Http接口调用。
接下来进入服务端的服务注册接口代码。也就是查找url为/nacos/v1/ns/instance的接口。我们在naming工程下的InstanceController中找到接口方法。
@CanDistro
@PostMapping
public String register(HttpServletRequest request) throws Exception {
// 获取服务名和服务所在的namespaceId
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 服务注册
serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
return "ok";
}
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 因为是第一次注册,需要创建一个空的service
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 获取刚刚创建的service
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 真正的注册服务
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
这里有两步,创建service,然后是注册服务到service中。
2.2.1 先看创建service
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
// 获取service。从serviceMap中获取。
Service service = getService(namespaceId, serviceName);
// 如果是第一次注册,那么这里肯定获取不到,会进入下面if分支
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
// 创建一个service并设置一些属性
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// 将service放到serviceMap中,并初始化service
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
//此方法就是从serviceMap这个存放所有服务信息的map中获取service
public Service getService(String namespaceId, String serviceName) {
// 获取当前服务对应的namespaceId
if (serviceMap.get(namespaceId) == null) {
return null;
}
// 获取到了,就从中取出service
return chooseServiceMap(namespaceId).get(serviceName);
}
private void putServiceAndInit(Service service) throws NacosException {
// 把服务放到serviceMap中
putService(service);
// 服务初始化。主要是开启服务的健康检测。
service.init();
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}
public void putService(Service service) {
//将一个空的服务放到serviceMap中。这里使用了双重校验加同步锁。
//因为在判断此服务不存在之后,放入的时候可能恰好其他线程放入了一样的service。
//所以需要加锁,并再次校验。类似单例模式创建的双重检测加锁。
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
}
}
}
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
public void init() {
// 开启服务健康检测。向线程池中提交健康检测任务。
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
所以创建service主要干了两件事情。
1 将service放到serviceMap中
2 开启当前服务的健康检查
2.2.2 真正注册服务
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
// key格式:com.alibaba.nacos.naming.iplist.ephemera.namespaceId##serviceName
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 获取之前创建的service
Service service = getService(namespaceId, serviceName);
synchronized (service) {
// 获取当前service下的所有ip列表。因为如果客户端做集群,这里就是集群中的多个服务ip信息
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
// 将当前服务的所有instance放到Instances中
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 服务注册
consistencyService.put(key, instances);
}
}
@Override
public void put(String key, Record value) throws NacosException {
//将服务注册的通知放到阻塞队列中。
//服务端有一个线程专门从阻塞队列中获取通知。处理通知。
onPut(key, value);
//向nacos服务端集群中的其他节点同步服务。这里先不看,下面服务同步再深入。
taskDispatcher.addTask(key);
}
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// Instances信息放到dataStore中
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// 发出服务列表修改的通知
notifier.addTask(key, ApplyAction.CHANGE);
}
notifier是一个实现Runnable接口的类,内部有一个阻塞队列,addTask方法就是往阻塞队列里添加服务变更的通知。
服务端有个类DistroConsistencyServiceImpl,里面定义了一个init方法,方法中开启了一个线程,并提交了一个notifier任务。也就是服务端启动的时候,就会开启一个线程执行notifier任务。我们来看notifier的run方法
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);
public void addTask(String datumKey, ApplyAction action) {
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.add(Pair.with(datumKey, action));
}
public int getTaskSize() {
return tasks.size();
}
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
//死循环,循环从阻塞队列中获取服务变更的通知。
while (true) {
try {
//从阻塞队列中获取服务变更的通知
Pair pair = tasks.take();
if (pair == null) {
continue;
}
String datumKey = (String) pair.getValue0();
//获取服务变更类型,有更新和下线删除两种
ApplyAction action = (ApplyAction) pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
continue;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
// 服务列表变更。因为我们这里是注册,相当于更新服务列表。因此走这个if分支。
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
@Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
// 真正的更新服务注册列表
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// 创建一个临时的服务对应的map。
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
// 遍历instance列表
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
// 设置cluster名称
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
// 创建一个空的Cluster,设置到clusterMap中
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJSON());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
// 将instance添加到ipMap中此instance对应的clustername位置上
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
// 遍历ipMap。将instance对应的cluster更新到clusterMap中。
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// 真正的更新服务列表
clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
// 服务列表更新了,发生变化,发布推送事件,触发服务端向客户端的推送任务
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}",
getNamespaceId(), getName(), stringBuilder.toString());
}
public void updateIPs(List<Instance> ips, boolean ephemeral) {
//此处省略。主要是合并老的服务列表和新的服务列表,合并为一个最新的服务列表。
.....
.....
.....
// 因为是临时实例。更新最新的服务列表到ephemeralInstances
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
服务注册的主要过程就是,将服务列表更新的通知放入阻塞队列。
服务端启动时候会开启一个线程,专门从这个阻塞队列中获取通知,拿到最新的服务列表,并更新到service中的clusterMap中去。也就是更新最底层Cluster中的ephemeralInstances变量,此变量就是存放当前cluster下的所有服务列表。
至此,服务注册完成。
/**
* Map<namespace, Map<group::serviceName, Service>>
*/
private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
这里需要提一下服务端存储临时实例的结构。
本质上就是serverMap。它的结构是:
serverMap:
key是namespace
value是Map<group::serviceName, Service>
而Map<group::serviceName, Service>:
key是组名::服务名
value是service对象
而service中存了一个clusterMap,是一个hashmap。
Map<String, Cluster> clusterMap = new HashMap<>();
key是clustername
value是cluster对象
而cluster存了两个set集合:
一个存放临时服务列表(private Set<Instance> ephemeralInstances = new HashSet<>();)
一个存放持久服务列表(private Set<Instance> persistentInstances = new HashSet<>();)
服务注册最后更新的就是这两个set集合。
服务注册过程总结:
客户端启动的时候,向服务端发起Http接口调用,调用服务注册的接口。服务端收到注册请求,将新的注册信息和老的服务列表封装为一个Pair对象,并放入阻塞队列。服务端在启动的时候会创建一个线程池,并提交一个任务,这个任务就是循环从阻塞队列里拿Pair对象,对其解析,更新服务所在的service内部的clusterMap中的服务列表。
3 服务健康检查
我们从上面服务注册的时候,在进行创建空service的时候,会向服务健康检查线程池中,提交一个健康检查任务,此任务就是针对当前service下所有服务列表进行健康检查。
public void init() {
// 开启服务健康检测。向线程池中提交健康检测任务。
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
我们下面看ClientBeatCheckTask的run方法。
@Override
public void run() {
try {
// 判断当前服务器是否需要对此服务执行健康检查
if (!getDistroMapper().responsible(service.getName())) {
return;
}
// 获取当前service下的所有服务列表
List<Instance> instances = service.allIPs(true);
// 第一次检查
for (Instance instance : instances) {
// 判断最后一次心跳时间与当前时间差是否超过了15s。超过了就设置健康状态为false,表示服务已掉线
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
// 服务发生变化,发布推送事件,触发服务端向客户端的推送任务
getPushService().serviceChanged(service);
SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// 第二次检查
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
// 判断最后一次心跳时间与当前时间差是否超过了30s。超过了就踢出该服务
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
// 踢出服务
deleteIP(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
public boolean responsible(String serviceName) {
if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
return true;
}
if (CollectionUtils.isEmpty(healthyList)) {
// means distro config is not ready yet
return false;
}
int index = healthyList.indexOf(NetUtils.localServer());
int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
if (lastIndex < 0 || index < 0) {
return true;
}
// 计算服务的hash值与集群长度取模
int target = distroHash(serviceName) % healthyList.size();
// 如果是当前服务器在集群中的index,就返回false,也就是执行下面的健康检查
return target >= index && target <= lastIndex;
}
检查的方法就是:
如果最后心跳时间与当前时间差超过15s就会设置为false,也就是掉线。时间差超过30s就会将此服务踢出服务列表。
这里有个性能问题,如果服务列表很多很多,那么单台nacos服务器需要一次检查很多个服务。因此在使用nacos集群的时候,会对每个服务计算一个hash值,然后对集群列表的长度取模,如果等于当前nacos服务器在集群中的index,那么就执行对当前service的健康检查,如果不是,就不检查。这样的好处是,多台服务器可以分担健康检查的压力,类似分片的思想。
踢出服务列表就是执行 deleteIP(instance);我们下面看里面做了什么。
private void deleteIP(Instance instance) {
try {
NamingProxy.Request request = NamingProxy.Request.newRequest();
request.appendParam("ip", instance.getIp())
.appendParam("port", String.valueOf(instance.getPort()))
.appendParam("ephemeral", "true")
.appendParam("clusterName", instance.getClusterName())
.appendParam("serviceName", service.getName())
.appendParam("namespaceId", service.getNamespaceId());
String url = "http://127.0.0.1:" + RunningConfig.getServerPort() + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
// 还是发起Http接口调用。url是http://127.0.0.1:port/nacos/v1/ns/instance.是delete请求
HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
@Override
public Object onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
instance.toJSON(), response.getResponseBody(), response.getStatusCode());
}
return null;
}
});
} catch (Exception e) {
Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJSON(), e);
}
}
同样的,这里踢出服务列表,就是调用删除服务的接口。进入接口代码。
@CanDistro
@DeleteMapping
public String deregister(HttpServletRequest request) throws Exception {
Instance instance = getIPAddress(request);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
// 获取服务所在的service
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
return "ok";
}
// 开始删除服务
serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
return "ok";
}
public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
Service service = getService(namespaceId, serviceName);
// 对当前service加锁,防止并发问题。
synchronized (service) {
//剔除服务
removeInstance(namespaceId, serviceName, ephemeral, service, ips);
}
}
public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 获取删除当前服务后的服务列表
List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 更新服务列表并同步数据到其他节点
consistencyService.put(key, instances);
}
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
taskDispatcher.addTask(key);
}
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// Instances信息放到dataStore中
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// 发出服务列表修改的通知
notifier.addTask(key, ApplyAction.CHANGE);
}
删除服务实例的接口最终调用的地方和服务注册时一模一样。就是将需要删除的服务和老的服务列表合并为一个新的服务列表,封装到dataStore里,提交服务改变的通知到阻塞队列。由服务端的线程从阻塞队列获取服务列表改变的通知,从dataStore拿出最新的服务列表,更新Cluster中的存放临时服务列表的set集合。
我们可以发现,删除服务和注册服务接口逻辑类似。都是将更新后的服务列表封装起来,然后添加一个通知,让其他线程去获取通知,将新的服务列表更新进去。
4 服务同步
nacos中临时实例集群同步采用的是AP模式,而持久实例采用的是CP模式。这里对临时实例的服务同步进行分析。
通过上面服务同步和服务删除的接口,我们可以看到本质上都是更新服务列表。更新完之后调用taskDispatcher.addTask(key);向其他节点发起服务同步请求。
@Override
public void put(String key, Record value) throws NacosException {
//更新服务列表
onPut(key, value);
//服务同步
taskDispatcher.addTask(key);
}
public void addTask(String key) {
taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
}
public void addTask(String key) {
// queue是一个阻塞队列。向阻塞队列中添加一个服务同步的通知。
queue.offer(key);
}
可以发现服务同步也是向阻塞队列中添加一个服务同步的通知。当然这个阻塞队列和上面的注册服务的阻塞队列不是同一个。我们可以猜想,肯定是服务端某个类初始化的时候,开启了一个线程,线程执行的任务就是循环从阻塞队列中获取通知。
@Component
public class TaskDispatcher {
@Autowired
private GlobalConfig partitionConfig;
@Autowired
private DataSyncer dataSyncer;
private List<TaskScheduler> taskSchedulerList = new ArrayList<>();
private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();
//初始化方法
@PostConstruct
public void init() {
for (int i = 0; i < cpuCoreCount; i++) {
TaskScheduler taskScheduler = new TaskScheduler(i);
taskSchedulerList.add(taskScheduler);
// 向线程池中提交一个TaskScheduler任务
GlobalExecutor.submitTaskDispatch(taskScheduler);
}
}
......
......
......省略
果然TaskDispatcher这个类初始化的时候,开启了线程,执行TaskScheduler任务,我们来看run方法。
@Override
public void run() {
List<String> keys = new ArrayList<>();
while (true) {
try {
// 从阻塞队列中取出要同步的key
String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
TimeUnit.MILLISECONDS);
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("got key: {}", key);
}
// 如果是单机,就不需要同步
if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
continue;
}
if (StringUtils.isBlank(key)) {
continue;
}
if (dataSize == 0) {
keys = new ArrayList<>();
}
//key放到keys集合中。
keys.add(key);
//对keys计数
dataSize++;
// 当数量满足批量同步的最小数量时,对存放的所有key,开始同步操作
// 或者上一次同步时间与当前时间差超过了指定时间,也进行同步
if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
(System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
// 获取所有服务器,遍历依次发送同步请求
for (Server member : dataSyncer.getServers()) {
// 如果是当前服务器,那么就不同步。
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
// 将同步的key封装到SyncTask任务对象中
SyncTask syncTask = new SyncTask();
syncTask.setKeys(keys);
syncTask.setTargetServer(member.getKey());
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
}
// 提交服务同步的任务
dataSyncer.submit(syncTask, 0);
}
// 记录最后同步的时间
lastDispatchTime = System.currentTimeMillis();
// 重置计数器
dataSize = 0;
}
} catch (Exception e) {
Loggers.DISTRO.error("dispatch sync task failed.", e);
}
}
}
我们可以看到同步的时候并不是每次有服务列表变更,就执行同步,这样很耗资源,影响性能。而是将同步的服务累积起来,等到某个值进行批量同步。或者超时了也执行同步(防止服务达不到批量同步的阈值而一直不同步)。
public void submit(SyncTask task, long delay) {
// If it's a new task:
// 下面是将所有的key放到taskMap中管理
if (task.getRetryCount() == 0) {
Iterator<String> iterator = task.getKeys().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
// associated key already exist:
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync already in process, key: {}", key);
}
iterator.remove();
}
}
}
if (task.getKeys().isEmpty()) {
// all keys are removed:
return;
}
// 真正的提交同步请求的任务
GlobalExecutor.submitDataSync(() -> {
// 检查是否是单机
if (getServers() == null || getServers().isEmpty()) {
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
return;
}
List<String> keys = task.getKeys();
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
}
// 2. get the datums by keys and check the datum is empty or not
// 获取keys的所有对应的服务
Map<String, Datum> datumMap = dataStore.batchGet(keys);
if (datumMap == null || datumMap.isEmpty()) {
// clear all flags of this task:
for (String key : keys) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
return;
}
// 序列化
byte[] data = serializer.serialize(datumMap);
long timestamp = System.currentTimeMillis();
// 发起同步请求。里面就是一个Http接口调用。调用同步服务接口。
// 接口url是/nacos/ns/distro/datum
boolean success = NamingProxy.syncData(data, task.getTargetServer());
if (!success) {
SyncTask syncTask = new SyncTask();
syncTask.setKeys(task.getKeys());
syncTask.setRetryCount(task.getRetryCount() + 1);
syncTask.setLastExecuteTime(timestamp);
syncTask.setTargetServer(task.getTargetServer());
// 如果不成功,重试
retrySync(syncTask);
} else {
// clear all flags of this task:
for (String key : task.getKeys()) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
}
}, delay);
}
public void retrySync(SyncTask syncTask) {
Server server = new Server();
server.setIp(syncTask.getTargetServer().split(":")[0]);
server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
if (!getServers().contains(server)) {
// if server is no longer in healthy server list, ignore this task:
//fix #1665 remove existing tasks
if (syncTask.getKeys() != null) {
for (String key : syncTask.getKeys()) {
taskMap.remove(buildKey(key, syncTask.getTargetServer()));
}
}
return;
}
// TODO may choose other retry policy.
// 又提交了一次同步服务的任务
submit(syncTask, partitionConfig.getSyncRetryDelay());
}
服务同步本质上也就是将更新的服务序列化,调用同步服务的接口,传递给集群中其他节点。
不过这里有个小问题。就是如果其他节点出现问题,比如挂了。那么这里肯定同步不成功,那么它会不断的重试,不断的执行同步调用,又不断的失败。这样其实是无意义的,作者在这里也标注了,之后会更新,使用其他重试机制。比如,重试的时间延长,失败次数如果达到某个次数,就停止同步。
下面我们看服务端的接受同步请求的接口。
//同步服务的接口
@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
//校验参数
if (dataMap.isEmpty()) {
Loggers.DISTRO.error("[onSync] receive empty entity!");
throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
}
// 遍历获取key
for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
String serviceName = KeyBuilder.getServiceName(entry.getKey());
if (!serviceManager.containService(namespaceId, serviceName)
&& switchDomain.isDefaultInstanceEphemeral()) {
serviceManager.createEmptyService(namespaceId, serviceName, true);
}
// 又是熟悉的调用服务列表更新的方法
consistencyService.onPut(entry.getKey(), entry.getValue().value);
}
}
return ResponseEntity.ok("ok");
}
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// Instances信息放到dataStore中
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// 发出服务列表修改的通知
notifier.addTask(key, ApplyAction.CHANGE);
}
同步服务的接口和之前的服务注册、删除过程一毛一样。同样的获取接口参数,拿出要同步的服务信息,将服务封装起来,添加一个服务变更通知到阻塞队列。让其他线程去阻塞队列取数据,然后完成服务列表的更新。
临时节点同步模式是AP模式的原因是,集群中无主节点,第一次收到数据的节点,向其他节点发起同步请求,其他节点就算没有同步到服务,整个集群也能正常对外客户端提供服务。因此满足了可用性,所以是AP模式。
5 服务发现
服务发现大致过程:
客户端定时从服务端拉取最新的服务列表数据,将服务列表加载到本地缓存。同样服务端也会定时向客户端推送服务列表数据。
5.1 客户端拉取服务列表
先从客户端入手。在使用Ribbon的情况下,我们获取服务列表,需要定义一个namingService,从spring容器中自动装配。然后手动调用namingService.getAllInstances
我们进入namingService的子类NacosNamingServer这个类,有个getAllInstances方法。
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
// 判断是否是订阅模式
if (subscribe) {
// 定时任务,定时调用服务发现接口,获取服务列表
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
// 调用一次服务发现接口,获取服务列表
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters) throws NacosException {
// 获取服务列表
String result = serverProxy.queryList(serviceName, clusters, 0, false);
if (StringUtils.isNotEmpty(result)) {
return JSON.parseObject(result, ServiceInfo.class);
}
return null;
}
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
// Http接口调用,调用服务获取列表。url是/nacos/v1/instance/list
return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
如果客户端不是订阅模式,我们可以看出只是简单的发了一次Http接口调用。拉取一次服务列表。
如果是订阅模式,下面进入定时拉取服务信息的代码。这块还是比较复杂的。
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 从本地缓存serviceInfoMap获取服务列表
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
// 获取不到,发起一次服务列表拉取
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
// 设置到updatingMap,表示当前针对当前服务名,正在向服务端拉取服务列表数据
updatingMap.put(serviceName, new Object());
// 拉取结束。唤醒所有因等待当前线程拉取数据而阻塞的线程
updateServiceNow(serviceName, clusters);
// 此服务不在拉取数据了。表示其他线程可以进行接口调用拉取数据了
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
// 表示当前有线程正在从服务端拉取服务列表
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
// 阻塞,等待某个线程拉取完数据。
// 这样确保了能获取到最新拉取的数据,也就是确保了数据的实时性
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 提交获取服务端服务列表的任务。里面会定时调用接口,拉取最新的服务列表。
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
// 添加一个拉取数据的任务
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
从先从本地缓存中获取,获取不到就立马调用一次服务发现接口(这样做是确保最后能返回数据)。如果缓存中获取到了,会判断当前是否正在拉取数据,在拉取数据的话会阻塞等待数据拉取完,好处是确保我这一次返回正在拉取的最新的服务列表。但是不管缓存中有没有,最后都会提交一个更新本地服务列表缓存的UpdateTask任务到线程池。我们进入它的run方法。
@Override
public void run() {
try {
// 从缓存中获取数据
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
// 从缓存中获取不到,那么就立即调用接口拉取服务列表
updateServiceNow(serviceName, clusters);
// 循环提交当前任务。延迟10s执行
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
// 当前service更新时间小于等于最后一次拉取的时间。
// 说明服务端暂时没有推数据过来。那就自己拉取一次最新数据
if (serviceObj.getLastRefTime() <= lastRefTime) {
//立即调用接口拉取服务列表
updateServiceNow(serviceName, clusters);
// 获取刚刚拉取的服务列表
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// 说明服务端推数据过来了。那就说明有可能服务列表发生了变化,或者正在发生变化。
// 所以这里调用一次接口,强制让服务端来推最新数据。
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
// 更新最后拉取时间
lastRefTime = serviceObj.getLastRefTime();
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}
该定时任务主要作用就是,正常情况,每隔一段时间自己拉取一次。但是如果发现数据被修改掉了,那就说明服务端自己推送最新的数据过来了,这也表明服务端可能收到了该service下的其他客户端的更新列表的数据。那么这里就调用一下服务发现接口,让服务端再推送一次数据过来。这样提高了客户端服务列表的实时性。
下面看服务发现的接口。
@GetMapping("/list")
public JSONObject list(HttpServletRequest request) throws Exception {
// 下面是解析请求,获取客户端信息
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
// 获取服务列表
return doSrvIPXT(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
public JSONObject doSrvIPXT(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort,
String env, boolean isCheck, String app, String tid, boolean healthyOnly)
throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);
JSONObject result = new JSONObject();
// 获取客户端服务所在的service实例
Service service = serviceManager.getService(namespaceId, serviceName);
// 非空校验
if (service == null) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
result.put("name", serviceName);
result.put("clusters", clusters);
result.put("hosts", new JSONArray());
return result;
}
// 校验是否可用
checkIfDisabled(service);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// 尝试开启向客户端推送服务列表的功能
try {
// 推送时的通信方式采用的是UDP,校验客户端是否开启UDP端口,以及是否满足推送的要求
if (udpPort > 0 && pushService.canEnablePush(agent)) {
// 将客户端添加到要推送的列表中
pushService.addClient(namespaceId, serviceName,
clusters,
agent,
new InetSocketAddress(clientIP, udpPort),
pushDataSource,
tid,
app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-API] failed to added push client", e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}
// 创建存放服务列表的集合
List<Instance> srvedIPs;
// 获取服务列表
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
// 如果客户端设置了过滤器,这里需要根据客户端过滤器规则过滤一些服务
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}
// 服务列表为空,直接返回
if (CollectionUtils.isEmpty(srvedIPs)) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
if (clientInfo.type == ClientInfo.ClientType.JAVA &&
clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("hosts", new JSONArray());
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.put("metadata", service.getMetadata());
return result;
}
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
// 设置两个key,一个存放健康服务列表,一个存放不健康服务列表
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
// 将不同健康状态的服务分别放到ipMap中
for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
}
if (isCheck) {
result.put("reachProtectThreshold", false);
}
double threshold = service.getProtectThreshold();
// 大概是服务列表中健康的比率小于某个阈值时候,将所有不健康的添加到健康的服务列表里。
// 这里看不懂这步骚操作的意义。因为下面返回的时候,还是会剔除不健康的服务
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
if (isCheck) {
result.put("reachProtectThreshold", true);
}
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}
if (isCheck) {
result.put("protectThreshold", service.getProtectThreshold());
result.put("reachLocalSiteCallThreshold", false);
return new JSONObject();
}
JSONArray hosts = new JSONArray();
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
// 过滤不健康的
if (healthyOnly && !entry.getKey()) {
continue;
}
// 将所有instance添加到json数组中
for (Instance instance : ips) {
// remove disabled instance:
if (!instance.isEnabled()) {
continue;
}
JSONObject ipObj = new JSONObject();
ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
// deprecated since nacos 1.0.0:
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.put("metadata", instance.getMetadata());
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA &&
clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
}
ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);
}
}
// 将服务列表的json数组放到result返回值中
result.put("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA &&
clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.put("metadata", service.getMetadata());
return result;
}
下面是获取服务列表的代码
public List<Instance> srvIPs(List<String> clusters) {
if (CollectionUtils.isEmpty(clusters)) {
clusters = new ArrayList<>();
clusters.addAll(clusterMap.keySet());
}
// 获取service下所有cluster下的服务列表
// 因为一个service下clusterMap,也就是可以存放多个cluster,每个cluster下就是一组服务列表
// 这也印证了,客户端可以获取其他cluster的服务,也就可以跨clster调用
return allIPs(clusters);
}
public List<Instance> allIPs(List<String> clusters) {
List<Instance> allIPs = new ArrayList<>();
// 遍历cluster
for (String cluster : clusters) {
Cluster clusterObj = clusterMap.get(cluster);
if (clusterObj == null) {
continue;
}
// 从cluster中获取所有的对应的instance的set集合。包括临时的和持久的
allIPs.addAll(clusterObj.allIPs());
}
return allIPs;
}
public List<Instance> allIPs() {
List<Instance> allInstances = new ArrayList<>();
// 添加临时的
allInstances.addAll(persistentInstances);
// 添加持久的
allInstances.addAll(ephemeralInstances);
return allInstances;
}
服务发现总结:
客户端发起服务获取的请求。服务端获取客户端服务所在的service下的所有cluster下的服务列表,并剔除其中不健康的,将所有健康状态的服务列表返回给客户端。
5.2 服务端推送服务列表
下面再来研究下服务端向客户端推送服务列表的原理,采用的是UDP推送。
从上面的 pushService.addClient代码进去。
public void addClient(String namespaceId,
String serviceName,
String clusters,
String agent,
InetSocketAddress socketAddr,
DataSource dataSource,
String tenant,
String app) {
PushClient client = new PushClient(namespaceId,
serviceName,
clusters,
agent,
socketAddr,
dataSource,
tenant,
app);
// 添加要推送的客户端
addClient(client);
}
public static void addClient(PushClient client) {
// client is stored by key 'serviceName' because notify event is driven by serviceName change
String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
// 从客户端列表clientMap中获取当前service下的所有client
ConcurrentMap<String, PushClient> clients =
clientMap.get(serviceKey);
// 不存在就创建
if (clients == null) {
clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<String, PushClient>(1024));
clients = clientMap.get(serviceKey);
}
// 判断当前客户端有没有放入过
PushClient oldClient = clients.get(client.toString());
if (oldClient != null) {
oldClient.refresh();
} else {
// 没放入,就将当前客户端放入clientMap中
PushClient res = clients.putIfAbsent(client.toString(), client);
if (res != null) {
Loggers.PUSH.warn("client: {} already associated with key {}", res.getAddrStr(), res.toString());
}
Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(), client.getServiceName());
}
}
我们发现上面的代码仅仅是将要推送的客户端放入到clientMap中。那么肯定是有地方开启线程,对clientMap中的客户端完成服务列表的推送。
我们查看下PushService的类继承图
我们发现PushService实现了ApplicationListener接口(spring监听器)。
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent>
再查看下PushService 的内部有个方法serviceChanged,方法作用就是发布推送服务列表的事件,如果某个地方调用了serviceChanged方法,发布ServiceChangeEvent事件,那么会触发PushService的onApplicationEvent方法。
public void serviceChanged(Service service) {
// merge some change events to reduce the push frequency:
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
return;
}
// 发布推送服务列表的事件
this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}
从上面的服务注册、心跳、健康检查的代码中,我们都能看到,每次服务列表发生变更或者是服务的健康状态发生变化,都会调用serviceChanged方法,进而触发了onApplicationEvent事件。因此我们看下onApplicationEvent方法干了什么?
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
// 获取发生变化的service的信息
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
// 向线程池中提交一个推送服务列表的任务
Future future = udpSender.schedule(new Runnable() {
@Override
public void run() {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
// 获取service下所有客户端。当然是已经添加到clientMap中的。
// 也就是已经向服务端调用过服务发现接口的client客户端。因为调用过接口,才会将client加入到clientMap中
ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
// 定义一个AckEntry,里面封装udp数据包
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
// 从缓存中取服务列表数据,Pair里面封装的就是服务列表。
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
// 缓存没命中,自己去获取服务列表
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
// prepareHostsData(client)就是获取要推送的服务列表数据
// prepareAckEntry就是将数据封装为udp数据包
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));
// 开始udp推送
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
//真正的udp推送
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
return null;
}
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
ackMap.put(ackEntry.key, ackEntry);
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
Loggers.PUSH.info("send udp packet: " + ackEntry.key);
// 基于udp推送数据
udpSocket.send(ackEntry.origin);
ackEntry.increaseRetryTime();
// 循环提交udp推送任务,也就是每隔10s进行一次服务列表推送到客户端
executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),
TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}",
ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
下面,我们看下是如何获取服务列表数据的。先从缓存中拿,拿不到,再自己到serviceMap中获取。下面是到serviceMap中获取的代码。
private static Map<String, Object> prepareHostsData(PushClient client) throws Exception {
Map<String, Object> cmd = new HashMap<String, Object>(2);
cmd.put("type", "dom");
// getData是真正的获取服务列表
cmd.put("data", client.getDataSource().getData(client));
return cmd;
}
private DataSource pushDataSource = new DataSource() {
@Override
public String getData(PushService.PushClient client) {
JSONObject result = new JSONObject();
try {
// doSrvIPXT方法就是熟悉的服务发现接口中获取当前service下的所有服务列表
result = doSrvIPXT(client.getNamespaceId(), client.getServiceName(), client.getAgent(),
client.getClusters(), client.getSocketAddr().getAddress().getHostAddress(), 0, StringUtils.EMPTY,
false, StringUtils.EMPTY, StringUtils.EMPTY, false);
} catch (Exception e) {
Loggers.SRV_LOG.warn("PUSH-SERVICE: service is not modified", e);
}
// overdrive the cache millis to push mode
result.put("cacheMillis", switchDomain.getPushCacheMillis(client.getServiceName()));
return result.toJSONString();
}
};
服务端推送总结:
客户端向服务端拉取数据的同时,会将客户端信息注册到clientMap中。等到下一次发生心跳、服务列表数据变更、健康状态发生变化等,都会触发推送事件。在推送事件方法中,服务端将此客户端对应的service下的所有服务列表基于UDP推送给客户端,并开启一个定时任务,每隔10s定时推送数据到客户端。