一、Nacos服务心跳
1.1、客户端心跳
Nacos Client会维护一个定时任务通过持续调用服务端的接口更新心跳时间,保证自己处于存活状态,防止服务端将服务剔除,Nacos默认5秒向服务端发送一次,通过请求服务端接口/instance/beat发送心跳。
客户端服务在注册服务
根据nacos-discovery的META-INF目录下的spring.factories配置来完成相关类的自动装配。
- NacosServiceRegistryAutoConfiguration用来注册管理这几个bean。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
NacosServiceRegistry:完成服务注册,实现ServiceRegistry。
NacosRegistration:用来注册时存储nacos服务端的相关信息。
NacosAutoServiceRegistration 继承spring中的AbstractAutoServiceRegistration,AbstractAutoServiceRegistration实现ApplicationListener<WebServerInitializedEvent>,通过事件监听来发起服务注册,到时候会调用NacosServiceRegistry.register(registration)
在NacosServiceRegistry.registry方法中,调用了nacos client sdk中的namingService.registerInstance完成服务注册。
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
if (nacosDiscoveryProperties.isFailFast()) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
else {
log.warn("Failfast is false. {} register failed...{},", serviceId,
registration.toString(), e);
}
}
}
}
继续看namingService.registerInstance的实现主要就两件事
1、beatReactor.addBeatInfo创建心跳信息实现健康检查,Nacos Server必须要确保注册的服务实例是健康的,而心跳检测就是服务监控检测的方式。
2、serverProxy.registerService 服务注册。
public class NacosNamingService implements NamingService {
private BeatReactor beatReactor;
private NamingProxy serverProxy;
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
//创建group@@servciceName
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//如果当前实例是临时实例(默认是临时实例)
if (instance.isEphemeral()) {
// 创建心跳信息
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
//beanReactor添加心跳信息检测
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
//发送请求向nacos注册服务
serverProxy.registerService(groupedServiceName, groupName, instance);
}
}
看下BeatInfo这个类
- 给周期任务设定时间
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval())
public class BeatReactor implements Closeable {
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
BeatInfo beatInfo = new BeatInfo();
//服务名称
beatInfo.setServiceName(groupedServiceName);
//IP
beatInfo.setIp(instance.getIp());
//端口
beatInfo.setPort(instance.getPort());
//集群名称
beatInfo.setCluster(instance.getClusterName());
//权重
beatInfo.setWeight(instance.getWeight());
//元数据
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
//心跳周期,给周期任务设定时间
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
return beatInfo;
}
}
public class Instance implements Serializable {
public long getInstanceHeartBeatInterval() {
return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL,
Constants.DEFAULT_HEART_BEAT_INTERVAL);
}
}
public class Constants {
//Constants内部定义的一个DEFAULT_HEART_BEAT_INTERVAL的常量,设定5秒:
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
}
接下来我们看下addBeatInfo方法,该方法内部主要是将BeatTask任务加入到线程池ScheduledExecutorService当中。
public class BeatReactor implements Closeable {
private final ScheduledExecutorService executorService;
private final NamingProxy serverProxy;
public BeatReactor(NamingProxy serverProxy, int threadCount) {
this.serverProxy = serverProxy;
//实例化客户端心跳机制线程池
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
}
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
//将心跳任务添加到线程池中,发起一个心跳检测任务
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
}
重点部分就是看BeatTask
BeatTask继承Runnable,run方法就是我们的重点,该方法调用了NamingProxy的sendBeat方法,服务端请求地址为/instance/beat的方法。
public class BeatReactor implements Closeable {
private final ScheduledExecutorService executorService;
private final NamingProxy serverProxy;
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
//心跳周期执行时间
long nextTime = beatInfo.getPeriod();
try {
//向Nacos Server服务端发送心跳请求
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
//如果返回资源未找到,则立即重新注册服务
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
//发送http请求,向Nacos Server服务端注册服务
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
//定时去运行,发送心跳请求
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
}
public class NamingProxy implements Closeable {
//向Nacos Server服务端发送心跳请求
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}
//发送http请求,向Nacos Server服务端注册服务
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
}
心跳实际就是通过schedule定时向server发送数据包,然后启动一个线程检测服务端的返回,如果在指定时间没有返回则认为服务端出了问题,服务端也会根据发来的心跳包不断更新服务的状态。
1.2、服务端心跳
接下来我们把目光放到服务端,找到InstanceController的beat方法,如果是参数beat信息的话,说明是第一次发起心跳,则会带有服务实例信息,因为发起心跳成功则服务端会返回下次不要带beat信息的参数,这样客户端第二次就不会携带beat信息了。如果发现没有该服务,又没带beat信息,说明这个服务可能被移除过了,直接返回没找到。如果没有服务,但是发现有beat信息,那就从beat中获取服务实例信息,进行注册:
看InstanceController的beat方法
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
//设置心跳间隔
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
//判断有无心跳内容
//如果存在心跳内容则不是轻量级心跳就转化为RsInfo
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
//获取对应的实例的信息
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
//如果实例不存在
if (instance == null) {
//并且心跳内容也不存在
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
//返回RESOURCE_NOT_FOUND给客户端,客户端会发起服务注册
return result;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
//根据心跳内容创建一个实例信息
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
//注册实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
//获取服务的信息
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
//不存在心跳内容的话,要创建一个进行处理
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
//5秒间隔
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
//告诉客户端不需要带上心跳信息了,变成轻量级心跳了
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
}
接下来看一下processClientBeat方法,该方法将ClientBeatProcessor放入到线程池中,接下来我们看下重点看下run方法,
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
//将服务提供者的心跳信息设置进去
clientBeatProcessor.setRsInfo(rsInfo);
//放入线程池
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
}
public class HealthCheckReactor {
public static ScheduledFuture<?> scheduleNow(Runnable task) {
return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
}
}
该方法内部主要就是更新对应实例下心跳时间,并且发送服务变更事件。
public class ClientBeatProcessor implements Runnable {
private RsInfo rsInfo;
private Service service;
@Override
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
//获取对应集群下的所有实例信息
Cluster cluster = service.getClusterMap().get(clusterName);
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {
//更新对应实例下最近一次心跳信息
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
//更新该实例最后一次心跳更新时间
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
if (!instance.isHealthy()) {
//如果之前该实例是不健康的,会被设置为健康
instance.setHealthy(true);
Loggers.EVT_LOG
.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE);
//发送服务变更事件
getPushService().serviceChanged(service);
}
}
}
}
}
}
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
@Autowired
private SwitchDomain switchDomain;
private ApplicationContext applicationContext;
public void serviceChanged(Service service) {
// merge some change events to reduce the push frequency:
if (futureMap
.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
return;
}
//发送服务变更事件
this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}
}
PushService.onApplicationEvent监听ServiceChangeEvent事件
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
@Autowired
private SwitchDomain switchDomain;
private static ConcurrentMap<String, Future> futureMap = new ConcurrentHashMap<>();
private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap = new ConcurrentHashMap<>();
private static ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>();
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
//nacos服务端给每个客户端实例推送udp包时,该实例就是一个udp客户端,
//clientMap中存放的就是这些udp客户端信息
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
//switchDomain.getDefaultPushCacheMillis()默认是10秒,
//即10000毫秒,不会进入这个分支,所以compressData=null
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
//compressData=null,所以会进入这个分支,
//关注prepareHostsData(client)方法
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
client.getServiceName(), client.getAddrStr(), client.getAgent(),
(ackEntry == null ? null : ackEntry.key));
//通过udp协议向nacos 消费者客户端推送数据
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
}
咋一看这个方法很复杂,首先看一下这个方法的主体结构,其实主要就是开启了一个一次性延迟任务(注意不是定时任务,只会执行一次),它的职责就是通过udp协议向nacos客户端推送数据,对应方法:udpPush(ackEntry)
udpPush(ackEntry)
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap = new ConcurrentHashMap<>();
private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap = new ConcurrentHashMap<>();
private static DatagramSocket udpSocket;
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
return null;
}
//如果重试次数大于MAX_RETRY_TIMES=1次,就不再发送udp包了
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
//结合Receiver.run()可知,ackMap存放的是已发送udp但是还没收到ACK响应的数据包
ackMap.put(ackEntry.key, ackEntry);
//udpSendTimeMap存放每个udp数据包开始发送的事件
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
Loggers.PUSH.info("send udp packet: " + ackEntry.key);
//发送udp数据包
udpSocket.send(ackEntry.origin);
ackEntry.increaseRetryTime();
//又提交了一个延迟任务(延迟10秒),其实这个任务的作用就是重试,
//实现的效果就是当前发送完udp之后,如果没有收到ACK响应,就隔10秒重发一次,并且只重试一次
GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
ackEntry.origin.getAddress().getHostAddress(), e);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
//实现重发的任务Retransmitter
public static class Retransmitter implements Runnable {
Receiver.AckEntry ackEntry;
public Retransmitter(Receiver.AckEntry ackEntry) {
this.ackEntry = ackEntry;
}
@Override
public void run() {
//如果ackMap中包含该数据包,就重发一次,ackMap存放的都是没有收到ACK响应的包
//如果接受到ACK响应,会移除(参考Receiver线程)
if (ackMap.containsKey(ackEntry.key)) {
Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
udpPush(ackEntry);
}
}
}
}
上面这段代码其实就一个作用:向nacos客户端发送udp包,如果隔了10秒还没收到ACK响应,就重发一次(通过另一个延迟任务实现)。
PushService类结构
第一眼就会看到一个关键接口:ApplicationListener,没错,PushService这个类就是一个事件监听类,它所监听的事件正是ServiceChangeEvent,onApplicationEvent方法已经在上面讲过了。
static代码块
PushService中有一个static代码块,static代码块在类被主动引用的时候会首先执行一次。
@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
static {
try {
udpSocket = new DatagramSocket();
Receiver receiver = new Receiver();
Thread inThread = new Thread(receiver);
inThread.setDaemon(true);
inThread.setName("com.alibaba.nacos.naming.push.receiver");
inThread.start();
GlobalExecutor.scheduleRetransmitter(() -> {
try {
removeClientIfZombie();
} catch (Throwable e) {
Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
}
}, 0, 20, TimeUnit.SECONDS);
} catch (SocketException e) {
Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
}
}
}
这个代码块里先是开启了一个线程:Receiver,然后又开启了一个定时任务(20秒执行一次),对应的逻辑代码:removeClientIfZombie(),对他们分别简单讲解下。
Receiver线程
@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
public static class Receiver implements Runnable {
@Override
public void run() {
while (true) {
byte[] buffer = new byte[1024 * 64];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
try {
udpSocket.receive(packet);
String json = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8).trim();
AckPacket ackPacket = JacksonUtils.toObj(json, AckPacket.class);
InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
String ip = socketAddress.getAddress().getHostAddress();
int port = socketAddress.getPort();
//接受到ACK响应的时间距离上次接受到的时间之差如果大于10秒
//ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L)
if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {
Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);
}
String ackKey = getAckKey(ip, port, ackPacket.lastRefTime);
AckEntry ackEntry = ackMap.remove(ackKey);
if (ackEntry == null) {
throw new IllegalStateException(
"unable to find ackEntry for key: " + ackKey + ", ack json: " + json);
}
long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);
Loggers.PUSH
.info("received ack: {} from: {}:{}, cost: {} ms, unacked: {}, total push: {}", json, ip,
port, pushCost, ackMap.size(), totalPush);
//pushCostMap存放每个数据包的耗时
pushCostMap.put(ackKey, pushCost);
udpSendTimeMap.remove(ackKey);
} catch (Throwable e) {
Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
}
}
}
}
}
这个线程一直轮询接受UDP协议的响应,接受到ACK响应包后没干其它事,主要就是维护一些属性:ackMap、pushCostMap、udpSendTimeMap,至于这些属性干嘛用的(其中pushCostMap已经知道,见上面注释),可以往下看,后面会有用到。
定时任务:removeClientIfZombie()
@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
//根据方法命名,可以隐约猜到,应该是移除僵尸客户端的
private static void removeClientIfZombie() {
int size = 0;
for (Map.Entry<String, ConcurrentMap<String, PushClient>> entry : clientMap.entrySet()) {
ConcurrentMap<String, PushClient> clientConcurrentMap = entry.getValue();
for (Map.Entry<String, PushClient> entry1 : clientConcurrentMap.entrySet()) {
PushClient client = entry1.getValue();
//如果是僵尸client,则从clientMap中移除
if (client.zombie()) {
clientConcurrentMap.remove(entry1.getKey());
}
}
size += clientConcurrentMap.size();
}
if (Loggers.PUSH.isDebugEnabled()) {
Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}", size);
}
}
public class PushClient {
public boolean zombie() {
return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
}
}
}
该方法就是维护了clientMap(见onApplicationEvent方法中注释),在client.zombie()判断规则中有个很关键的属性:lastRefTime(位于PushClient中),要知道这个属性的意思,就需要知道clientMap是如何初始化的(clientMap中存放的就是PushClient)?
到这里,Receiver线程中维护的那几个属性的作用也已经很清楚了:
- ackMap:存放所有已经发送了udp但还没收到客户端的ACK响应的数据包;
- pushCostMap:存放每个数据包的耗时;
- udpSendTimeMap:存放每个数据包开始发送的时间;
Receiver线程就是用来接收ACK响应的,所以每接受到一个响应包,就会从ackMap和udpSendTimeMap中移除,所以Receiver线程的作用也很清楚了。
UDP客户端的初始化
clientMap中存放的是所有的udp客户端,nacos服务端需要往客户端通过udp协议推送数据,所以需要将所有客户端进行初始化。
不知道大家有没有注意到上面onApplicationEvent方法的代码中我加了两个注释,在构建Receiver.AckEntry对象的时候,会执行到这行代码:ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime),然后重点关注下prepareHostsData(client)方法:
@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
private static Map<String, Object> prepareHostsData(PushClient client) throws Exception {
Map<String, Object> cmd = new HashMap<String, Object>(2);
cmd.put("type", "dom");
//初始化udp客户端
cmd.put("data", client.getDataSource().getData(client));
return cmd;
}
}
//InstanceController类中pushDataSource初始化代码
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
private DataSource pushDataSource = new DataSource() {
@Override
public String getData(PushService.PushClient client) {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
try {
//默认传入的udp的端口为0,即不开启nacos服务端udp推送功能
result = doSrvIpxt(client.getNamespaceId(), client.getServiceName(), client.getAgent(),
client.getClusters(), client.getSocketAddr().getAddress().getHostAddress(), 0,
StringUtils.EMPTY, false, StringUtils.EMPTY, StringUtils.EMPTY, false);
} catch (Exception e) {
String serviceNameField = "name";
String lastRefTimeField = "lastRefTime";
if (result.get(serviceNameField) == null) {
String serviceName = client.getServiceName();
if (serviceName == null) {
serviceName = StringUtils.trimToEmpty(serviceName);
}
result.put(serviceNameField, serviceName);
result.put(lastRefTimeField, System.currentTimeMillis());
}
Loggers.SRV_LOG.warn("PUSH-SERVICE: service is not modified", e);
}
// overdrive the cache millis to push mode
result.put("cacheMillis", switchDomain.getPushCacheMillis(client.getServiceName()));
return result.toString();
}
};
//继续看doSrvIpxt方法,方法很长,这里只贴片段
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// now try to enable the push
try {
//udpPort是服务发现时指定的
if (udpPort > 0 && pushService.canEnablePush(agent)) {
pushService
.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG
.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}
//代码略……
}
}
doSrvIpxt方法会调用PushService类的addClient方法,而clientMap就是在addClient方法中初始化的:
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
public void addClient(String namespaceId, String serviceName, String clusters, String agent,
InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {
PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant,
app);
addClient(client);
}
public void addClient(PushClient client) {
// client is stored by key 'serviceName' because notify event is driven by serviceName change
String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
if (clients == null) {
clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
clients = clientMap.get(serviceKey);
}
PushClient oldClient = clients.get(client.toString());
if (oldClient != null) {
oldClient.refresh();
} else {
PushClient res = clients.putIfAbsent(client.toString(), client);
if (res != null) {
Loggers.PUSH.warn("client: {} already associated with key {}", res.getAddrStr(), res.toString());
}
Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(), client.getServiceName());
}
}
}
看到这我们已经清楚clientMap初始化的来龙去脉了,现在再回看一下僵尸client的判断规则:
public boolean zombie() {
//客户端从初始化到响应ack,超过了10秒,就认为是僵尸client
//lastRefTime是PushClient类中的属性,默认是当前时间,可以代表PushClient初始化的时间
//这句代码就可以理解为:如果一个客户端长时间没有进行ack响应,就认识它是僵尸client
return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
}
当任务某个客户端是僵尸client时,就从客户端集合(clientMap)中移除,下次就不会向它推送udp数据包了。
Nacos心跳机制总结
PushService类的主要功能基本上讲的差不多了,可能有人会觉得一脸懵,nacos源码想说明白确实不容易,里面有太多的异步任务,跳来跳去,很容易晕,我这里做一个总结吧。
udp推送
- 当服务端注册表中实例发送了变更时,就会发布ServiceChangeEvent事件,就会被PushService监听到,监听到之后就会以服务维度向客户端通过udp协议推送通知,从clientMap中找出需要推送的客户端进行能推送;
- 如果发送失败或者超过10秒没收到ack响应,就会隔10秒进行重试(从ackMap中找出需要重试的包,ackMap由Receiver线程维护),最大重试次数默认为1次,超过1次就不再发送;
ack接收
- PushService类的static代码块中开启了守护线程Receiver,用于循环接收来自客户端的ack响应,使用ackMap维护所有已发送udp包但还没有进行ack响应的包,如果接收到ack响应,就从ackMap中移除;
udp客户端集合维护
- PushService类的static代码块中开启了一个定时任务(20秒一次)专门用来维护clientMap(存放了所有需要进行udp推送的客户端),如果发现哪个客户端从初始化到响应ack的时间间隔超过了10秒,就从clientMap中移除,那么下次就不会再往这个客户端推送udp了。
Nacos服务的健康检查
Nacos Server会开启一个定时任务来检查注册服务的健康情况,对于超过15秒没收到客户端的心跳实例会将它的 healthy属性置为false,此时当客户端不会将该实例的信息发现,如果某个服务的实例超过30秒没收到心跳,则剔除该实例,如果剔除的实例恢复,发送心跳则会恢复。
当有实例注册的时候,我们会看到有个service.init()的方法,该方法的实现主要是将ClientBeatCheckTask加入到线程池当中:
@Component
public class ServiceManager implements RecordListener<Service> {
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
private void putServiceAndInit(Service service) throws NacosException {
putService(service);
service = getService(service.getNamespaceId(), service.getName());
//启动服务检查
service.init();
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
}
ClientBeatCheckTask中的run方法主要做两件事心跳时间超过15秒则设置该实例信息为不健康状况和心跳时间超过30秒则删除该实例信息,如下代码:
public class ClientBeatCheckTask implements Runnable {
private Service service;
@Override
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
//获取服务所有实例信息
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances) {
//如果心跳时间超过15秒则设置该实例信息为不健康状况
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
//设置该实例信息为不健康状况
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
//如果心跳时间超过30秒则删除该实例信息
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
}
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
首先我们来看一下deleteIp方法,该方法内部主要通过构建删除请求,发送删除请求:
public class ClientBeatCheckTask implements Runnable {
private void deleteIp(Instance instance) {
try {
NamingProxy.Request request = NamingProxy.Request.newRequest();
request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
.appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
.appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
//构建Url
String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
//发送Http删除请求
// delete instance asynchronously:
HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
instance.toJson(), result.getMessage(), result.getCode());
}
}
@Override
public void onError(Throwable throwable) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
}
}
}
删除实例的接口
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
@CanDistro
@DeleteMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String deregister(HttpServletRequest request) throws Exception {
Instance instance = getIpAddress(request);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
return "ok";
}
//删除方法
serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
return "ok";
}
}
内部通过调用ServiceManager的removeInstance方法
@Component
public class ServiceManager implements RecordListener<Service> {
public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
Service service = getService(namespaceId, serviceName);
synchronized (service) {
removeInstance(namespaceId, serviceName, ephemeral, service, ips);
}
}
private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service,
Instance... ips) throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
//排除要删除的实例信息
List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//更新实例信息
consistencyService.put(key, instances);
}
}
重点看下substractIpAddresses内部通过调用updateIpAddresses,该方法内部主要就是移除到超过30秒的实例信息
@Component
public class ServiceManager implements RecordListener<Service> {
private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips)
throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);
}
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
//获取所有实例信息
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = Sets.newHashSet();
for (Instance instance : currentIPs) {
currentInstances.put(instance.toIpAddr(), instance);
currentInstanceIds.add(instance.getInstanceId());
}
//初始化Map
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
instanceMap = new HashMap<>(ips.length);
}
for (Instance instance : ips) {
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
//移除超过30秒的实例信息
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey());
} else {
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) {
instance.setInstanceId(oldInstance.getInstanceId());
} else {
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
return new ArrayList<>(instanceMap.values());
}
}