本人小白,这个文章是本人的阅读笔记,不是权威解读,需要自己甄别对错
最后一个部分了,就是IPing组件了,接着看看这个
IPing机制在我的理解当中可能就是就是弥补Eureka的服务信息变更的延时效应的。
我们算算,假设一个服务宕机了,其他服务Eureka Client列表最慢获取到服务变更消息的时间,也是复习下Eureka的相关知识
- 服务故障移除 30S执行一次,但是的但是,服务租约的过期是需要2 * 90S,就是3分钟,取最长的,那么需要6次检测才能感知故障进行移除。
- 故障实例移除了,但是只是清空了注册表和读写缓存,只读缓存需要花费30S进行同步,才能从最近变更队列中获取。
- Eureka Client是花费30S进行一次增量获取
假设啊,时间都错开了,那么SumTime = 180 + 30 + 30 大概需要4分钟,并且Ribbon的AllList从Eureka Client本地注册表拉取也是30S进行一次,那么感知一个服务宕机了,需要花费好几分钟,这不是有点小坑?
这种情况下,在并发比较高的场景,那么请求宕机服务的线程都会超时,超时造成的当前线程资源耗尽会引发服务雪崩效应,不知道是不是基于这个考虑,Hystrix中资源隔离和服务熔断降级很好的解决了这个情况!
IPing机制在一定情况下也缓解了这种情况发生。
说说主要的IPing实现吧
首先IPing是怎么实例化的呢?
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
return this.propertiesFactory.get(IPing.class, config, serviceId);
}
NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
ping.initWithNiwsConfig(config);
return ping;
}
如果配置了自定义IPing机制就用自己那一套,没配就走NIWSDiscoveryPing
那么在哪进行使用的呢,在ZoneAwareLoadBalancer进行初始化的时候传入了,向上追踪父类构造,你会发现在BaseLoadBalancer初始化时候使用了,顺便说一嘴,BaseLoadBalancer是原生的Ribbon调用接口类。
String clientName = clientConfig.getClientName();
this.name = clientName;
// 默认定时调度30S
int pingIntervalTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerPingInterval,
Integer.parseInt("30")));
int maxTotalPingTime = Integer.parseInt(""
+ clientConfig.getProperty(
CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
Integer.parseInt("2")));
//启动IPing机制
setPingInterval(pingIntervalTime);
setMaxTotalPingTime(maxTotalPingTime);
// cross associate with each other
// i.e. Rule,Ping meet your container LB
// LB, these are your Ping and Rule guys ...
setRule(rule);
setPing(ping);
接着会触发一个定时任务
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
//30S执行一次
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
任务核心执行方法
public void runPinger() throws Exception {
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
/*
* The readLock should be free unless an addServer operation is
* going on...
*/
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
//使用IPing的isAlive()方法判断服务是否可用
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
//重置服务状态
svr.setAlive(isAlive);
if (oldIsAlive != isAlive) {
//加入服务状态改变列表
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
//通知服务状态变更
notifyServerStatusChangeListener(changedServers);
} finally {
pingInProgress.set(false);
}
}
}
就是这一块调用了,目的也是重置服务状态
具体的实现类我们看几个主要的,不常用的就懒得看了
NIWSDiscoveryPing
默认实现
public boolean isAlive(Server server) {
boolean isAlive = true;
if (server!=null && server instanceof DiscoveryEnabledServer){
DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
InstanceInfo instanceInfo = dServer.getInstanceInfo();
if (instanceInfo!=null){
InstanceStatus status = instanceInfo.getStatus();
if (status!=null){
isAlive = status.equals(InstanceStatus.UP);
}
}
}
return isAlive;
}
从本地注册表拉取判断服务实例状态,但是有个疑问,Ribbon的AllList拉取,即enableAndInitLearnNewServersFeature()启动的定时更新AllList任务也是30S执行一次,这个任务是全量更新服务列表,那么这个IPing的作用是???不懂,待大佬解答啊
PingUrl
public boolean isAlive(Server server) {
String urlStr = "";
if (isSecure){
urlStr = "https://";
}else{
urlStr = "http://";
}
urlStr += server.getId();
urlStr += getPingAppendString();
boolean isAlive = false;
HttpClient httpClient = new DefaultHttpClient();
HttpUriRequest getRequest = new HttpGet(urlStr);
String content=null;
try {
HttpResponse response = httpClient.execute(getRequest);
content = EntityUtils.toString(response.getEntity());
isAlive = (response.getStatusLine().getStatusCode() == 200);
if (getExpectedContent()!=null){
LOGGER.debug("content:" + content);
if (content == null){
isAlive = false;
}else{
if (content.equals(getExpectedContent())){
isAlive = true;
}else{
isAlive = false;
}
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
// Release the connection.
getRequest.abort();
}
return isAlive;
}
就是直接请求服务了,默认的就是http://192.168.31.128:8080,默认情况下返回200就任务服务可用,当然可配置接口和返回值进行二次验证
这个机制对Eureka Client服务感知延迟做了一定的保障,但是这个是不是有点耗费性能
好了,IPing机制就说这点,没啥好说的