Before reading this article I recommend going through the Raft implementation first; I have also written an article on the Nacos cluster.
Official definition: Naming Service
Provides mapping management between the "names" of all objects and entities in a distributed system and their associated metadata, e.g. ServiceName -> Endpoints Info, Distributed Lock Name -> Lock Owner/Status Info, DNS Domain Name -> IP List. Service discovery and DNS are the two major naming-service scenarios.
For us the dominant use case is service discovery, which is the focus of this article; DNS will only be touched on briefly.
Before we dive into the source code, let's recall the Nacos data model:
A Nacos data model key is uniquely determined by a triple: Namespace (defaults to the empty string, i.e. the public namespace), Group (defaults to DEFAULT_GROUP), and the name itself. Service discovery operates on the Service object.
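To make the triple concrete, here is a minimal sketch, not Nacos source: the helper name and separators are illustrative assumptions, although the group@@service form matches what NamingUtils.getGroupedName produces later in this article.
public final class ServiceKeySketch {
    // assemble a unique key from the (namespace, group, serviceName) triple
    public static String of(String namespace, String group, String serviceName) {
        String ns = (namespace == null || namespace.isEmpty()) ? "public" : namespace;
        String grp = (group == null || group.isEmpty()) ? "DEFAULT_GROUP" : group;
        return ns + "##" + grp + "@@" + serviceName; // e.g. public##DEFAULT_GROUP@@nacos.test.3
    }
}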

Service domain model
Nacos services follow a service -> cluster -> instances containment hierarchy: a service contains multiple clusters, and a cluster contains multiple instances. See the model diagram on the official site; a rough sketch of the containment follows.
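A minimal sketch of that containment, with assumed field names rather than the real Nacos classes:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// one Service holds many Clusters; one Cluster holds many Instances
class InstanceSketch { String ip; int port; boolean ephemeral; }
class ClusterSketch { String name; List<InstanceSketch> instances = new ArrayList<>(); }
class ServiceSketch { String name; Map<String, ClusterSketch> clusterMap = new HashMap<>(); }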

Following the same approach, let's start from the official demo:
public class NamingExample {
    public static void main(String[] args) throws NacosException {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "127.0.0.1");
        properties.setProperty("namespace", "public");
        // create the naming service
        NamingService naming = NamingFactory.createNamingService(properties);
        // register instances
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        System.out.println(naming.getAllInstances("nacos.test.3"));
        // deregister an instance
        naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        System.out.println(naming.getAllInstances("nacos.test.3"));
        // subscribe to service change events
        naming.subscribe("nacos.test.3", new EventListener() {
            @Override
            public void onEvent(Event event) {
                System.out.println(((NamingEvent) event).getServiceName());
                System.out.println(((NamingEvent) event).getInstances());
            }
        });
    }
}
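One caveat when running this demo: as we will see below, the client's worker threads are all daemon threads, so if main returns right away the JVM can exit before the subscribe callback ever fires. Keeping the process alive at the end of main is enough, for example:
// keep the JVM alive so the daemon updater/push threads can deliver events
Thread.sleep(60_000L);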
The demo looks much like the config center, and similar questions naturally come up:
- 1. Is registered instance data persisted?
- 2. When a remote service changes, is the change pushed by the server or pulled by the client?
- 3. Are service instances loaded at startup? If so, how?
- 4. How is data consistency guaranteed between cluster nodes?
- 5. ...
Reading the source with these questions in mind gives us direction and speeds up understanding.
1. Client-side NamingService construction
NamingService naming = NamingFactory.createNamingService(properties); this step creates a NacosNamingService.
Creating the service should initialize everything that needs initializing; let's go straight to the source:
public NacosNamingService(Properties properties) {
    init(properties);
}

private void init(Properties properties) {
    namespace = InitUtils.initNamespaceForNaming(properties);
    // resolve the cluster server list; if an endpoint is configured, fetch the
    // list from the endpoint and drop any custom server list
    initServerAddr(properties);
    InitUtils.initWebRootContext();
    // init the local cache directory
    initCacheDir();
    // init the local naming.log file
    initLogName(properties);
    // a background event dispatcher: changed services are added to a change queue,
    // and a worker keeps draining it, notifying the listeners of each service.
    // Implemented with a blocking queue (5-minute poll timeout);
    // thread name: com.alibaba.nacos.naming.client.listener
    eventDispatcher = new EventDispatcher();
    // scheduled task every 30s: refresh the server list from the endpoint;
    // this proxy also carries the service-discovery requests.
    // Thread name: com.alibaba.nacos.client.naming.serverlist.updater
    serverProxy = new NamingProxy(namespace, endpoint, serverList);
    serverProxy.setProperties(properties);
    // for ephemeral (non-persistent) instances, periodically send heartbeats;
    // if the instance is missing on the server it gets re-registered.
    // Thread pool: com.alibaba.nacos.naming.beat.sender
    beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
    // two jobs:
    // 1. failover handling: persist service data to disk for failover
    // 2. listen for server-side service changes over a DatagramSocket (server push);
    //    on change, update the local disk cache and in-memory service list,
    //    and fire the service-changed event
    hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties),
        initPollingThreadCount(properties));
}
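The EventDispatcher mentioned above is essentially a single consumer draining a blocking queue of changed services and fanning the change out to that service's registered listeners. A minimal sketch of the pattern (field and method names are assumptions, not the Nacos source):
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

class EventDispatcherSketch {
    private final BlockingQueue<String> changedServices = new LinkedBlockingQueue<>();
    private final Map<String, List<Runnable>> listeners = new ConcurrentHashMap<>();

    EventDispatcherSketch() {
        Thread worker = new Thread(() -> {
            while (true) {
                try {
                    // block up to 5 minutes waiting for a changed service,
                    // mirroring the poll timeout noted above
                    String service = changedServices.poll(5, TimeUnit.MINUTES);
                    if (service == null) continue;
                    // notify every listener registered for this service
                    for (Runnable l : listeners.getOrDefault(service, Collections.emptyList())) {
                        l.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "com.alibaba.nacos.naming.client.listener");
        worker.setDaemon(true);
        worker.start();
    }

    void serviceChanged(String service) { changedServices.offer(service); }
}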
1.1 BeatReactor analysis
// heartbeat sending
public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.serverProxy = serverProxy;
    executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });
}
We can see a thread pool being built, so presumably it gets used under some condition. Searching for its usages shows a task is scheduled whenever a BeatInfo is added:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    // fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    // the BeatTask is scheduled here
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
// it is also worth checking where addBeatInfo is called: the only call site is instance registration
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // a BeatInfo is only added for ephemeral (non-persistent) instances
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        // add the BeatInfo
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
Summary so far: registering an instance in ephemeral (non-persistent) mode triggers a BeatTask, which matches our expectation: for an ephemeral instance we must make sure our current service stays registered. For example, if the service dies and restarts, it needs to be re-registered with the registry so other services can discover it.
The BeatTask should provide exactly that; back to the scheduled task:
class BeatTask implements Runnable {
    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        // this BeatInfo has been stopped; no more heartbeats needed
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            // the interval returned by the server becomes the next heartbeat delay
            long interval = result.getIntValue("clientBeatInterval");
            boolean lightBeatEnabled = false;
            if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.containsKey(CommonParams.CODE)) {
                code = result.getIntValue(CommonParams.CODE);
            }
            // if the registry no longer knows this instance, register it again
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    // register the service
                    serverProxy.registerService(beatInfo.getServiceName(),
                        NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ne) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());
        }
        // schedule the next heartbeat
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
// while we're at it, let's see how the server handles this beat request; it hits the InstanceController /instance/beat endpoint (PUT)
@CanDistro
@PutMapping("/beat")
public JSONObject beat(HttpServletRequest request) throws Exception {
    JSONObject result = new JSONObject();
    result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
        Constants.DEFAULT_NAMESPACE_ID);
    String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME,
        UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JSON.parseObject(beat, RsInfo.class);
    }
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    if (Loggers.SRV_LOG.isDebugEnabled()) {
        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    }
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    if (instance == null) {
        if (clientBeat == null) {
            // report that the resource was not found
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        // no instance yet: register it on the spot
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
            "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // the beat is processed further here; if data changed, a UDP packet is
    // pushed to the client (analyzed later)
    service.processClientBeat(clientBeat);
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
A quick summary:
each time an ephemeral instance is registered, BeatReactor's thread pool schedules a heartbeat task for it; if the server finds the instance missing, the instance is re-registered.
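By contrast, if you want a persistent instance (no client-side heartbeat; the server health-checks it instead), register it through the overload that takes an Instance with ephemeral set to false. A short usage sketch (registerInstance(String, Instance) and these setters exist on the 1.x client API):
Instance instance = new Instance();
instance.setIp("11.11.11.11");
instance.setPort(8888);
instance.setClusterName("TEST1");
instance.setEphemeral(false); // persistent: no BeatInfo is added on the client side
naming.registerInstance("nacos.test.3", instance);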
1.2 HostReactor analysis
Having covered BeatReactor, let's look at HostReactor.
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) {
    this(eventDispatcher, serverProxy, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
}

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
                   boolean loadCacheAtStart, int pollingThreadCount) {
    executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.client.naming.updater");
            return thread;
        }
    });
    this.eventDispatcher = eventDispatcher;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    if (loadCacheAtStart) {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }
    this.updatingMap = new ConcurrentHashMap<String, Object>();
    // failover handling: persists service data for failover
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    // listens for server-side service changes via a DatagramSocket; on change,
    // updates the local disk cache and service list and fires events
    this.pushReceiver = new PushReceiver(this);
}
// first, FailoverReactor
public void init() {
    // read the files under the failover dir into memory; this mainly refreshes
    // the failover-mode switch
    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
    // back up the in-memory service instances (fetched from remote) into the
    // failover dir; 30-minute initial delay, then every DAY_PERIOD_MINUTES (daily)
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
    // backup file on startup if failover directory is empty.
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                File cacheDir = new File(failoverDir);
                if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                    throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                }
                File[] files = cacheDir.listFiles();
                if (files == null || files.length <= 0) {
                    new DiskFileWriter().run();
                }
            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
            }
        }
    }, 10000L, TimeUnit.MILLISECONDS);
}
// next, what PushReceiver actually does
public PushReceiver(HostReactor hostReactor) {
    try {
        this.hostReactor = hostReactor;
        // open a UDP socket to listen for pushed data
        udpSocket = new DatagramSocket();
        executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            }
        });
        // executes this class's own run() method
        executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}
@Override
public void run() {
    while (true) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // clearly, this listens over UDP for pushes from the server
            udpSocket.receive(packet);
            String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // check whether the locally cached remote instances changed;
                // update the local cache and fire events
                hostReactor.processServiceJSON(pushPacket.data);
                // send ack to server
                ack = "{\"type\": \"push-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\""
                    + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\""
                    + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                    + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            }
            // reply
            udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
To summarize what NamingService initialization sets up:
- 1. BeatReactor sends heartbeats so that our registered instances stay registered
- 2. FailoverReactor periodically writes the cached service instances to disk for failover
- 3. PushReceiver opens a UDP socket to listen for instance changes from the server and apply them locally
2. Registering an instance
This code registers a service instance; what happens during registration?
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
Source analysis:
An Instance is created; keep the service -> cluster -> instance model in mind.
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        // this is the BeatReactor behavior analyzed above: it keeps the local
        // instance registered with the registry; refer to that section for details
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    // the proxy sends the registration request: POST to the /instance endpoint
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
// let's dig into exactly what gets sent to the registry; note also that the
// request goes to one server picked from the list, and on failure another is tried
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
        namespaceId, serviceName, instance);
    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
That is just a straightforward registration request; the interesting part is what the server does with it.
InstanceController
@CanDistro
@PostMapping
public String register(HttpServletRequest request) throws Exception {
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // the real work happens in here
    serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
    return "ok";
}
Going deeper:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // 1. create the service if it does not exist yet, which triggers a series of events
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
            "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // 2. add the instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
The service creation path:
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    createServiceIfAbsent(namespaceId, serviceName, local, null);
}

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
    Service service = getService(namespaceId, serviceName);
    // a service is only created if the server does not have it yet
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        // validate the service name
        service.validate();
        // the main data handling happens here
        putServiceAndInit(service);
        if (!local) {
            // local == true means ephemeral; instances are ephemeral by default,
            // so this branch is normally skipped. For persistent services, the
            // service is stored through the consistency layer here
            addOrReplaceService(service);
        }
    }
}
// next, how the service is stored and initialized
private void putServiceAndInit(Service service) throws NacosException {
    // fairly simple: store the service in memory, keyed by namespace -> serviceName
    putService(service);
    service.init();
    // two listeners are registered here, one per persistence mode:
    // persistent data is handled by RaftConsistencyServiceImpl,
    // ephemeral data by DistroConsistencyServiceImpl; the remaining class is
    // merely a delegating proxy over the two
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}
Handling splits into persistent and ephemeral modes, and notice from the code that the persistent path covers everything the ephemeral path does.
What happens inside service.init()?
public void init() {
    // interesting: yet another heartbeat check here
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    // refresh the service reference held by each cluster
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
// ClientBeatCheckTask: as the name suggests, it checks client heartbeats
@Override
public void run() {
    try {
        // in standalone mode this check always passes; worth studying for clusters
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        // fetch all ephemeral instances
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                            instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                            UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        // publish a ServiceChangeEvent through pushService
                        getPushService().serviceChanged(service);
                        SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            // delete instances whose last beat is older than the delete timeout
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
                deleteIP(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
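For reference, the default timings involved here, which, if memory serves, can be overridden per instance via metadata keys such as preserved.heart.beat.interval, are roughly: beat every 5s, mark unhealthy after 15s without a beat, delete after 30s. A sketch of the timeline under those assumed defaults:
// assumed Nacos defaults; configurable per instance via metadata
long heartBeatInterval = 5_000L;   // client sends a beat every 5s
long heartBeatTimeout  = 15_000L;  // no beat for 15s -> instance marked unhealthy
long ipDeleteTimeout   = 30_000L;  // no beat for 30s -> instance removed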
public void serviceChanged(Service service) {
    // merge some change events to reduce the push frequency:
    if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
        return;
    }
    // publish the event
    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}
// PushService itself implements the listener
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    Future future = udpSender.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                // notify, over UDP, every client holding a subscription to this service.
                // As for where these PushClients come from: presumably they are created
                // when a client queries a service's instances (confirmed in section 3)
                ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }
                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) {
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }
                    ... details omitted
                    // push the updated data over UDP
                    udpPush(ackEntry);
                }
    }, 1000, TimeUnit.MILLISECONDS);
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
That was a long walk-through, but it covers the ephemeral data flow: after the server creates the service it starts a scheduled check; when instance data changes, the change is pushed to clients over UDP; instances that fail the heartbeat check are removed from the service.
So how does the ephemeral mode keep the data consistent across all cluster nodes?
What we actually fetch are instances, so let's look at the instance-adding flow.
// the addInstance call from above
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // this is the key step: storage splits into persistent vs. ephemeral here
        consistencyService.put(key, instances);
    }
}
1. First, the default (ephemeral) implementation: DistroConsistencyServiceImpl
@Override
public void put(String key, Record value) throws NacosException {
    // store the instances in memory and notify this key's listeners; this is
    // where a change ends up triggering the UDP notification to clients
    onPut(key, value);
    // notify the other cluster nodes of the instance change; read on
    taskDispatcher.addTask(key);
}

public class TaskDispatcher {
    public void addTask(String key) {
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    // the key generated for the current instance is put on a queue,
    // so some thread must be consuming it
    private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

    public void addTask(String key) {
        queue.offer(key);
    }

    @Override
    public void run() {
        List<String> keys = new ArrayList<>();
        while (true) {
            try {
                String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                    TimeUnit.MILLISECONDS);
                if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                    Loggers.DISTRO.debug("got key: {}", key);
                }
                if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                    continue;
                }
                if (StringUtils.isBlank(key)) {
                    continue;
                }
                if (dataSize == 0) {
                    keys = new ArrayList<>();
                }
                keys.add(key);
                dataSize++;
                if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                    (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                    // loop over every other server node
                    for (Server member : dataSyncer.getServers()) {
                        if (NetUtils.localServer().equals(member.getKey())) {
                            continue;
                        }
                        SyncTask syncTask = new SyncTask();
                        syncTask.setKeys(keys);
                        syncTask.setTargetServer(member.getKey());
                        if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                            Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                        }
                        // notify the other nodes of the instance change; not expanded here:
                        // 1. the data is batched and sent over HTTP ("/distro/datum")
                        // 2. on failure the task is recorded and the data is re-sent later
                        dataSyncer.submit(syncTask, 0);
                    }
                    lastDispatchTime = System.currentTimeMillis();
                    dataSize = 0;
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("dispatch sync task failed.", e);
            }
        }
    }
}
2. Next, the persistent implementation: RaftConsistencyServiceImpl
A quick look is enough here, since RaftCore consistency was analyzed earlier: updates are forwarded to the leader, which processes them and replicates to the followers; once more than half of the nodes acknowledge, the write is considered successful.
@Override
public void put(String key, Record value) throws NacosException {
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
    }
}
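As a reminder of what "more than half" means, here is a minimal sketch of the majority count a Raft-style write must reach (the helper is illustrative, not Nacos source):
// a write commits once acks reach a strict majority of peers
static boolean committed(int ackCount, int peerCount) {
    return ackCount >= peerCount / 2 + 1; // e.g. 3 nodes -> 2 acks, 5 nodes -> 3 acks
}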
To summarize what creating an instance triggers (delete/update were not covered, but presumably follow the same path):
- 1. The server creates the service and starts a background task watching it; any change results in a UDP notification to clients
- 2. The server also creates the instance, with separate persistent and ephemeral handling
- Ephemeral: stored directly in memory (Distro), then the instance data is batched and sent to the other nodes; on failure the request is retried after a delay
- Persistent: handled via Raft; the leader processes the data (memory plus disk storage), broadcasts to the followers, and returns success once more than half succeed
Data consistency question: both modes face it. If some nodes missed an update, will they ever catch up?
3. Fetching instances
One question remains from above: how does the client hand its UDP IP and port to the server, so the server can notify it when the instances it cares about change? The answer: the UDP IP and port are passed along when fetching instances.
Client request:
naming.getAllInstances("nacos.test.3");
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    if (subscribe) { // true by default
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        // fetch straight from the server, bypassing the local cache
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}
// next
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    // if failover mode is switched on, read from the failover store
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // read from the local in-memory cache
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        // still nothing locally, so fetch from the remote server
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
    } else if (updatingMap.containsKey(serviceName)) {
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    scheduleUpdateIfAbsent(serviceName, clusters);
    return serviceInfoMap.get(serviceObj.getKey());
}
public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}
// assemble the request parameters
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
    throws NacosException {
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort)); // the locally allocated UDP port
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
    return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
That covers the client side; the server simply reads the data and returns it. Of course, the heartbeat between client and server matters here too: if beats stop, the server marks the client's UDP endpoint as invalid.
Server-side InstanceController code:
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public JSONObject list(HttpServletRequest request) throws Exception {
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
        Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    return doSrvIPXT(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
        healthyOnly);
}
// assembles the actual instance payload
public JSONObject doSrvIPXT(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
                            int udpPort,
                            String env, boolean isCheck, String app, String tid, boolean healthyOnly)
    throws Exception {
    ClientInfo clientInfo = new ClientInfo(agent);
    JSONObject result = new JSONObject();
    Service service = serviceManager.getService(namespaceId, serviceName);
    // if the service does not even exist, return immediately
    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.put("hosts", new JSONArray());
        return result;
    }
    checkIfDisabled(service);
    long cacheMillis = switchDomain.getDefaultCacheMillis();
    // now try to enable the push
    try {
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            // register the client's UDP endpoint so it can receive pushes
            pushService.addClient(namespaceId, serviceName,
                clusters,
                agent,
                new InetSocketAddress(clientIP, udpPort),
                pushDataSource,
                tid,
                app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }
    List<Instance> srvedIPs;
    // fetch all of the service's instances from memory
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    // filter ips using selector: server-side filtering via the service's selector
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    // no instances: assemble an empty response and return
    if (CollectionUtils.isEmpty(srvedIPs)) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        if (clientInfo.type == ClientInfo.ClientType.JAVA &&
            clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("hosts", new JSONArray());
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.put("metadata", service.getMetadata());
        return result;
    }
    // what follows is the protection mechanism plus health filtering; we won't stray too far
    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    if (isCheck) {
        result.put("reachProtectThreshold", false);
    }
    double threshold = service.getProtectThreshold();
    if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
        if (isCheck) {
            result.put("reachProtectThreshold", true);
        }
        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
        ipMap.get(Boolean.FALSE).clear();
    }
    if (isCheck) {
        result.put("protectThreshold", service.getProtectThreshold());
        result.put("reachLocalSiteCallThreshold", false);
        return new JSONObject();
    }
    JSONArray hosts = new JSONArray();
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();
        if (healthyOnly && !entry.getKey()) {
            continue;
        }
        for (Instance instance : ips) {
            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }
            JSONObject ipObj = new JSONObject();
            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.put("metadata", instance.getMetadata());
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA &&
                clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }
            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);
        }
    }
    result.put("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA &&
        clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.put("metadata", service.getMetadata());
    return result;
}
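The protect-threshold branch above deserves a concrete example: with protectThreshold = 0.8 and 10 instances of which 7 are healthy, 7/10 <= 0.8 holds, so the server returns all instances, healthy or not, rather than funneling all traffic onto the few healthy ones. A minimal sketch of that rule:
// sketch of the protection rule used in doSrvIPXT above
static boolean protectThresholdReached(int healthy, int total, double threshold) {
    return total > 0 && (float) healthy / total <= threshold;
}
// protectThresholdReached(7, 10, 0.8) -> true: return all 10 instances
// protectThresholdReached(9, 10, 0.8) -> false: healthy-only filtering may apply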
4. Data consistency
As analyzed above, service instances come in persistent and ephemeral flavors, and updates are broadcast to the other nodes. If a notification fails, is there any follow-up sync?
4.1 Ephemeral data
First: how does a server load ephemeral data at startup?
DistroConsistencyServiceImpl kicks off data loading during initialization.
@PostConstruct
public void init() {
    GlobalExecutor.submit(loadDataTask);             // initial data load
    GlobalExecutor.submitDistroNotifyTask(notifier); // handles data-change notifications
}

// next, the initial load: LoadDataTask
private class LoadDataTask implements Runnable {
    @Override
    public void run() {
        try {
            load();
            if (!initialized) { // not initialized yet: schedule a delayed retry
                GlobalExecutor.submit(this, globalConfig.getLoadDataRetryDelayMillis());
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("load data failed.", e);
        }
    }
}

// this code is easy to follow
public void load() throws Exception {
    if (SystemUtils.STANDALONE_MODE) {
        initialized = true;
        return;
    }
    // size = 1 means only myself in the list, we need at least one another server alive:
    while (serverListManager.getHealthyServers().size() <= 1) {
        Thread.sleep(1000L);
        Loggers.DISTRO.info("waiting server list init...");
    }
    // walk the healthy servers
    for (Server server : serverListManager.getHealthyServers()) {
        if (NetUtils.localServer().equals(server.getKey())) {
            continue;
        }
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("sync from " + server);
        }
        // initialize local state by pulling the service instances from another
        // healthy server, via a request to /distro/datums
        // try sync data from remote server:
        if (syncAllDataFromRemote(server)) {
            initialized = true;
            return;
        }
    }
}
With initialization covered, let's look at inter-server sync. We saw above that on a change, a notification is looped out to the other servers; what if one of them fails to apply the update?
Look at the DataSyncer initialization:
@PostConstruct
public void init() {
    startTimedSync(); // timer
}

// runs a data sync every 5s
public void startTimedSync() {
    GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
}

public class TimedSync implements Runnable {
    @Override
    public void run() {
        try {
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("server list is: {}", getServers());
            }
            // simple enough: assemble the local keys with their MD5 checksums
            // and send them to the other servers
            // send local timestamps to other servers:
            Map<String, String> keyChecksums = new HashMap<>(64);
            for (String key : dataStore.keys()) {
                // keys failing this check are not synced from this node
                if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                    continue;
                }
                Datum datum = dataStore.get(key);
                if (datum == null) {
                    continue;
                }
                keyChecksums.put(key, datum.value.getChecksum());
            }
            if (keyChecksums.isEmpty()) {
                return;
            }
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
            }
            // send to every node except ourselves; so in a cluster, all nodes
            // keep exchanging data with one another
            for (Server member : getServers()) {
                if (NetUtils.localServer().equals(member.getKey())) {
                    continue;
                }
                // ask the remote node to check the MD5 checksums
                NamingProxy.syncCheckSums(keyChecksums, member.getKey());
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("timed sync task failed.", e);
        }
    }
}
// the request is assembled like this
public static void syncCheckSums(Map<String, String> checksumMap, String server) {
    try {
        Map<String, String> headers = new HashMap<>(128);
        headers.put(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.VERSION);
        headers.put(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
        headers.put("Connection", "Keep-Alive");
        // PUT /distro/checksum
        HttpClient.asyncHttpPutLarge("http://" + server + RunningConfig.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL + "?source=" + NetUtils.localServer(),
            headers, JSON.toJSONBytes(checksumMap),
            new AsyncCompletionHandler() {
                @Override
                public Object onCompleted(Response response) throws Exception {
                    if (HttpURLConnection.HTTP_OK != response.getStatusCode()) {
                        Loggers.DISTRO.error("failed to req API: {}, code: {}, msg: {}",
                            "http://" + server + RunningConfig.getContextPath() +
                                UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL,
                            response.getStatusCode(), response.getResponseBody());
                    }
                    return null;
                }

                @Override
                public void onThrowable(Throwable t) {
                    Loggers.DISTRO.error("failed to req API:" + "http://" + server
                        + RunningConfig.getContextPath()
                        + UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL, t);
                }
            });
    } catch (Exception e) {
        Loggers.DISTRO.warn("NamingProxy", e);
    }
}
Now the server-side handling of the checksum check:
@PutMapping("/checksum")
public ResponseEntity syncChecksum(@RequestParam String source, @RequestBody Map<String, String> dataMap) {
    consistencyService.onReceiveChecksums(dataMap, source);
    return ResponseEntity.ok("ok");
}

// the actual handling logic
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
    // a sync with this server is already in progress: return immediately
    if (syncChecksumTasks.containsKey(server)) {
        // Already in process of this server:
        Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
        return;
    }
    // mark the sync as in progress
    syncChecksumTasks.put(server, "1");
    try {
        List<String> toUpdateKeys = new ArrayList<>();
        List<String> toRemoveKeys = new ArrayList<>();
        // collect the keys that need updating
        for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
            // check whether this is a key we should accept; if we ourselves are
            // responsible for it, abort
            if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
                // this key should not be sent from remote server:
                Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
                // abort the procedure:
                return;
            }
            if (!dataStore.contains(entry.getKey()) ||
                dataStore.get(entry.getKey()).value == null ||
                !dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
                toUpdateKeys.add(entry.getKey());
            }
        }
        // collect the keys that need removing
        for (String key : dataStore.keys()) {
            // another guard here; server is the address of the requesting node
            if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
                continue;
            }
            if (!checksumMap.containsKey(key)) {
                toRemoveKeys.add(key);
            }
        }
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
        }
        for (String key : toRemoveKeys) {
            onRemove(key);
        }
        if (toUpdateKeys.isEmpty()) {
            return;
        }
        try {
            // fetch the data that needs updating
            byte[] result = NamingProxy.getData(toUpdateKeys, server);
            // apply and store it locally
            processData(result);
        } catch (Exception e) {
            Loggers.DISTRO.error("get data from " + server + " failed!", e);
        }
    } finally {
        // Remove this 'in process' flag:
        syncChecksumTasks.remove(server);
    }
}
After this rough pass you will notice the ephemeral path has no leader/follower concept. So between nodes, whose data wins? When is remote data taken as the source of truth? Looking carefully, both when assembling the checksum payload and when receiving it there is one recurring check:
// when sending checksums, only keys passing this check are included; on the
// receiving side, keys passing this check are rejected. So what does it do?
distroMapper.responsible(KeyBuilder.getServiceName(key))
// the detailed logic:
public boolean responsible(String serviceName) {
    // cluster mode only
    if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
        return true;
    }
    if (CollectionUtils.isEmpty(healthyList)) {
        // means distro config is not ready yet
        return false;
    }
    // assume index and lastIndex are equal, i.e. the list contains the local
    // server exactly once
    int index = healthyList.indexOf(NetUtils.localServer());
    int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
    if (lastIndex < 0 || index < 0) {
        return true;
    }
    // the crux: hash the service name and take it modulo the healthy node count.
    // If target == index, this node's data is authoritative for the service;
    // this is classic sharding, note without a replica of ownership
    int target = distroHash(serviceName) % healthyList.size();
    return target >= index && target <= lastIndex;
}
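To make the sharding rule concrete, here is a small sketch of deciding ownership in a cluster; distroHash in Nacos is, as far as I recall, based on the service name's hashCode, but the exact function below is an assumption:
// illustrative ownership check under the assumptions above
static int owner(String serviceName, int healthyNodeCount) {
    int hash = Math.abs(serviceName.hashCode() % Integer.MAX_VALUE); // assumed hash
    return hash % healthyNodeCount; // index of the node responsible for this service
}
// with 3 healthy nodes, owner("nacos.test.3", 3) picks exactly one node; only that
// node's copy of the service is treated as authoritative and synced to the others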
Summary:
- With multiple nodes, ephemeral data is sharded: the service name's hash modulo the healthy node count decides which node owns each service's data, and the scheduled checksum heartbeat propagates it to the rest.
- Startup initialization likewise pulls the data from a remote healthy node.
4.2 Persistent data
See the RaftCore class for this.
@PostConstruct
public void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    executor.submit(notifier);
    long start = System.currentTimeMillis();
    // load data from the local cache files
    raftStore.loadDatums(notifier, datums);
    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
    while (true) {
        if (notifier.tasks.size() <= 0) {
            break;
        }
        Thread.sleep(1000L);
    }
    initialized = true;
    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    GlobalExecutor.registerMasterElection(new MasterElection());
    // the key part is this heartbeat task: the leader ships its data digest to
    // the followers so they can bring themselves up to date
    GlobalExecutor.registerHeartbeat(new HeartBeat());
    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
        GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
HeartBeat implementation details:
public class HeartBeat implements Runnable {
    @Override
    public void run() {
        try {
            if (!peers.isReady()) {
                return;
            }
            RaftPeer local = peers.local();
            local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            if (local.heartbeatDueMs > 0) {
                return;
            }
            local.resetHeartbeatDue();
            // send the heartbeat
            sendBeat();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
        }
    }

    public void sendBeat() throws IOException, InterruptedException {
        RaftPeer local = peers.local();
        // only the leader may send heartbeats
        if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
            return;
        }
        if (Loggers.RAFT.isDebugEnabled()) {
            Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
        }
        local.resetLeaderDue();
        // build data
        JSONObject packet = new JSONObject();
        packet.put("peer", local);
        JSONArray array = new JSONArray();
        if (switchDomain.isSendBeatOnly()) {
            Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
        }
        if (!switchDomain.isSendBeatOnly()) {
            for (Datum datum : datums.values()) {
                JSONObject element = new JSONObject();
                if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                    element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                    element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                }
                element.put("timestamp", datum.timestamp);
                array.add(element);
            }
        }
        // ship the local data digest to the followers so they can update themselves
        packet.put("datums", array);
        // broadcast
        Map<String, String> params = new HashMap<String, String>(1);
        params.put("beat", JSON.toJSONString(packet));
        String content = JSON.toJSONString(params);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(out);
        gzip.write(content.getBytes(StandardCharsets.UTF_8));
        gzip.close();
        byte[] compressedBytes = out.toByteArray();
        String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
        if (Loggers.RAFT.isDebugEnabled()) {
            Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}",
                content.length(), compressedContent.length());
        }
        for (final String server : peers.allServersWithoutMySelf()) {
            try {
                final String url = buildURL(server, API_BEAT);
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("send beat to server " + server);
                }
                HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
                                response.getResponseBody(), server);
                            MetricsMonitor.getLeaderSendBeatFailedException().increment();
                            return 1;
                        }
                        peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                        if (Loggers.RAFT.isDebugEnabled()) {
                            Loggers.RAFT.debug("receive beat response from: {}", url);
                        }
                        return 0;
                    }

                    @Override
                    public void onThrowable(Throwable t) {
                        Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                MetricsMonitor.getLeaderSendBeatFailedException().increment();
            }
        }
    }
}
5. Service health checks
To be continued.
At this point we have mostly covered service discovery, and the guesses and questions raised at the beginning should now be answerable. A final summary:
Client
- After registering an instance, the client keeps checking via a timer whether registration holds, guaranteeing its instance stays registered with the registry
- For failover, the client periodically writes its locally cached service instances to disk
- The client listens for instance changes over UDP
Server-side data consistency
- Ephemeral (default) handling:
- Server consistency: data is cached in memory, then looped out to the other nodes; there is no leader/follower distinction
- Each server runs a scheduled task syncing its services to the other nodes; since there is no leader, each node uses the service name's hash modulo the healthy node count to decide ownership: if the modulo result maps to the current node, its data is authoritative and gets synced to the others.
- Client push: data updates are pushed to clients over UDP
- Persistent handling
- Server consistency: Raft-based; writes are forwarded to the leader, cached in memory and stored on disk, and acknowledged once more than half of the followers succeed.
- Elections follow the Raft protocol with terms; each transactional operation adds 100 to the term, and each election adds one; see the cluster chapter for details.
- The leader's heartbeat ships its locally cached service digest (compressed) to the followers so they can continuously check MD5s and avoid divergence.
- Client push: data updates are pushed to clients over UDP