微服务实战SpringCloud之Eureka Client启动源码分析

springboot工程注册到eureka server非常简单,只需要引入spring-cloud-starter-netflix-eureka-client依赖,在启动类上加上@EnableDiscoveryClient注解。

例如:

@EnableDiscoveryClient
@SpringBootApplication
public class CartApplication {

    public static void main(String[] args) {
        SpringApplication.run(CartApplication.class, args);
    }
}

再在application.yml或者application.properties中指定eureka server地址即可。

例如:

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
image-20181206105425600

启动,可以看到eureka server控制台已经有应用的注册信息了。那么eureka client 是如何做到的呢?

我们先从@EnableDiscoveryClient注解入手。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

   /**
    * If true, the ServiceRegistry will automatically register the local server.
    */
   boolean autoRegister() default true;
}

发现引入了EnableDiscoveryClientImportSelector类,也没有发现有用的东西。

换个思路,既然注解是@EnableDiscoveryClient,那就研究一下DiscoveryClient。

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    
    //...省略部分代码
    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

    //.....省略部分代码
}

可以看出是在DiscoveryClient类中的构造方法中初始化了心跳消息的线程池。再追踪下,构造方法会在哪里调用,发现会在CloudEurekaClient类的构造函数中调用。

public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
                   EurekaClientConfig config,
                   AbstractDiscoveryClientOptionalArgs<?> args,
                   ApplicationEventPublisher publisher) {
   super(applicationInfoManager, config, args);
   this.applicationInfoManager = applicationInfoManager;
   this.publisher = publisher;
   this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
   ReflectionUtils.makeAccessible(this.eurekaTransportField);
}

再追踪,发现会在EurekaClientAutoConfiguration类进行调用。

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
      CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
      "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
      "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {

   private ConfigurableEnvironment env;

   public EurekaClientAutoConfiguration(ConfigurableEnvironment env) {
      this.env = env;
   }

   @Bean
   public HasFeatures eurekaFeature() {
      return HasFeatures.namedFeature("Eureka Client", EurekaClient.class);
   }
    
    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        EurekaClientConfigBean client = new EurekaClientConfigBean();
        if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
            // We don't register during bootstrap by default, but there will be another
            // chance later.
            client.setRegisterWithEureka(false);
        }
        return client;
    }
    //这里初始化了DiscoveryClient
    @Bean
    public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
        return new EurekaDiscoveryClient(config, client);
    }
    //....省略部分代码
}    

源头找到了,接下来仔细分析下DiscoveryClient的构造函数,这里是整个eureka client的 启动核心。

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
   
    //....

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        //配置了不注册到eureka且不获取eureka server上的实例注册信息
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());

        return;  // no need to setup up an network tasks and we are done
    }

    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        //创建定时线程池,线程数量为2个,分别用来维持心跳连接和刷新其他eureka client实例缓存
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());
        //创建一个线程池,线程池大小默认为2个,用来维持心跳连接
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff
        //创建一个线程池,线程池大小默认为2个,用来刷新其他eureka client实例缓存
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        //创建及初始化维持心跳连接(注册)、获取注册实例信息的httpClient
        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);
        //...
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }
    //抓取远程实例注册信息,fetchRegistry()方法里的参数,这里为false,意思是要不要强制抓取所有实例注册信息
    //这里获取注册信息,分两种方式,一种是全量获取,另一种是增量获取,默认是增量获取
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        //如果配置的是要获取实例注册信息,但是从远程获取失败,从备份获取实例注册信息
        fetchRegistryFromBackup();
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    //注册到eureka之前,调用的前置处理方法,这里eureka提供了接口,需要自己实现。
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        //如果client配置注册到eureka server 且 强制 初始化就注册到eureka 那么就注册到eureka server,默认是不初始化就注册到eureka
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    //最后,初始化维持心跳连接、更新注册信息缓存的定时任务
    initScheduledTasks();
    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}
//获取远程注册实例信息
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            //全量获取实例注册信息,这里是通过HTTPClient发送的请求,HTTPClient的初始化,在前面的scheduleServerEndpointTask()方法中
            getAndStoreFullRegistry();
        } else {
            //增量获取实例注册信息
            getAndUpdateDelta(applications);
        }
    }

    // 通知本地缓存要清除了
    onCacheRefreshed();
    // 更新远程实例信息在本机的状态
    updateInstanceRemoteStatus();
    return true;
}

注册方法:

/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        //也是通过前面初始化的HTTPClient发送注册请求
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

维持心跳连接:

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        //发送请求
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

总结一下:

  • eureka client在启动时首先会创建一个定时任务线程池,线程池大小为2个,分别用来维持心跳链接和刷新本地缓存
  • 获取远程实例信息时,有两种方式,一种是全量获取,一种是增量获取,默认是增量获取
  • 维持信条连接和获取远程实例信息是通过HTTPClient发送的请求
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,204评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,091评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,548评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,657评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,689评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,554评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,302评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,216评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,661评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,851评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,977评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,697评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,306评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,898评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,019评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,138评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,927评论 2 355

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,657评论 18 139
  • 本文作者:陈刚,叩丁狼高级讲师。原创文章,转载请注明出处。 对于一个优秀的程序员而言,一个技术不仅要会用,还要知道...
    叩丁狼教育阅读 1,480评论 0 1
  • 看过之前文章的朋友们,相信已经对Eureka的运行机制已经有了一定的了解。为了更深入的理解它的运作和配置,下面我们...
    程序猿DD阅读 17,968评论 5 32
  • "你在大学过的最充实的是什么?"如果,有人这样问我。 "读书和旅行"我会回答说。 01 关于读书我想说 刚上大...
    脆皮饼干阅读 331评论 0 3
  • 水远天高合暮色,松苍栢劲夕垂。山石错落雁栖飞,青峰静谧,独坐掩霞辉。 罗带飘然神远逸,冥思禅步无追。色空痴念是和非...
    静铃音阅读 498评论 34 39