A Light Read of the Eureka Source: Service Heartbeats

The Eureka source referenced here is version 1.7.2.

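I am a beginner, and this post is my own set of notes from reading the source. If you are reading it, please judge for yourself whether it is correct: the explanations only reflect my understanding and are not necessarily right!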

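The heartbeat mechanism proves that a client is still running normally. At the code level, each heartbeat refreshes the instance's expiration time so that the automatic eviction mechanism does not remove the instance.
On the Eureka Client side, a thread pool for running heartbeats is also built when the client is initialized, in this constructor: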

com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<com.netflix.discovery.BackupRegistry>)

            // Initialize the thread pool that keeps the heartbeat going
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
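
To make the "direct handoff" comment concrete, here is a minimal sketch of a pool with the same shape, built with the JDK only. The class name, the method name `newHeartbeatPool` and the thread-name prefix are made up for illustration, and a plain lambda stands in for Guava's `ThreadFactoryBuilder`; it is not Eureka's code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatPoolSketch {
    // Hypothetical helper: builds a pool shaped like the one in the listing above.
    static ThreadPoolExecutor newHeartbeatPool(int maxThreads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "Sketch-HeartbeatExecutor-" + counter.getAndIncrement());
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        };
        return new ThreadPoolExecutor(
                1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), // direct handoff: tasks are never queued
                factory);
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = newHeartbeatPool(2);
        Callable<String> task = () -> Thread.currentThread().getName();
        System.out.println("ran on " + pool.submit(task).get());
        System.out.println("queued tasks: " + pool.getQueue().size());
        pool.shutdown();
    }
}
```

Because a `SynchronousQueue` has no capacity, a submitted task is either handed straight to a worker thread or rejected once all `maxThreads` workers are busy, so heartbeats cannot pile up in a backlog.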

The heartbeat task is scheduled when the scheduled tasks are initialized:

com.netflix.discovery.DiscoveryClient#initScheduledTasks

        scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            // the task to run
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

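The first heartbeat is sent only after a delay of renewalIntervalInSecs, which defaults to 30 s, and later heartbeats are sent at the same 30 s interval. The core piece here is
the task thread, HeartbeatThread: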
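
The `expBackOffBound` argument in the listing hints that the interval is not always fixed. As far as I can tell, TimedSupervisorTask reschedules itself after every run, doubles the delay when a run times out, caps it at the base interval multiplied by `expBackOffBound`, and goes back to the base interval after a success. That is my reading and not something the listing shows, so treat the sketch below as an assumption. The class name and the bound of 10 are illustrative only.

```java
class SupervisorDelaySketch {
    private final long baseDelayMs; // normal interval, e.g. renewalIntervalInSecs in ms
    private final long maxDelayMs;  // assumed cap: base interval * expBackOffBound
    private long currentDelayMs;

    SupervisorDelaySketch(long baseDelayMs, int expBackOffBound) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = baseDelayMs * expBackOffBound;
        this.currentDelayMs = baseDelayMs;
    }

    // Returns the delay to wait before the next run, given how the last run ended.
    long nextDelay(boolean timedOut) {
        if (timedOut) {
            // Assumed behavior: back off exponentially, but never beyond the cap.
            currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        } else {
            // A successful run resets the delay to the normal interval.
            currentDelayMs = baseDelayMs;
        }
        return currentDelayMs;
    }

    public static void main(String[] args) {
        SupervisorDelaySketch delay = new SupervisorDelaySketch(30_000L, 10);
        boolean[] outcomes = {true, true, true, true, false};
        for (boolean timedOut : outcomes) {
            System.out.println((timedOut ? "timeout" : "success")
                    + " -> next delay " + delay.nextDelay(timedOut) + " ms");
        }
    }
}
```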

com.netflix.discovery.DiscoveryClient.HeartbeatThread#run

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // Send the heartbeat
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

There is not much to say about these two methods: they simply send an HTTP request to an address like this:

http://DESKTOP-1S6DCTA:8080/v2/apps/EUREKA/001

Parameters: status, lastDirtyTimestamp

Method: PUT
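
To show how the pieces above fit together, here is a small sketch that assembles such a heartbeat URL. The class and method names are hypothetical, the status value `UP` and the timestamp are made-up sample values, and no URL encoding is done; it only illustrates the shape of the request, not how Eureka's transport layer builds it.

```java
import java.net.URI;

class HeartbeatRequestSketch {
    static final String METHOD = "PUT"; // the heartbeat is sent as a PUT

    // Hypothetical helper: joins the service URL, app name and instance ID,
    // then appends the two query parameters mentioned above.
    static URI heartbeatUri(String serviceUrl, String appName, String id,
                            String status, long lastDirtyTimestamp) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return URI.create(base + "apps/" + appName + "/" + id
                + "?status=" + status
                + "&lastDirtyTimestamp=" + lastDirtyTimestamp);
    }

    public static void main(String[] args) {
        URI uri = heartbeatUri("http://DESKTOP-1S6DCTA:8080/v2", "EUREKA", "001",
                "UP", System.currentTimeMillis());
        System.out.println(METHOD + " " + uri);
    }
}
```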

Now on to the server side, to see how it handles the heartbeat request:

com.netflix.eureka.resources.InstanceResource#renewLease
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            // Replicate the heartbeat to the other nodes in the cluster
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }

The core of this method is still the call to the parent class's renew(), which holds the main processing logic:

com.netflix.eureka.registry.AbstractInstanceRegistry#renew

    // Lease renewal method
    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        // Look up the application's leases by appName
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            // Look up the lease by instance ID
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // Get the service instance held by the lease
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                // Work out the instance's overridden status
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                // If the two differ, apply the overridden status
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            // Count the heartbeats actually received in the current minute
            renewsLastMin.increment();
            // Reset the lease's renewal time
            leaseToRenew.renew();
            return true;
        }
    }

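Main flow:

  • Look up the lease by service name and instance ID
  • Apply the overridden status to the instance held by the lease
  • Increment the heartbeat count for the current minute
  • Reset the renewal time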
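
The steps above can be sketched with a much smaller, self-contained registry. Everything here is simplified for illustration: the class names are hypothetical, statuses are plain strings, the overridden status is passed in as a parameter instead of being computed, and the clock is an explicit `now` argument.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class RenewFlowSketch {
    // Simplified stand-in for Lease<InstanceInfo>.
    static class Lease {
        String status = "UP";
        long lastUpdateTimestamp;
    }

    // appName -> (instance ID -> lease), the same two-level shape as the registry above.
    final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();
    final AtomicLong renewsThisMinute = new AtomicLong();

    void register(String appName, String id, long now) {
        Lease lease = new Lease();
        lease.lastUpdateTimestamp = now;
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, lease);
    }

    // Returns false when there is nothing to renew, so the caller can ask for re-registration.
    boolean renew(String appName, String id, String overriddenStatus, long now) {
        Map<String, Lease> leases = registry.get(appName);
        Lease lease = (leases == null) ? null : leases.get(id); // step 1: find the lease
        if (lease == null || "UNKNOWN".equals(overriddenStatus)) {
            return false;
        }
        if (!lease.status.equals(overriddenStatus)) {
            lease.status = overriddenStatus;                    // step 2: apply the override
        }
        renewsThisMinute.incrementAndGet();                     // step 3: count the heartbeat
        lease.lastUpdateTimestamp = now;                        // step 4: reset the renewal time
        return true;
    }

    public static void main(String[] args) {
        RenewFlowSketch sketch = new RenewFlowSketch();
        sketch.register("EUREKA", "001", 0L);
        System.out.println("known instance renewed: " + sketch.renew("EUREKA", "001", "UP", 30_000L));
        System.out.println("unknown instance renewed: " + sketch.renew("EUREKA", "002", "UP", 30_000L));
    }
}
```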

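The most important step is refreshing the instance's expiration time, so that the automatic eviction mechanism does not remove it by mistake. The lease's renew method (called above as leaseToRenew.renew()) does this: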

lastUpdateTimestamp = System.currentTimeMillis() + duration;

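The stored value is the current time plus the expiration duration (90 s). There is a bug here, which shows up in the automatic eviction logic.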
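
To see why the extra `+ duration` matters, here is a minimal sketch. It assumes the expiry check compares the current time against `lastUpdateTimestamp + duration`, which is how I remember Lease#isExpired, but that method is not shown in this post, so treat it as an assumption. The class name and the explicit `now` arguments are for illustration only.

```java
class LeaseExpirySketch {
    final long durationMs;
    long lastUpdateTimestamp;

    LeaseExpirySketch(long durationMs, long now) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = now;
    }

    // Mirrors the line quoted above: the stored value is already "now + duration".
    void renew(long now) {
        lastUpdateTimestamp = now + durationMs;
    }

    // Assumed expiry check: it adds the duration a second time.
    boolean isExpired(long now) {
        return now > lastUpdateTimestamp + durationMs;
    }

    public static void main(String[] args) {
        LeaseExpirySketch lease = new LeaseExpirySketch(90_000L, 0L);
        lease.renew(0L); // last heartbeat at t = 0
        for (long t : new long[] {90_001L, 180_000L, 180_001L}) {
            System.out.println("t=" + t + " ms expired=" + lease.isExpired(t));
        }
    }
}
```

Under that assumption a lease survives for 2 × 90 s = 180 s after its last heartbeat instead of 90 s, which would be consistent with an instance being evicted only after roughly 3 minutes.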

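The source also reveals a scenario: when a network failure keeps an instance from sending heartbeats for a long time, the instance is evicted (after about 3 min).
Once the network recovers, the next heartbeat will not find the corresponding instance on the server.
How does the client deal with this? Look at the client code: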

com.netflix.discovery.DiscoveryClient#renew

    // If the instance has already been removed from the registry
    if (httpResponse.getStatusCode() == 404) {
        REREGISTER_COUNTER.increment();
        logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
        long timestamp = instanceInfo.setIsDirtyWithTime();
        // Register again
        boolean success = register();
        if (success) {
            instanceInfo.unsetIsDirty(timestamp);
        }
        return success;
    }

As you can see, when the client finds that its instance is missing on the server, it simply registers again.
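
The recovery path can be tried out against an in-memory stand-in for the server. `FakeServer` and its methods are hypothetical, and the 204 returned by `register` is an assumption about what a successful registration looks like; only the 404 and 200 handling comes from the client code above.

```java
import java.util.HashSet;
import java.util.Set;

class ReRegisterSketch {
    // Hypothetical in-memory stand-in for the Eureka server.
    static class FakeServer {
        final Set<String> instances = new HashSet<>();

        int heartbeat(String id) {
            return instances.contains(id) ? 200 : 404;
        }

        int register(String id) {
            instances.add(id);
            return 204; // assumed status code for a successful registration
        }

        void evict(String id) {
            instances.remove(id);
        }
    }

    // Same decision structure as the client's renew(): on 404, register again.
    static boolean renew(FakeServer server, String id) {
        int status = server.heartbeat(id);
        if (status == 404) {
            return server.register(id) == 204;
        }
        return status == 200;
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        server.register("001");
        server.evict("001"); // simulate eviction after a long network outage
        System.out.println("heartbeat after eviction: " + server.heartbeat("001"));
        System.out.println("renew succeeded: " + renew(server, "001"));
        System.out.println("heartbeat after renew: " + server.heartbeat("001"));
    }
}
```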

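One remark from me: Eureka relies on so many scheduled tasks across its mechanisms that some of its time calculations do not feel very reliable.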
