一、Nacos Server服务注册流程——AP模式
- Nacos主要是AP模式,CP模式的RaftConsistencyServiceImpl。
1、在Nacos Server的nacos-naming工程下的InstanceController类中的register方法作为服务注册的入口
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
@Autowired
private ServiceManager serviceManager;
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//这里可以看出Nacos作为服务注册中心没有用到group
//命名空间
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//服务名称
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
//将服务注册注册请求参数转换成Instance
final Instance instance = parseInstance(request);
//注册实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
}
2、serviceManager.registerInstance注册服务实例
@Component
public class ServiceManager implements RecordListener<Service> {
//Nacos的注册表
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//创建一个空的服务
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//获取服务,从Nacos的注册表获取服务
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//新增实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
//获取服务,从Nacos的注册表获取服务
Service service = getService(namespaceId, serviceName);
//加锁,同一时间同一命名空间下的同一服务,只能允许有一个服务注册请求
synchronized (service) {
//更新并返回总的instanceList列表
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
//创建新的instance列表对象
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//将实例列表集合和key设置进consistencyService中
consistencyService.put(key, instances);
}
}
//获取服务
public Service getService(String namespaceId, String serviceName) {
if (serviceMap.get(namespaceId) == null) {
return null;
}
return chooseServiceMap(namespaceId).get(serviceName);
}
}
3、addIpAddresses更新并返回总的instance服务实例列表
这里面还做了挺多的事,先是获取老的数据(持久的或者临时的),从一致性服务里获取,因为这个数据是要同步更新的,所以要拿出来及时更新,然后获取服务实例(持久的或者临时的),用他们来更新的老的数据,然后遍历新增的实例,如果没有集群的话先创建集群,并初始化集群,会开启心跳检查,最后根据是添加还是删除实例来更新老的实例映射,最后封装成集合返回最新的实例集合。
@Component
public class ServiceManager implements RecordListener<Service> {
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
//重DataStore类中的dataMap获取老的实例集合数据
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
//获取集群中所有相关的实例集合,临时的或者是永久的
List<Instance> currentIPs = service.allIPs(ephemeral);
//IP端口和实例的映射
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
//实例ID集合
Set<String> currentInstanceIds = Sets.newHashSet();
//放入对应的集合里
for (Instance instance : currentIPs) {
currentInstances.put(instance.toIpAddr(), instance);
currentInstanceIds.add(instance.getInstanceId());
}
//更新后的老的实例集合
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
//根据当前服务实例的健康标志和心跳时间,来更新老的实例集合数据
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
//重新创建一个
instanceMap = new HashMap<>(ips.length);
}
for (Instance instance : ips) {
//遍历新的实例
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
//不存在就创建服务实例集群
Cluster cluster = new Cluster(instance.getClusterName(), service);
//初始化,开启集群心跳检查
cluster.init();
//添加服务实例集群
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
//删除操作的话就删除老的实例集合的数据
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
//否则添加
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) {
//存在原实例,则直接使用原服务InstanceId
instance.setInstanceId(oldInstance.getInstanceId());
} else {
//否则,则直接使用原服务InstanceId
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
//返回总的实例集合
return new ArrayList<>(instanceMap.values());
}
}
-
Service的allIPs获取集群中的实例集合
遍历集群,获取集群里的实例集合,临时的或者是永久的。
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
private Map<String, Cluster> clusterMap = new HashMap<>();
public List<Instance> allIPs(boolean ephemeral) {
List<Instance> result = new ArrayList<>();
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
result.addAll(entry.getValue().allIPs(ephemeral));
}
return result;
}
}
- ServiceManager的setValid更新老的实例集合
- 其实就是用服务集群中获取的实例集合去更新老的实例集合,健康状态和心跳时间。
@Component
public class ServiceManager implements RecordListener<Service> {
private Map<String, Instance> setValid(List<Instance> oldInstances, Map<String, Instance> map) {
Map<String, Instance> instanceMap = new HashMap<>(oldInstances.size());
//遍历老的实例集合,如果新的实例存在的话就更新
for (Instance instance : oldInstances) {
//获取对应新的实例
Instance instance1 = map.get(instance.toIpAddr());
//存在就更新
if (instance1 != null) {
instance.setHealthy(instance1.isHealthy());
instance.setLastBeat(instance1.getLastBeat());
}
//放入映射
instanceMap.put(instance.getDatumKey(), instance);
}
return instanceMap;
}
}
- Cluster的init集群初始化
- 即是开启一个心跳检查的任务
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
private HealthCheckTask checkTask;
public void init() {
if (inited) {
return;
}
checkTask = new HealthCheckTask(this);
HealthCheckReactor.scheduleCheck(checkTask);
inited = true;
}
}
4、将服务注册请求放入到ArrayBlockingQueue阻塞队列中,并将服务实例存入DataStore中的dataMap中
@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {
@Override
public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
}
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
private final DataStore dataStore;
private volatile Notifier notifier = new Notifier();
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
//将服务实例集合存入DataStore中的dataMap中
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
//添加数据变更的任务
notifier.addTask(key, DataOperation.CHANGE);
}
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
/**
* Add new notify task to queue.
*
* @param datumKey data key
* @param action action for data
*/
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
//在ArrayBlockingQueue中添加任务
tasks.offer(Pair.with(datumKey, action));
}
}
}
注意:在DistroConsistencyServiceImpl实例化完成之后,启动异步线程池,监听ArrayBlockingQueue中的任务,进行实时消费
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
private volatile Notifier notifier = new Notifier();
@PostConstruct
public void init() {
//当前Bean实例话后,启动异步线程池,监听ArrayBlockingQueue中的任务进行消费
GlobalExecutor.submitDistroNotifyTask(notifier);
}
}
5、将服务注册请求放入到ArrayBlockingQueue阻塞队列后,处理该阻塞队列中的任务,在Notifier中的run方法中处理该任务
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
private final DataStore dataStore;
private volatile Notifier notifier = new Notifier();
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
public int getTaskSize() {
return tasks.size();
}
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
//取出注册请求任务
Pair<String, DataOperation> pair = tasks.take();
//处理任务
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
//如果是一个数据变更动作,服务注册数据数据变更
if (action == DataOperation.CHANGE) {
//从dataStore中获取服务注册请求放入的服务实例集合
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
//如果是一个数据删除动作
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
6、从dataStore中获取服务注册请求放入的服务实例集合,调用listener.onChange方法注册
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
private Map<String, Cluster> clusterMap = new HashMap<>();
@Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
//权重最大值边界设定
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
//权重最小值边界设定
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
//更新IP列表
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
//遍历服务注册的实例列表
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
//ClusterName为空,则设置默认值
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
//如果不包含ClusterName,则初始化
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
//初始化Cluster,即先创建服务健康检查任务,
//并调用HealthCheckReactor.scheduleCheck执行健康检查任务,即心跳机制
cluster.init();
//根据ClusterName注册服务集群实例
getClusterMap().put(instance.getClusterName(), cluster);
}
//根据ClusterName获取集群IP集合
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
//IP列表为空,也注册IP为空的服务
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
//新增服务实例
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
//更新为临时节点
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
//设置最后更新的时间
setLastModifiedMillis(System.currentTimeMillis());
//广播,UDP通知通客户端service发生了改变
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
}
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
@JsonIgnore
private HealthCheckTask checkTask;
public void init() {
if (inited) {
return;
}
//创建健康检查任务
checkTask = new HealthCheckTask(this);
//执行健康检查任务
HealthCheckReactor.scheduleCheck(checkTask);
inited = true;
}
}
- 最核心的就是updateIPs
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
@JsonIgnore
private Set<Instance> persistentInstances = new HashSet<>();
@JsonIgnore
private Set<Instance> ephemeralInstances = new HashSet<>();
public void updateIps(List<Instance> ips, boolean ephemeral) {
//拿到cluster中旧的instance列表
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
//updatedIps主要做的是找出oldipmap中的实例并返回
List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
if (updatedIPs.size() > 0) {
for (Instance ip : updatedIPs) {
Instance oldIP = oldIpMap.get(ip.getDatumKey());
// do not update the ip validation status of updated ips
// because the checker has the most precise result
// Only when ip is not marked, don't we update the health status of IP:
if (!ip.isMarked()) {
ip.setHealthy(oldIP.isHealthy());
}
if (ip.isHealthy() != oldIP.isHealthy()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
(ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
}
if (ip.getWeight() != oldIP.getWeight()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
ip.toString());
}
}
}
//找出新增的实例列表,即ips中的实例,oldipmap不存在的实例列表
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
getName(), newIPs.size(), newIPs.toString());
for (Instance ip : newIPs) {
//进行新实例的健康检查设置
HealthCheckStatus.reset(ip);
}
}
//找出oldipmap的ip的实例。不存在于ips中的实例
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
if (deadIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
getName(), deadIPs.size(), deadIPs.toString());
for (Instance ip : deadIPs) {
//将不存在新实例ip列表的值的健康检查删除
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
//更新服务下cluster的intances列表
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
}
为了防止读写并发冲突,直接创建了一个新的HashMap,然后去操作新的HashMap,操作完了之后再去替换老的Map数据,CopyOnWrite的思想。最后还发布了服务变化事件。
- 服务注册通过CopyOnWrite支持并发读写的能力
- Cluster类中的updateIPs方法中是对原服务IP列表的副本进行操作,注册完成替换原有服务IP列表即可,即CopyOnWrite操作,不需要加锁,性能高,存在服务延迟。
Eureka防止读写冲突用的是多级缓存结构,多级缓存定时同步,客户端感知及时性不如Nacos。
7、同步实例信息到Nacos Server集群其它节点
回到之前的代码,put方法中distroProtocol.sync();进行同步信息到集群其它节点,跟进代码:
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
private final DistroProtocol distroProtocol;
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
}
- 通过newSingleScheduledExecutorService.scheduleWithFixedDelay()定时执行ProcessRunnable任务,发送http请求同步实例信息到Nacos Server集群其它节点
@Component
public class DistroProtocol {
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
//遍历所有服务ip。进行数据同步
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
//创建延迟任务
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
//其实也是一个定时任务
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
}
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
private final ScheduledExecutorService processingExecutor;
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
//NacosDelayTaskExecuteEngine实例化
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
//初始化tasks
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
//创建定时任务线程池
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
//每间隔一段时间定时执行任务ProcessRunnable
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}
/**
* process tasks in execute engine.
*/
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
}
public class DistroDelayTaskProcessor implements NacosTaskProcessor {
@Override
public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
//又创建了一个定时任务
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
}
return false;
}
}
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE);
//是调用接口地址,通知其他服务,同步服务变更数据
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
if (!result) {
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
handleFailedTask();
}
}
}
二、Nacos服务端CP模式实现:RaftConsistencyServiceImpl
Nacos主要是AP模式,CP模式的RaftConsistencyServiceImpl,简单介绍一下大概实现方式:
1、是阿里自己实现的CP模式的简单raft协议。
2、判断自己是Leader节点的话才执行逻辑,否则转发给Leader。
3、同步更新实例数据到磁盘,异步更新内存注册表。
4、用CountDownLatch实现,必须集群半数以上节点写入成功才返回客户端成功。
5、成功后调用/raft/datum/commit接口提交。
6、发布ValueChangeEvent事件
7、PersistentNotifier监听ValueChangeEvent事件,处理服务变更,调用Service的updateIPs方法进行服务注册完成替换原有服务IP列表。
RaftConsistencyServiceImpl的put永久实例集合一致性服务
- 和raft选举算法有关。
@Deprecated
@DependsOn("ProtocolManager")
@Service
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {
private final RaftCore raftCore;
@Override
public void put(String key, Record value) throws NacosException {
checkIsStopWork();
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
e);
}
}
}
raftCore.signalPublish
这个其实涉及到raft选举的协议,如果本服务不是leader就要交给leader去处理,就发一个http请求给leader,leader接受到之后还是会到他的signalPublish里处理。如果是leader的话就进行服务实例改变通知,通知本地的监听器,并且要同步到其他结点,使用过半机制,刚好CountDownLatch可以用,只要有过半响应成功就算同步成功。
@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {
private final RaftProxy raftProxy;
public void signalPublish(String key, Record value) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
//不是leader
if (!isLeader()) {
ObjectNode params = JacksonUtils.createEmptyJsonNode();
params.put("key", key);
params.replace("value", JacksonUtils.transferToJsonNode(value));
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
final RaftPeer leader = getLeader();
//交给leader去做/v1/ns/raft/datum
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}
OPERATE_LOCK.lock();
//是leader
try {
final long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
ObjectNode json = JacksonUtils.createEmptyJsonNode();
json.replace("datum", JacksonUtils.transferToJsonNode(datum));
json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
//发布数据改变通知
onPublish(datum, peers.local());
final String content = json.toString();
//只要过半的结点数
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
//遍历所有结点
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) {
//自己算一次
latch.countDown();
continue;
}
///v1/ns/raft/datum/commit
final String url = buildUrl(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT
.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, result.getCode());
return;
}
//异步完成
latch.countDown();
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
}
@Override
public void onCancel() {
}
});
}
//等待半数完成
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}
}
RaftCore中的onPublish(datum, peers.local());会发布一个ValueChangeEvent事件
@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {
public void onPublish(Datum datum, RaftPeer source) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}
if (!peers.isLeader(source.ip)) {
Loggers.RAFT
.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
}
if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(local));
throw new IllegalStateException(
"out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
}
local.resetLeaderDue();
// if data should be persisted, usually this is true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
raftStore.write(datum);
}
datums.put(datum.key, datum);
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
//发布ValueChangeEvent事件
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
}
PersistentNotifier
会发布一个ValueChangeEvent事件,在PersistentNotifier中监听并处理
public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {
private final Map<String, ConcurrentHashSet<RecordListener>> listenerMap = new ConcurrentHashMap<>(32);
public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
if (KeyBuilder.matchServiceMetaKey(key) && !KeyBuilder.matchSwitchKey(key)) {
for (RecordListener listener : listenerMap.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
try {
if (action == DataOperation.CHANGE) {
listener.onChange(key, value);
}
if (action == DataOperation.DELETE) {
listener.onDelete(key);
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
}
}
}
}
if (!listenerMap.containsKey(key)) {
return;
}
for (RecordListener listener : listenerMap.get(key)) {
try {
if (action == DataOperation.CHANGE) {
listener.onChange(key, value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(key);
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
}
}
}
@Override
public void onEvent(ValueChangeEvent event) {
notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
}
}
总结:
- 1、通过启动异步线程池,监听ArrayBlockingQueue中的任务,进行实时消费。
- 2、通过高性能的内存监听队列将服务请求的写和处理进行分割。
好处:
- 提高性能:服务提供方发起注册和注册中心处理服务注册的实现分离。
- 采用ArrayBlockingQueue内存队列,避免了并发写的处理问题。
参考:
https://www.cnblogs.com/chz-blogs/p/14325288.html
https://www.cnblogs.com/guoxiaoyu/p/14248226.html
https://blog.csdn.net/wangwei19871103/article/details/105834317
https://blog.csdn.net/wangwei19871103/article/details/105835207