Eureka Source Code

First, let's be clear about what Eureka does:

  • Service registration
  • Self-preservation
  • Heartbeat detection
  • Status synchronization
  • ...

Starting from these points, let's see how Eureka implements them.

Service registration <client side>

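Service registration means the Eureka Client reporting its own service information to the Eureka Server, and it happens when the service starts up. So let's start from client startup.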

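The real question is: which method actually does the registration? Honestly, I was lost at first, so I looked up some Spring Cloud conventions online. The ServiceRegistry interface is Spring Cloud's contract for service registration, so I started from its implementation, EurekaServiceRegistry.register. Where is that method called? I eventually traced it to org.springframework.cloud.netflix.eureka.serviceregistry.EurekaAutoServiceRegistration#start, which is a lifecycle callback of a bean in the IoC container.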

So there are two entry points: one is the client-side annotation EnableEurekaClient or the AutoConfiguration; the other is EurekaAutoServiceRegistration.

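A note: on the client side, both EnableEurekaClient and EnableDiscoveryClient are optional, and the Javadoc of EnableEurekaClient explicitly says the annotation is optional. So the only remaining candidate for the first entry point is the AutoConfiguration.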

EurekaClientAutoConfiguration

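EurekaClientAutoConfiguration is itself a configuration class made up entirely of bean definitions. The snippets below were picked based on the analysis above and show where the relevant beans are initialized.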

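// Spring Cloud's implementation of the ServiceRegistry contract
// By convention, instances are registered and deregistered through the ServiceRegistry.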
@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
    return new EurekaServiceRegistry();
}

// Automatic service registration
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
                       matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
        ApplicationContext context, EurekaServiceRegistry registry,
        EurekaRegistration registration) {
  
    return new EurekaAutoServiceRegistration(context, registry, registration);
}

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config, EurekaInstanceConfig instance,
        @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
    // If we use the proxy of the ApplicationInfoManager we could run into a
    // problem
    // when shutdown is called on the CloudEurekaClient where the
    // ApplicationInfoManager bean is
    // requested but wont be allowed because we are shutting down. To avoid this
    // we use the
    // object directly.
    ApplicationInfoManager appManager;
    if (AopUtils.isAopProxy(manager)) {
        appManager = ProxyUtils.getTargetObject(manager);
    } else {
        appManager = manager;
    }
    CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
            config, this.optionalArgs, this.context);
    cloudEurekaClient.registerHealthCheck(healthCheckHandler);
    return cloudEurekaClient;
}

@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
                       matchIfMissing = true)
public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
        CloudEurekaInstanceConfig instanceConfig,
        ApplicationInfoManager applicationInfoManager, 
        @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
  
    return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
            .with(eurekaClient).with(healthCheckHandler).build();
}

EurekaAutoServiceRegistration

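The AutoConfiguration instantiates this class, so it must be a bean in the IoC container. Also look at the interfaces it implements, especially SmartLifecycle. If you click into AutoServiceRegistration, it turns out to be another auto-registration hook provided by Spring Cloud.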

public class EurekaAutoServiceRegistration implements AutoServiceRegistration,
      SmartLifecycle, Ordered, SmartApplicationListener {
        ....
}

This means that when the IoC container starts, it calls back the lifecycle methods of this class, such as start and stop.

// Invoked by the lifecycle callback
public void start() {
    // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
    if (this.port.get() != 0) {
        if (this.registration.getNonSecurePort() == 0) {
            this.registration.setNonSecurePort(this.port.get());
        }

        if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
            this.registration.setSecurePort(this.port.get());
        }
    }

    // only initialize if nonSecurePort is greater than 0 and it isn't already running
    // because of containerPortInitializer below
    if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
        // serviceRegistry = org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry
        this.serviceRegistry.register(this.registration);

        this.context.publishEvent(new InstanceRegisteredEvent<>(this,
                this.registration.getInstanceConfig()));
        this.running.set(true);
    }
}
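The guard logic in start() can be modeled without Spring: ports that are still 0 are filled in from the container port, and registration happens at most once, and only when the non-secure port is positive. A minimal sketch; MiniRegistration and MiniAutoRegistration are hypothetical names, and a Consumer stands in for EurekaServiceRegistry#register:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for EurekaRegistration: only the fields start() looks at.
class MiniRegistration {
    int nonSecurePort;
    int securePort;
    boolean secure;
}

// Hypothetical model of the start() guard in EurekaAutoServiceRegistration.
class MiniAutoRegistration {
    final AtomicInteger port = new AtomicInteger();    // port reported by the web container
    final AtomicBoolean running = new AtomicBoolean();
    final MiniRegistration registration;
    final Consumer<MiniRegistration> serviceRegistry; // stands in for EurekaServiceRegistry#register

    MiniAutoRegistration(MiniRegistration registration, Consumer<MiniRegistration> serviceRegistry) {
        this.registration = registration;
        this.serviceRegistry = serviceRegistry;
    }

    void start() {
        // Only fill in ports that are still 0, and only once the container port is known.
        if (port.get() != 0) {
            if (registration.nonSecurePort == 0) {
                registration.nonSecurePort = port.get();
            }
            if (registration.securePort == 0 && registration.secure) {
                registration.securePort = port.get();
            }
        }
        // Register at most once, and only with a usable port.
        if (!running.get() && registration.nonSecurePort > 0) {
            serviceRegistry.accept(registration);
            running.set(true);
        }
    }
}
```

Because of the running flag, calling start() again (for example once the container port becomes known) does not register twice.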

EurekaServiceRegistry#register

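Although this is the actual registration method, it does not call the server directly: it changes the instance status, and that status change is propagated to listeners as an event.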

@Override
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);

    if (log.isInfoEnabled()) {
        log.info("Registering application "
                + reg.getApplicationInfoManager().getInfo().getAppName()
                + " with eureka with status "
                + reg.getInstanceConfig().getInitialStatus());
    }

    // This actually fires a status-change event
    reg.getApplicationInfoManager() // =com.netflix.appinfo.ApplicationInfoManager
            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

    reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
            .getEurekaClient().registerHealthCheck(healthCheckHandler));
}

A short detour into com.netflix.appinfo.ApplicationInfoManager.setInstanceStatus:

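// Sets the status of this instance. The application can use this to indicate whether it is ready to receive traffic. Setting the status here also notifies all registered listeners of the status change event.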
public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {return;}

    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
        // Where are listeners added? See registerStatusChangeListener
        for (StatusChangeListener listener : listeners.values()) {
            try {
                // A status change means a service is registering or updating
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}

// Alt+F7 (Find Usages in IntelliJ) leads to the caller of this method: com.netflix.discovery.DiscoveryClient#initScheduledTasks
public void registerStatusChangeListener(StatusChangeListener listener) {
    listeners.put(listener.getId(), listener);
}
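This is the classic observer pattern: registerStatusChangeListener puts a listener into a map keyed by its id, and setInstanceStatus notifies every listener only when the status actually changed. The "only on change" part relies on InstanceInfo#setStatus returning null when the new status equals the old one, which is how I read that method (it isn't quoted here, so treat it as an assumption). A minimal sketch; MiniStatus and MiniInfoManager are hypothetical simplifications:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

enum MiniStatus { STARTING, UP, DOWN }

// Hypothetical model of ApplicationInfoManager's listener handling.
class MiniInfoManager {
    private final Map<String, BiConsumer<MiniStatus, MiniStatus>> listeners = new ConcurrentHashMap<>();
    private MiniStatus status = MiniStatus.STARTING;

    void registerStatusChangeListener(String id, BiConsumer<MiniStatus, MiniStatus> listener) {
        listeners.put(id, listener); // same id replaces the old listener
    }

    synchronized void setInstanceStatus(MiniStatus next) {
        if (next == null) {
            return;
        }
        // Mimics InstanceInfo#setStatus: previous status if it changed, otherwise null.
        MiniStatus prev = (status != next) ? status : null;
        status = next;
        if (prev != null) {
            for (BiConsumer<MiniStatus, MiniStatus> l : listeners.values()) {
                try {
                    l.accept(prev, next);
                } catch (RuntimeException e) {
                    // One failing listener must not stop the others from being notified.
                    System.err.println("failed to notify listener: " + e);
                }
            }
        }
    }
}
```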

DiscoveryClient#initScheduledTasks

The DiscoveryClient constructor calls initScheduledTasks, so the next step is to find where DiscoveryClient is instantiated. Spring Cloud has a neat design here: it defines a class CloudEurekaClient that extends DiscoveryClient, and EurekaClientAutoConfiguration instantiates CloudEurekaClient.

public class CloudEurekaClient extends DiscoveryClient {
    ....
    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
            EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
            ApplicationEventPublisher publisher) {
        // This calls the DiscoveryClient constructor
        super(applicationInfoManager, config, args);
        this.applicationInfoManager = applicationInfoManager;
        this.publisher = publisher;
        this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
                "eurekaTransport");
        ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }
    ....
}

public class DiscoveryClient implements EurekaClient {
    ....
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                    AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, 
                    EndpointRandomizer endpointRandomizer) {
      
        ....
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch); this is where it happens
        initScheduledTasks();
    }
    ....
}

Sorting out how the classes above relate

At this point everything connects.

First, the AutoConfiguration instantiates CloudEurekaClient, which invokes DiscoveryClient#initScheduledTasks; that creates the StatusChangeListener and adds it to com.netflix.appinfo.ApplicationInfoManager#listeners.

Then, once the IoC container has finished initializing, it calls start() on every SmartLifecycle implementation, which leads to org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry#register, then com.netflix.appinfo.ApplicationInfoManager#setInstanceStatus, and finally com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener#notify.

StatusChangeListener

StatusChangeListener is the event-listener interface, and it has only one implementation: an anonymous inner class.

The following snippet from DiscoveryClient#initScheduledTasks is that anonymous inner class.

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
            logger.error("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        // This is what actually pushes the instance info to the registry
        instanceInfoReplicator.onDemandUpdate();
    }
};

com.netflix.discovery.InstanceInfoReplicator#onDemandUpdate essentially ends up calling com.netflix.discovery.InstanceInfoReplicator#run.

com.netflix.discovery.InstanceInfoReplicator#run

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
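The pair isDirtyWithTime() / unsetIsDirty(dirtyTimestamp) exists so that the dirty flag is cleared only if nothing changed while register() was in flight. A minimal sketch of that idea, assuming unsetIsDirty clears the flag only when the recorded dirty timestamp is not newer than the one passed in (my reading of InstanceInfo, which isn't quoted here). DirtyTracker and ReplicatorStep are hypothetical names, and timestamps are passed in explicitly for testability:

```java
// Hypothetical model of InstanceInfo's dirty flag.
class DirtyTracker {
    private boolean dirty;
    private long lastDirtyTimestamp;

    synchronized void markDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    // The dirty timestamp, or null when there is nothing to push.
    synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    // Clear the flag only if no newer change arrived after the snapshot we registered.
    synchronized void unsetIsDirty(long snapshot) {
        if (lastDirtyTimestamp <= snapshot) {
            dirty = false;
        }
    }
}

class ReplicatorStep {
    // One pass of InstanceInfoReplicator#run; returns 1 if register ran, 0 otherwise.
    static int run(DirtyTracker info, Runnable register) {
        Long ts = info.isDirtyWithTime();
        if (ts == null) {
            return 0;
        }
        register.run();
        info.unsetIsDirty(ts);
        return 1;
    }
}
```

Clearing the flag with the snapshot timestamp, rather than unconditionally, keeps a change made during register() from being lost; it is simply pushed on the next pass.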

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register

All we can tell from this is that it sends a POST request to http://{eureka.ip:port}/eureka/apps/{appName}.

So all that's left is to find out how the server exposes this endpoint and how it processes the registration info.

String urlPath = "apps/" + info.getAppName();
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
    .header("Accept-Encoding", "gzip")
    .type(MediaType.APPLICATION_JSON_TYPE)
    .accept(MediaType.APPLICATION_JSON)
    .post(ClientResponse.class, info);
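To make the URL shape concrete, the same request can be built with the JDK's own java.net.http client (Java 11+) instead of Jersey. This is only an illustration: Eureka does not use this client, and the service URL, app name and JSON body below are made-up placeholders.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class RegisterRequestSketch {
    // Builds POST {serviceUrl}apps/{appName}; serviceUrl is expected to end with '/'.
    static HttpRequest build(String serviceUrl, String appName, String instanceJson) {
        URI uri = URI.create(serviceUrl).resolve("apps/" + appName);
        return HttpRequest.newBuilder(uri)
                .header("Accept-Encoding", "gzip")
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson)) // InstanceInfo as JSON
                .build();
    }
}
```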

Service registration <server side>

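Eureka uses the Jersey framework, so you can't find the exposed endpoints the way you would in Spring Web MVC; I looked up the corresponding method online. Even after finding it, I didn't figure out how it gets mapped to the URL (Jersey resolves resources through JAX-RS @Path annotations on resource classes, not Spring's @RequestMapping).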

ApplicationResource#addInstance

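According to what I found online, registration requests are received by com.netflix.eureka.resources.ApplicationResource#addInstance. The first part of the method only validates parameters; the actual registration is registry.register(info, "true".equals(isReplication));, where registry = org.springframework.cloud.netflix.eureka.server.InstanceRegistry.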

[Figure: class diagram of InstanceRegistry]

org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register

public void register(final InstanceInfo info, final boolean isReplication) {
    // Equivalent to publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
    handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
    // com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
    super.register(info, isReplication);
}

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register

public void register(final InstanceInfo info, final boolean isReplication) {
    // Lease duration (heartbeat timeout), 90 s by default
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // If a custom lease duration is configured, use it instead
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // com.netflix.eureka.registry.AbstractInstanceRegistry#register
    super.register(info, leaseDuration, isReplication);
    // In a high-availability setup, replicate to the peer nodes
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

com.netflix.eureka.registry.AbstractInstanceRegistry#register

The container that stores instance info: ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry

  • Outer KEY = spring.application.name (the app name)
  • Inner KEY = instance ID
  • Inner VALUE = the instance info, wrapped in a Lease

The code is long, so I removed the logger-related lines.

The parameter registrant is what the remote caller sent, whereas registry.get(registrant.getAppName()).get(registrant.getId()) is the local copy.

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    read.lock();
    try {
        // registry = ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
        // Look up the instances for this spring.application.name (more than one only when clustered)
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        // First registration for this app: create the inner map
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            // null means our new map was inserted; non-null means another thread (e.g. a concurrent or replicated registration) got there first
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Local copy
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        // Non-null means the server already has a local copy: keep its dirty timestamp instead of overwriting it
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // Dirty timestamp of the copy already in registry
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            // Dirty timestamp sent by the remote caller
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            
            // Use the server's local copy only if its timestamp is strictly greater;
            // if it is less than or equal, use the incoming one
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
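                    // Expected number of clients sending renewals (one more for this new registration)
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    // Recompute the per-minute renewal threshold,
                    // which the self-preservation check relies on (see the self-preservation section)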
                    updateRenewsPerMinThreshold();
                }
            }
        }
        // Build a new lease
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        // If there was a local copy, carry its service-up timestamp over to the new lease
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // At this point the instance is in the registry
        gMap.put(registrant.getId(), lease);
        // Not essential to registration: recentRegisteredQueue appears to feed the Eureka dashboard
        recentRegisteredQueue.add(new Pair<Long, String>(
                System.currentTimeMillis(),
                registrant.getAppName() + "(" + registrant.getId() + ")"));
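        // Overridden-status handling: a status forced from outside (e.g. OUT_OF_SERVICE via the REST API) takes precedence over the reported one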
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            // Equivalent to serviceUpTimestamp = System.currentTimeMillis();
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        // Feeds delta fetches: the apps/delta payload is built from recentlyChangedQueue when readWriteCacheMap loads it
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // Invalidate the affected keys in readWriteCacheMap
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
        read.unlock();
    }
}
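Stripped of metrics, overridden status and cache handling, the core of register() is: get-or-create the inner map with putIfAbsent, keep whichever copy has the newer lastDirtyTimestamp, and carry serviceUpTimestamp over from the old lease. A minimal sketch of just that part; MiniInstance, MiniLease and MiniRegistry are hypothetical simplifications, not Eureka classes:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, stripped-down InstanceInfo.
class MiniInstance {
    final String appName;
    final String id;
    final long lastDirtyTimestamp;

    MiniInstance(String appName, String id, long lastDirtyTimestamp) {
        this.appName = appName;
        this.id = id;
        this.lastDirtyTimestamp = lastDirtyTimestamp;
    }
}

// Hypothetical, stripped-down Lease.
class MiniLease {
    final MiniInstance holder;
    long serviceUpTimestamp;

    MiniLease(MiniInstance holder) {
        this.holder = holder;
    }
}

class MiniRegistry {
    // appName -> (instanceId -> lease), same shape as AbstractInstanceRegistry#registry
    final ConcurrentHashMap<String, Map<String, MiniLease>> registry = new ConcurrentHashMap<>();

    void register(MiniInstance registrant) {
        Map<String, MiniLease> gMap = registry.get(registrant.appName);
        if (gMap == null) {
            ConcurrentHashMap<String, MiniLease> gNewMap = new ConcurrentHashMap<>();
            gMap = registry.putIfAbsent(registrant.appName, gNewMap);
            if (gMap == null) { // our map won the race
                gMap = gNewMap;
            }
        }
        MiniLease existing = gMap.get(registrant.id);
        if (existing != null && existing.holder.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing.holder; // local copy is strictly newer: keep it
        }
        MiniLease lease = new MiniLease(registrant);
        if (existing != null) {
            lease.serviceUpTimestamp = existing.serviceUpTimestamp; // keep the original up time
        }
        gMap.put(registrant.id, lease);
    }
}
```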

Service info cache

Consistency between the cache levels is not guaranteed.

com.netflix.eureka.registry.ResponseCacheImpl

public class ResponseCacheImpl implements ResponseCache {
    
    ...
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key,Value>();
    // Guava cache
    private final LoadingCache<Key, Value> readWriteCacheMap;
    
    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, 
                      AbstractInstanceRegistry registry) {
        ....
        this.readWriteCacheMap= CacheBuilder.newBuilder()
            .initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
            // 180 s by default
            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(),
                              TimeUnit.SECONDS)
            .removalListener(new RemovalListener<Key, Value>() {
                  @Override
                  // Called when an entry leaves readWriteCacheMap (e.g. on invalidate or expiry)
                  public void onRemoval(RemovalNotification<Key, Value> notification) {
                      Key removedKey = notification.getKey();
                      if (removedKey.hasRegions()) {
                          Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                          regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                      }
                  }
              }).build(new CacheLoader<Key, Value>() {
                  @Override
                  // Called on readWriteCacheMap.get when the key is not cached yet
                  public Value load(Key key) throws Exception {
                      if (key.hasRegions()) {
                          Key cloneWithNoRegions = key.cloneWithoutRegions();
                          regionSpecificKeys.put(cloneWithNoRegions, key);
                      }
                      Value value = generatePayload(key);
                      return value;
                  }
              });
        // Timer task that refreshes the read-only cache, every 30 s by default
        // responseCacheUpdateIntervalMs = eureka.server.responseCacheUpdateIntervalMs
        if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                           new Date(
                               (
                               (System.currentTimeMillis() / responseCacheUpdateIntervalMs) 
                               * responseCacheUpdateIntervalMs
                               ) + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    }
    ....
}
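The "not consistent" remark comes from this split: reads are served from readOnlyCacheMap, invalidation only touches readWriteCacheMap, and the read-only level catches up only when the timer task runs. A minimal sketch without Guava, assuming plain ConcurrentHashMaps for both levels; TwoLevelCache and its methods are hypothetical names, and the loader stands in for generatePayload(key). It also reproduces the first-run alignment used in timer.schedule above:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model of ResponseCacheImpl's read-only / read-write split.
class TwoLevelCache {
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // stands in for generatePayload(key)

    TwoLevelCache(Function<String, String> loader) {
        this.loader = loader;
    }

    // Read path: read-only level first, fall back to read-write (loading on a miss).
    String get(String key) {
        String v = readOnly.get(key);
        if (v == null) {
            v = readWrite.computeIfAbsent(key, loader);
            readOnly.put(key, v);
        }
        return v;
    }

    // Register/cancel path: only the read-write level is invalidated.
    void invalidate(String key) {
        readWrite.remove(key);
    }

    // What the periodic update task does: copy changed values into the read-only level.
    void syncReadOnly() {
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.computeIfAbsent(key, loader);
            if (!fresh.equals(readOnly.get(key))) {
                readOnly.put(key, fresh);
            }
        }
    }

    // First run is aligned to the next multiple of the interval.
    static long firstRunAt(long nowMs, long intervalMs) {
        return (nowMs / intervalMs) * intervalMs + intervalMs;
    }
}
```

Between an invalidate() and the next syncReadOnly(), clients still get the old payload; with the defaults that window is up to about 30 s.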

Heartbeat renewal <client side>

Entry point

Heartbeats can only start after registration, so while reading the registration code, keep an eye out for anything heartbeat-related.

Two places are worth pointing out:

  1. DiscoveryClient#DiscoveryClient
  2. DiscoveryClient#initScheduledTasks

Both contain heartbeat-related thread pools or scheduled tasks, so let's start there.

DiscoveryClient#DiscoveryClient

The constructor creates a thread pool:

heartbeatExecutor = new ThreadPoolExecutor(
    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>(),
    new ThreadFactoryBuilder()
    .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
    .setDaemon(true)
    .build()
);
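With a SynchronousQueue there is no buffering: a new heartbeat task either gets a thread immediately or is rejected. A small self-contained demo of that behavior; HeartbeatPoolDemo is a hypothetical name, and the pool is capped at one thread to make the effect obvious (the real pool's maximum comes from getHeartbeatExecutorThreadPoolSize()):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HeartbeatPoolDemo {
    // Returns true if a second task is rejected while the only worker is still busy.
    static boolean secondSubmitRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the only worker with a "slow heartbeat".
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                pool.execute(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                return true; // no queue slot and no spare thread: rejected
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

So a stuck heartbeat does not pile up queued heartbeats behind it; the next attempt fails fast instead.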

DiscoveryClient#initScheduledTasks

A task is built here, which eventually runs com.netflix.discovery.TimedSupervisorTask#run; that method basically hands HeartbeatThread to heartbeatExecutor as a task.

heartbeatTask = new TimedSupervisorTask(
        "heartbeat",
        scheduler,
        heartbeatExecutor,
        renewalIntervalInSecs,
        TimeUnit.SECONDS,
        expBackOffBound,
        new HeartbeatThread()
);
// Schedule the first run
scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);

A small question:

Why not just call heartbeatExecutor.submit(new HeartbeatThread())? Why bother with a TimedSupervisorTask class and ScheduledExecutorService.schedule?

A quick look at its logic shows that it mostly deals with exceptional cases.
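As far as I can tell from TimedSupervisorTask's source (not quoted here, so treat this as an assumption), the main exceptional case is a timeout with exponential backoff: each run waits at most timeoutMillis for the heartbeat; on a timeout the next delay doubles, capped at timeoutMillis * expBackOffBound; on success it resets; and the task reschedules itself with a one-shot scheduler.schedule(this, delay, ...) every time, which is why a fixed-rate schedule isn't used. A sketch of only the delay bookkeeping; BackoffDelay is a hypothetical name:

```java
// Hypothetical model of TimedSupervisorTask's delay bookkeeping.
class BackoffDelay {
    private final long timeoutMillis;
    private final long maxDelay;
    private long delay;

    BackoffDelay(long timeoutMillis, int expBackOffBound) {
        this.timeoutMillis = timeoutMillis;
        this.maxDelay = timeoutMillis * expBackOffBound;
        this.delay = timeoutMillis;
    }

    // Called after each run; returns the delay before the next run.
    long next(boolean timedOut) {
        if (timedOut) {
            delay = Math.min(maxDelay, delay * 2); // back off, never beyond the bound
        } else {
            delay = timeoutMillis;                 // healthy again: normal interval
        }
        return delay;
    }
}
```

With example values of 30 s and a bound of 10, repeated timeouts give 60 s, 120 s, 240 s and then stay at 300 s until a heartbeat succeeds.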

DiscoveryClient.HeartbeatThread

In the end it is still HeartbeatThread doing the work (despite its name, it is a Runnable, not a Thread).

private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

DiscoveryClient#renew

Again, this is just a remote call:

EurekaHttpResponse<InstanceInfo> httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#sendHeartBeat

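Only the status and the dirty timestamp (plus overriddenstatus, if set) are sent, as query parameters; the PUT has no body.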

String urlPath = "apps/" + appName + '/' + id;

WebResource webResource = jerseyClient.resource(serviceUrl)
        .path(urlPath)
        .queryParam("status", info.getStatus().toString())
        .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
    webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
ClientResponse response = requestBuilder.put(ClientResponse.class);
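The heartbeat request can be sketched the same way with java.net.http (Java 11+), which makes the path and query string easy to check. Again an illustration only: the service URL, app name, instance id and timestamp are placeholders, and the values are assumed not to need URL encoding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class HeartbeatRequestSketch {
    // PUT {serviceUrl}apps/{appName}/{id}?status=...&lastDirtyTimestamp=...[&overriddenstatus=...]
    static HttpRequest build(String serviceUrl, String appName, String id,
                             String status, long lastDirtyTimestamp, String overriddenStatus) {
        StringBuilder url = new StringBuilder(serviceUrl)
                .append("apps/").append(appName).append('/').append(id)
                .append("?status=").append(status)
                .append("&lastDirtyTimestamp=").append(lastDirtyTimestamp);
        if (overriddenStatus != null) {
            url.append("&overriddenstatus=").append(overriddenStatus);
        }
        return HttpRequest.newBuilder(URI.create(url.toString()))
                .PUT(HttpRequest.BodyPublishers.noBody()) // heartbeat carries no body
                .build();
    }
}
```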

Heartbeat renewal <server side>

InstanceResource#renewLease

This is where the server receives the request; the core line is registry.renew(app.getName(), id, isFromReplicaNode);

org.springframework.cloud.netflix.eureka.server.InstanceRegistry#renew

public boolean renew(final String appName, final String serverId, boolean isReplication) {
    
    List<Application> applications = getSortedApplications();
    for (Application input : applications) {
        if (input.getName().equals(appName)) {
            InstanceInfo instance = null;
            for (InstanceInfo info : input.getInstances()) {
                if (info.getId().equals(serverId)) {
                    instance = info;
                    break;
                }
            }
            // Publish an event
            publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                    instance, isReplication));
            break;
        }
    }
    // com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
    return super.renew(appName, serverId, isReplication);
}

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew

public boolean renew(final String appName, final String id, final boolean isReplication) {
    // com.netflix.eureka.registry.AbstractInstanceRegistry#renew
    if (super.renew(appName, id, isReplication)) {
        // Replicate to the other nodes
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

com.netflix.eureka.registry.AbstractInstanceRegistry#renew

public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    // lease doesn't exist
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            // This method seems important: it resolves the effective status from the overridden-status rules
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        // Increment the per-minute renewal counter
        renewsLastMin.increment();
        // Update the last-updated timestamp
        leaseToRenew.renew();
        return true;
    }
}
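leaseToRenew.renew() here, and lease.isExpired(...) in evict below, live in Lease, which isn't quoted in this post. From my reading of Lease (treat it as an assumption and check your Eureka version), renew() stores now + duration, and isExpired adds duration again, so an instance is only considered expired about 2 × duration after its last heartbeat; the source carries a comment acknowledging this. A sketch with an injectable clock; MiniLeaseClock is a hypothetical name, and eviction/cancel handling is omitted:

```java
import java.util.function.LongSupplier;

// Hypothetical model of the timing in com.netflix.eureka.lease.Lease.
class MiniLeaseClock {
    private final long durationMs;
    private final LongSupplier clock;
    private volatile long lastUpdateTimestamp;

    MiniLeaseClock(long durationMs, LongSupplier clock) {
        this.durationMs = durationMs;
        this.clock = clock;
        this.lastUpdateTimestamp = clock.getAsLong();
    }

    // As I read Lease#renew: stores now + duration, not just now.
    void renew() {
        lastUpdateTimestamp = clock.getAsLong() + durationMs;
    }

    // As I read Lease#isExpired(additionalLeaseMs): duration is added a second time here.
    boolean isExpired(long additionalLeaseMs) {
        return clock.getAsLong() > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }
}
```

With the default 90 s lease, a renewal at t = 0 is still valid at t = 150 s and only expires after t = 180 s.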

Self-preservation

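Trigger condition for self-preservation: if more than 85% of clients fail to send normal heartbeats within 15 minutes, Eureka concludes that there is a network failure between the clients and the registry, and the Eureka Server automatically enters self-preservation mode. In code, this decision is the isLeaseExpirationEnabled() check at the top of evict, shown below.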

Once in self-preservation mode, the Eureka Server does the following:

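  1. It no longer evicts services from the registry whose heartbeats have been lost
  2. It still accepts registrations and queries for new services, but does not replicate them to other nodes, so the current node stays available
  3. Once the network is stable again, the new registrations on this Eureka Server are replicated to other nodes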
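To pin the description to code: the check evict() runs first is isLeaseExpirationEnabled() in PeerAwareInstanceRegistryImpl (not quoted here). As far as I can tell, it compares the renewals actually received during the last minute with numberOfRenewsPerMinThreshold; the "15 minutes" is, I believe, the default interval at which that threshold is recalculated. A minimal sketch of the decision as a pure function under that reading; SelfPreservation and leaseExpirationEnabled are hypothetical names:

```java
// Hypothetical model of PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled.
class SelfPreservation {
    // true = evict() may remove expired leases; false = self-preservation is active.
    static boolean leaseExpirationEnabled(boolean selfPreservationEnabled,
                                          int numberOfRenewsPerMinThreshold,
                                          long renewsLastMin) {
        if (!selfPreservationEnabled) {
            return true; // self-preservation switched off in config: always allow eviction
        }
        return numberOfRenewsPerMinThreshold > 0 && renewsLastMin > numberOfRenewsPerMinThreshold;
    }
}
```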

EurekaServerBootstrap

Where this class and its methods come from:

  1. EurekaServerAutoConfiguration instantiates it and registers it in the IoC container
  2. EurekaServerBootstrap#contextInitialized is the method that sets up self-preservation
  3. contextInitialized is called from EurekaServerInitializerConfiguration#start

Call chain: org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized → org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#initEurekaServerContext → org.springframework.cloud.netflix.eureka.server.InstanceRegistry#openForTraffic

InstanceRegistry#openForTraffic

It simply calls the parent method com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic:

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfClientsSendingRenews = count;
    // Update the per-minute renewal threshold
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // This is the key part
    super.postInit();
}

com.netflix.eureka.registry.AbstractInstanceRegistry#updateRenewsPerMinThreshold

  • expectedNumberOfClientsSendingRenews is the expected number of clients sending renewals: one per client

  • serverConfig.getExpectedClientRenewalIntervalSeconds() defaults to 30

  • serverConfig.getRenewalPercentThreshold() defaults to 0.85

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}
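Worked through with the defaults listed above: each client renews every 30 s, i.e. 60 / 30 = 2 renewals per minute, so with 10 expected clients the threshold is (int)(10 × 2 × 0.85) = 17 renewals per minute. The same formula as a standalone method (RenewThreshold is a hypothetical name; the config values are passed in instead of read from serverConfig):

```java
class RenewThreshold {
    // Same arithmetic as updateRenewsPerMinThreshold.
    static int perMinute(int expectedClients, int renewalIntervalSeconds, double percentThreshold) {
        return (int) (expectedClients
                * (60.0 / renewalIntervalSeconds)
                * percentThreshold);
    }
}
```

Note the truncating cast: with a single client the threshold is (int) 1.7 = 1.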

AbstractInstanceRegistry#postInit

protected void postInit() {
    // Start a timer that counts heartbeat renewals per minute
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    // Submit the eviction task to the timer; runs every 60 s (serverConfig.getEvictionIntervalTimerInMs() = 60*1000)
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(),
                           serverConfig.getEvictionIntervalTimerInMs());
}

The run method of EvictionTask eventually calls com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long).

AbstractInstanceRegistry#evict

public void evict(long additionalLeaseMs) {
    
    // If self-preservation is active (lease expiration disabled), skip eviction
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    
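    // Collect all expired items first, so they can be evicted in random order.
    // For large eviction sets, evicting in order could wipe out whole applications before self-preservation kicks in.
    // Randomizing spreads the impact evenly across all applications.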
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    
    // Cap how many leases one pass may evict: at most (1 - renewalPercentThreshold) of the registry, i.e. 15% by default
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
       
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // Take the instance offline (cancel its lease)
            internalCancel(appName, id, false);
        }
    }
}
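The two calculations in evict can be isolated: the per-pass eviction cap, and the partial Knuth (Fisher-Yates) shuffle that picks which expired leases go. A minimal sketch over plain lists; EvictionSketch and its methods are hypothetical names, and the Random is passed in so the result is reproducible:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class EvictionSketch {
    // How many expired leases one pass may evict.
    static int toEvict(int registrySize, double renewalPercentThreshold, int expiredCount) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        int evictionLimit = registrySize - registrySizeThreshold;
        return Math.min(expiredCount, evictionLimit);
    }

    // Partial Fisher-Yates shuffle: choose `count` random items without repeats.
    static <T> List<T> pick(List<T> expired, int count, Random random) {
        List<T> copy = new ArrayList<>(expired);
        for (int i = 0; i < count; i++) {
            int next = i + random.nextInt(copy.size() - i);
            Collections.swap(copy, i, next);
        }
        return new ArrayList<>(copy.subList(0, count));
    }
}
```

For example, with 20 registered instances and 5 expired ones, at most 20 - (int)(20 × 0.85) = 3 are evicted in this pass; the rest wait for the next run.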

Client fetching service instance info

The client setting eureka.client.fetch-registry defaults to true; it controls whether the client fetches service info from the registry. The DiscoveryClient#DiscoveryClient constructor checks this setting:

if (clientConfig.shouldFetchRegistry()) {
    try {
        // This is the fetch entry point
        boolean primaryFetchRegistryResult = fetchRegistry(false);
        ....
    } catch (Throwable th) {
        throw new IllegalStateException(th);
    }
}
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // Full fetch if delta is disabled or this is the first fetch
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            // Full fetch: calls apps/
            getAndStoreFullRegistry();
        } else {
            // Delta fetch: calls apps/delta
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
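The full-versus-delta branch is the interesting decision in fetchRegistry. Restated as a pure function over the same conditions; FetchDecision and its parameters are hypothetical, with localAppCount == null standing for "no local Applications yet":

```java
// Hypothetical restatement of the branch in DiscoveryClient#fetchRegistry.
class FetchDecision {
    static boolean useFullFetch(boolean disableDelta, String singleVipAddress, boolean forceFull,
                                Integer localAppCount, long localVersion) {
        return disableDelta
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || localAppCount == null   // applications == null: nothing cached yet
                || localAppCount == 0      // cached copy is empty
                || localVersion == -1;     // client library without delta support
    }
}
```

Any one condition forces a full fetch; only a warm, non-empty, delta-capable local copy goes through apps/delta.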

Client storing service instance info

The previous step fetched the service info through remote calls, so storing it is really a matter of how the response data is handled.

I didn't feel like reading this part of the code.

Data synchronization between nodes

The registration section above briefly touched on synchronization between nodes:

PeerAwareInstanceRegistryImpl#replicateToPeers

Integration with Ribbon

Ribbon obtains its server list in com.netflix.loadbalancer.DynamicServerListLoadBalancer#updateListOfServers, which calls ServerList.getUpdatedListOfServers(). When servers come from the configuration file, the implementation is ConfigurationBasedServerList; with Eureka, it is org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList.

DomainExtractingServerList#getUpdatedListOfServers

public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
    // this.list = com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList
    List<DiscoveryEnabledServer> servers = setZones(
            this.list.getUpdatedListOfServers());
    return servers;
}

DiscoveryEnabledNIWSServerList#getUpdatedListOfServers

public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
    return obtainServersViaDiscovery();
}

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();
    }

    EurekaClient eurekaClient = eurekaClientProvider.get();
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {
                if (ii.getStatus().equals(InstanceStatus.UP)) {

                    if(shouldUseOverridePort){
                        if(logger.isDebugEnabled()){
                            logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                        }

                        // copy is necessary since the InstanceInfo builder just uses the original reference,
                        // and we don't want to corrupt the global eureka copy of the object which may be
                        // used by other clients in our system
                        InstanceInfo copy = new InstanceInfo(ii);

                        if(isSecure){
                            ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                        }else{
                            ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                        }
                    }

                    DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                    serverList.add(des);
                }
            }
            if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
            }
        }
    }
    return serverList;
}
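Stripped of the Eureka types, `obtainServersViaDiscovery` boils down to four steps. It splits `vipAddresses` on commas, keeps only instances whose status is UP, optionally replaces the port, and, when `prioritizeVipAddressBasedServers` is set, stops at the first VIP address that yields any server. The sketch below reproduces that control flow with a hypothetical `Instance` class and a map standing in for `getInstancesByVipAddress`. For the port override it builds a new object instead of copying an `InstanceInfo`, for the same reason the original copies: the shared registry entry must not be mutated.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical re-statement of obtainServersViaDiscovery's control flow.
class VipFilterSketch {

    static final class Instance {
        final String host;
        final int port;
        final String status;
        Instance(String host, int port, String status) {
            this.host = host;
            this.port = port;
            this.status = status;
        }
        @Override public String toString() { return host + ":" + port; }
    }

    static List<String> obtainServers(String vipAddresses,
                                      Map<String, List<Instance>> registry, // stands in for getInstancesByVipAddress
                                      Integer overridePort,                 // null means "do not override"
                                      boolean prioritizeVipAddressBasedServers) {
        List<String> serverList = new ArrayList<>();
        if (vipAddresses == null) {
            return serverList;
        }
        for (String vip : vipAddresses.split(",")) {
            for (Instance ii : registry.getOrDefault(vip, Collections.emptyList())) {
                if (!"UP".equals(ii.status)) {
                    continue; // only UP instances become candidate servers
                }
                // Build a new object rather than mutating the shared registry entry.
                Instance chosen = overridePort != null ? new Instance(ii.host, overridePort, ii.status) : ii;
                serverList.add(chosen.toString());
            }
            if (!serverList.isEmpty() && prioritizeVipAddressBasedServers) {
                break; // the first VIP address that has servers wins
            }
        }
        return serverList;
    }
}
```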

Notes on Eureka Server

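Setting up the server is simple: apart from adding the dependency and configuring the yml file, it is just a Spring Boot project, and annotating the startup class with @EnableEurekaServer is all it takes. So the natural starting point is this annotation.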

@EnableEurekaServer

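This annotation is designed very cleverly. As its Javadoc says, its job is to activate the Eureka Server configuration. But how does it activate it? The key is @Import(EurekaServerMarkerConfiguration.class). The imported configuration is also extremely simple: it only declares a Marker bean. That seems strange at first: how can a bare Marker activate a whole configuration?

Spring has its own SPI-like mechanism: a class listed in spring.factories is picked up when Spring starts, and here that class is EurekaServerAutoConfiguration. However, EurekaServerAutoConfiguration is conditioned on a Marker bean being present in the context. That is how @EnableEurekaServer does what its Javadoc promises.

Why design it this way? My personal take: it keeps a project from loading a lot of unnecessary classes just because the dependency was pulled in by mistake. Having the jar on the classpath is not enough; the server configuration only activates when @EnableEurekaServer is explicitly present.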

/**
 * Annotation to activate Eureka Server related configuration.
 * {@link EurekaServerAutoConfiguration}
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

/**
 * Responsible for adding in a marker bean to activate
 * {@link EurekaServerAutoConfiguration}.
 */
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {

    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    class Marker {}
}

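// This condition is the key; other class-level annotations and all bean definitions are omitted
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
}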
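To see why a bare marker bean is enough, here is a tiny plain-Java simulation of the mechanism. It is not Spring itself. A hypothetical `refresh` method first follows `@Import` on the application's annotations (the user-configuration phase). It then walks a list of candidate auto-configurations (standing in for the classes listed in spring.factories) and applies each one only if the bean named by its `@OnBean` condition is already registered. All annotations and classes here (`Import`, `OnBean`, `EnableServer`, `MarkerSimulation`) are made up for illustration.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java simulation (not Spring) of: enable annotation -> marker bean -> conditional auto-configuration.
class MarkerSimulation {

    // Hypothetical stand-ins for @Import and @ConditionalOnBean.
    @Retention(RetentionPolicy.RUNTIME) @interface Import { Class<?> value(); }
    @Retention(RetentionPolicy.RUNTIME) @interface OnBean { Class<?> value(); }

    static final class Marker {}

    // Plays the role of EurekaServerMarkerConfiguration: its only job is to contribute a Marker bean.
    static final class MarkerConfiguration {}

    // Plays the role of @EnableEurekaServer.
    @Import(MarkerConfiguration.class)
    @Retention(RetentionPolicy.RUNTIME) @interface EnableServer {}

    // Plays the role of EurekaServerAutoConfiguration: always a candidate, applied only if a Marker exists.
    @OnBean(Marker.class)
    static final class ServerAutoConfiguration {}

    static Set<String> refresh(Class<?> application, List<Class<?>> autoConfigCandidates) {
        Set<String> beans = new LinkedHashSet<>();
        // Step 1: user configuration. Follow @Import found as a meta-annotation on the application class.
        for (Annotation a : application.getAnnotations()) {
            Import imp = a.annotationType().getAnnotation(Import.class);
            if (imp != null && imp.value() == MarkerConfiguration.class) {
                beans.add(Marker.class.getSimpleName()); // MarkerConfiguration's single bean
            }
        }
        // Step 2: auto-configuration candidates (the spring.factories role), evaluated afterwards.
        for (Class<?> candidate : autoConfigCandidates) {
            OnBean condition = candidate.getAnnotation(OnBean.class);
            if (condition == null || beans.contains(condition.value().getSimpleName())) {
                beans.add(candidate.getSimpleName());
            }
        }
        return beans;
    }

    @EnableServer static final class EnabledApp {}
    static final class PlainApp {}
}
```

The order of the two steps matters. In Spring Boot, auto-configuration classes are imported through a deferred import selector, that is, after the application's own configuration has been processed, which is what lets the `@ConditionalOnBean` on EurekaServerAutoConfiguration see the marker registered via `@EnableEurekaServer`.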

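Dashboard entry point: EurekaController

This is the dashboard controller provided by Spring Cloud Netflix, written in the usual Spring Web MVC style, so it is fairly easy to follow.

It backs the page at http://localhost:8761 and its related endpoints.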

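© Copyright belongs to the author. Reprinting is prohibited; for reprint or content-collaboration requests, contact the author via private message or comments.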