Spring Cloud源码分析之eureka+feign远程调用

是什么

Eureka是一个REST (Representational State Transfer)服务,用于定位服务,以实现中间层服务器的负载平衡和故障转移,我们称此服务为Eureka服务器。Eureka还有一个基于java的客户端组件,Eureka客户端,这使得与服务的交互更加容易,同时客户端也有一个内置的负载平衡器,它执行基本的循环负载均衡。

Feign 是一种声明式服务调用组件,它在 RestTemplate 的基础上做了进一步的封装。通过 Feign,我们只需要声明一个接口并通过注解进行简单的配置(类似于 Dao 接口上面的 Mapper 注解一样)即可实现对 HTTP 接口的绑定。

通过 Feign,我们可以像调用本地方法一样来调用远程服务,而完全感觉不到这是在进行远程调用。

为什么

eureka解决了什么问题?为什么要用eureka?

eureka为分布式环境引入了服务注册与发现的能力,如果没有服务注册与发现,我们便需要手动去维护一大堆服务的地址信息,而当服务的状态变化发生变化,比如有新的服务加入或某些现有服务突然不可用时,现存的服务也无法感知到并及时切换到可用的服务上去。

相对其他提供服务注册与发现的组件(比如说zk),Eureka更偏向于AP。如果分布式系统对可用性的要求更高,那么,Eureka会是一个不错的选择。

另外,feign屏蔽了请求构建、发送、重试、负载均衡等相关细节,让我们能够从重复的工作中解放出来,更专注于业务本身。

架构图

Eureka-feign架构图.png
image.png

案例

eureka server

引入依赖

implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-server'

启动类

@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}

配置文件

server.port=8761
eureka.client.register-with-eureka=false
eureka.client.fetch-registry=false

service consumer

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

启动类

@SpringBootApplication
@EnableDiscoveryClient
@RestController
@EnableFeignClients
public class HelloClientApplication {
    @Autowired
    HelloClient client;

    @RequestMapping("/")
    public String hello() {
        return client.hello();
    }

    public static void main(String[] args) {
        SpringApplication.run(HelloClientApplication.class, args);
    }

    @FeignClient("HelloServer")
    interface HelloClient {
        @RequestMapping(value = "/", method = GET)
        String hello();
    }
}

配置文件

spring:
  application:
    name: HelloClient

server:
  port: 7211

eureka:
  password: password
  client:
    serviceUrl:
      defaultZone: http://user:${eureka.password}@localhost:8761/eureka/
  instance:
    leaseRenewalIntervalInSeconds: 10
    metadataMap:
      instanceId: ${vcap.application.instance_id:${spring.application.name}:${spring.application.instance_id:${server.port}}}

endpoints:
  restart:
    enabled: true

service provider

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

启动类

@SpringBootApplication
@EnableDiscoveryClient
@RestController
public class HelloServerApplication {
    @Autowired
    DiscoveryClient client;

    @RequestMapping("/")
    public String hello() {
        List<ServiceInstance> instances = client.getInstances("HelloServer");
        ServiceInstance selectedInstance = instances
                .get(new Random().nextInt(instances.size()));
        return "Hello World: " + selectedInstance.getServiceId() + ":" + selectedInstance
                .getHost() + ":" + selectedInstance.getPort();
    }

    public static void main(String[] args) {
        SpringApplication.run(HelloServerApplication.class, args);
    }
}

配置文件

spring:
  application:
    name: HelloServer

server:
  port: 7111

eureka:
  password: password
  client:
    serviceUrl:
      defaultZone: http://user:${eureka.password}@localhost:8761/eureka/
  instance:
    leaseRenewalIntervalInSeconds: 10
    metadataMap:
      instanceId: ${vcap.application.instance_id:${spring.application.name}:${spring.application.instance_id:${server.port}}}

运行

依次启动eureka server、service provider、service consumer。访问http://localhost:7211/

如果在service provider的hello方法里打上断点,就可以发现请求由从service consumer的hello方法进到了service provider的hello方法。

而且,这里并没有在service consumer里配置service provider的地址信息,只是在service consumer里调用了HelloClient接口里的hello方法,便发出了一个到service provider的http请求。

提出问题

  1. 服务消费者怎么调用服务提供者
  2. 服务消费者如何拿到服务提供者的注册信息
  3. 有多个服务提供者的情况下,服务消费者如何判断访问哪一个
  4. 什么时候往eureka server注册
  5. eureka server怎么判断服务是否可用
  6. 有新服务上线或者服务地址发生变化怎么办
  7. 网络超时怎么办

初步分析

service consumerHttpClient接口很简单,只有两个注解和一个接口定义,没有任何Http相关的细节。

而访问HttpClient的方式是通过容器里的bean来实现的,自然而然可以想到这里一定用到了代理。而FeignClient注解大概率就是代理的Pointcut,代理内部就是网络访问的细节。

像这种开箱即用的组件,按之前文章的分析经验来看,Spring的套路基本都是通过XXXAutoConfiguration以及EnableXXX注解来实现的。

可以看到在service consumer的启动类上有EnableDiscoveryClientEnableFeignClients两个注解。

然后搜一搜eurekaClientAutoConfiguration

源码分析

Eureka

先看整体流程图

eureka流程.png

EurekaClient初始化

public class EurekaClientAutoConfiguration {

        //默认启用这个配置类,除非主动将eureka.client.refresh.enable设置为false
        protected static class RefreshableEurekaClientConfiguration {
    
            @Autowired
            private ApplicationContext context;
    
            @Autowired
            private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
    
            public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
                    EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {

                //创建eurekaclient,注册register,renew,refresh任务到线程池
                CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
                        this.context);
                cloudEurekaClient.registerHealthCheck(healthCheckHandler);
                return cloudEurekaClient;
            }
    }
}

获取注册信息并注册定时任务

public class DiscoveryClient implements EurekaClient {
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
                //获取注册信息并保存
                boolean primaryFetchRegistryResult = fetchRegistry(false);

                if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                                //注册
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

                //初始化用于refresh跟renew的线程池
                //线程数为2,保证了refresh跟renew的单线程运行
                scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

                //注册定时任务
                initScheduledTasks();
        }
        private void initScheduledTasks() {
                //检查eureka.client.fetch-registry配置项的值,默认为true
        if (clientConfig.shouldFetchRegistry()) {
                        //任务执行间隔,默认30s。配置项:eureka.client.registryFetchIntervalSeconds
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                        //创建任务,内部会执行CacheRefreshThread#run
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            // 注册refresh任务,负责刷新从eureka server获取的注册信息
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

                //eureka.client.register-with-eurek 默认为true
        if (clientConfig.shouldRegisterWithEureka()) {
                        //间隔,默认10s
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
                        //注册renew/heartbeatTask
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // 周期性检查当前服务的状态信息,比如hostname变化,或者改变InstanceStatus或者renew失败。
                        // 一旦状态发生变化,会重新将自己注册到eureka server
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

                        //注册状态监听器,好处是状态发生变更时可能比周期性更早的感知到变化
                        //参考:ApplicationInfoManager#setInstanceStatus
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        }
    }
}

refresh任务: CacheRefreshThread

class CacheRefreshThread implements Runnable {
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

            boolean remoteRegionsModified = false;
            // 判断region是否发生变化
                        // 比如突然某个机房的服务出问题了,需要切到另一个机房。修改fetchRemoteRegionsRegistry配置项
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {

                    synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                        if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                                                        //更新region
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            remoteRegionsRef.set(remoteRegions);
                            instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                                                        //保存region变化状态
                            remoteRegionsModified = true;
                        } else {
                            logger.info("Remote regions to fetch modified concurrently," +
                                    " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                        }
                    }
                }
            }
                        //根据配置的url从eureka server拉取信息
                        //如果region发生变化,则拉取注册信息后进行全量覆盖。否则根据applicationName更新
                        //参考:DiscoveryClient#updateDelta -> applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            boolean success = fetchRegistry(remoteRegionsModified);
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }
}

renew

public class DiscoveryClient implements EurekaClient {

        boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
                        //将当前实例的Id等信息发至eureka
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            //如果status时404
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                                //将dirty设置为true
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                                        //如果成功,则将dirty设置为false。
                                        //如果失败,InstanceInfoReplicator在扫描到dirty为false后会重新注册
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }
}

Feign

核心组件关系概览

LoadBalancerAutoConfiguration 引入LoadBalancerAutoConfiguration

LoadBalancerAutoConfiguration引入LoadBalancerClientConfiguration

LoadBalancerClientConfiguration引入ServiceInstanceListSupplier

ServiceInstanceListSupplierBeanFactory获取DiscoveryClient

DiscoveryClient持有注册到eureka server的服务的信息

----

LoadBalancerClientConfiguration引入RoundRobinLoadBalancer

RoundRobinLoadBalancerBeanFactory获取ServiceInstanceListSupplier

----

DefaultFeignLoadBalancerConfiguration 引入FeignBlockingLoadBalancerClient

FeignBlockingLoadBalancerClient从容器中获取RoundRobinLoadBalancer

----

EnableFeignClients 引入FeignClientsRegistrar

FeignClientsRegistrar 为所有标注了FeignClient的类生成BeanDefinition

FeignClientsRegistrar 为生成的BeanDefinition注册callback用于生成代理对象

----

FeignClientsRegistrar注册的callback:创建SynchronousMethodHandler ,在其中注入FeignBlockingLoadBalancerClient,然后将SynchronousMethodHandler包装成InvocationHandler并注入代理对象

调用关系概览

访问代理对象->SynchronousMethodHandler->FeignBlockingLoadBalancerClient->RoundRobinLoadBalancer->DiscoveryClient获取服务提供者信息→RoundRobinLoadBalancer轮询选择服务提供者→调用目标服务

流程图

初始化

![Untitled.png](https://upload-images.jianshu.io/upload_images/14309538-a45d64a1722b1825.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

远程调用

Untitled (1).png

EnableFeignClients

点进EnableFeignClients. 可以看到它import了FeignClientsRegistrar

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
    ...
}

扫描FeignClient

FeignClientsRegistrar主要做了两件事

  1. 扫描basePackage下带FeignClient注解的类
  2. 注册工厂方法,用于为第一步扫描出来的类生成代理对象
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {

        LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
        
        //指定注解过滤器,过滤的注解为FeignClient
        scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
        Set<String> basePackages = getBasePackages(metadata);
        for (String basePackage : basePackages) {
            //扫描basePackage下带FeignClient注解的类并包装成beanDefinition
            candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
        }

        for (BeanDefinition candidateComponent : candidateComponents) {
            if (candidateComponent instanceof AnnotatedBeanDefinition) {
                // verify annotated class is an interface
                AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
                AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();

                Map<String, Object> attributes = annotationMetadata
                        .getAnnotationAttributes(FeignClient.class.getCanonicalName());

                String name = getClientName(attributes);
                registerClientConfiguration(registry, name, attributes.get("configuration"));

                registerFeignClient(registry, annotationMetadata, attributes);
            }
        }
    }

注册beanDefinition以及callback

factoryBean.getObject(); 生成代理对象。通过将SynchronousMethodHandler包装成InvocationHandler植入目标对象来完成。而SynchronousMethodHandler里负责http调用的细节。

private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
            Map<String, Object> attributes) {
        String className = annotationMetadata.getClassName();
        Class clazz = ClassUtils.resolveClassName(className, null);

        FeignClientFactoryBean factoryBean = new FeignClientFactoryBean();
        factoryBean.setType(clazz);

        //注册一个回调来生成类型为clazz的bean
        //往beanDefinition里放入一个Supplier的实例,Spring会优先通过Supplier来创建目标bean
        BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> {
            //返回代理对象
            return factoryBean.getObject();
        });

        BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, qualifiers);
        //注册beanDefinition
        BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    }

拦截器

创建SynchronousMethodHandler.Factory

public abstract class Feign {

    public <T> T target(Target<T> target) {
      return build().newInstance(target);
  }

    public <T> T target(Target<T> target) {
      return build().newInstance(target);
    }

  public Feign build() {

        //创建SynchronousMethodHandler的factory
    SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
        new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
            logLevel, decode404, closeAfterDecode, propagationPolicy, forceDecoding);
    ParseHandlersByName handlersByName =
        new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
            errorDecoder, synchronousMethodHandlerFactory);
    return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
  }
}

生成代理对象

public class ReflectiveFeign extends Feign {
        public <T> T newInstance(Target<T> target) {
            //通过SynchronousMethodHandler.Factory的create方法创建SynchronousMethodHandler
          Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
          Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();

          methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));

            //通过SynchronousMethodHandler创建FeignInvocationHandler
          InvocationHandler handler = factory.create(target, methodToHandler);

            //生成代理对象
          T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
              new Class<?>[] {target.type()}, handler);
        
          return proxy;
        }
}

InvocationHandler

InvocationHandler的生成与执行

static class FeignInvocationHandler implements InvocationHandler {

    private final Target target;
    private final Map<Method, MethodHandler> dispatch;

    FeignInvocationHandler(Target target, Map<Method, MethodHandler> dispatch) {
            //SynchronousMethodHandler是MethodHandler的子类
            //method是目标方法
      this.target = checkNotNull(target, "target");
      this.dispatch = checkNotNull(dispatch, "dispatch for %s", target);
    }

        @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            //根据目标方法拿到MethodHandler并调用其invoke方法
      return dispatch.get(method).invoke(args);
    }
}

重试策略

SynchronousMethodHandler,http调用入口以及重试策略

final class SynchronousMethodHandler implements MethodHandler {
    @Override
  public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
                //执行http request
        return executeAndDecode(template, options);
      } catch (RetryableException e) {
                //IO异常
                //服务提供者返回服务端错误(status>300&status!=404),并且response header里有Retry-After
        try {
                    //检查是否重试,重试次数达到retryer.maxAttempts就会抛出RetryableException
                    //重试间隔:如果是服务端错误,则使用Retry-After指定的值,但是不能超过maxPeriod
                    //如果是IO异常,则每次的间隔延长1.5倍
                    //如果符合重试要求,则不抛出异常
                    //服务端异常参考:ErrorDecoder#decode
                    //IO异常参考:SynchronousMethodHandler#executeAndDecode
          retryer.continueOrPropagate(e);
        } catch (RetryableException th) {
          Throwable cause = th.getCause();
          if (propagationPolicy == UNWRAP && cause != null) {
            throw cause;
          } else {
            throw th;
          }
        }
        continue;
      }
    }
  }

    Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
        ...
    Request request = targetRequest(template);
        try {
            response = client.execute(request, options);
        } catch (IOException e) {
      //抛出RetryableException
      throw errorExecuting(request, e);
    }

        ...
  }

}

调用LoadBalancer

获取服务提供者信息并执行http请求

public class FeignBlockingLoadBalancerClient implements Client {
        @Override
        public Response execute(Request request, Request.Options options) throws IOException {
            //从eureka client中根据serviceId获取服务提供方的地址信息
            //serviceId是FeignClient注解的value
            //通过eurekaClient从eurekaServer获取服务提供方的地址信息
            ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
            ...

            //生成并执行http请求
            return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
                    supportedLifecycleProcessors);
        }
}

获取服务提供者信息

根据FeignClient里设置的value获取服务提供者信息

public class BlockingLoadBalancerClient implements LoadBalancerClient {
        public <T> ServiceInstance choose(String serviceId, Request<T> request) {
                //从容器中获取ReactorServiceInstanceLoadBalancer的实例
                //默认是RoundRobinLoadBalancer
                //参考LoadBalancerClientConfiguration
           ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
           if (loadBalancer == null) {
              return null;
           }
                //通过loadBanlancer获取地址信息
           Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
           if (loadBalancerResponse == null) {
              return null;
           }
           return loadBalancerResponse.getServer();
        }
}

负载均衡处理

public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer { 
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        //拿到ServiceInstanceListSupplier,持有eurekaClient
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);

        //根据请求从eureka拿到所有的服务端实例后选出一个实例
        return supplier.get(request).next()
                .map(serviceInstances -> 
                        //从serviceInstances选择一个实例
                        processInstanceResponse(supplier, serviceInstances)
                );
    }

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
            List<ServiceInstance> serviceInstances) {
        //轮询选出一个实例
        Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
        //每次获取+1
        int pos = Math.abs(this.position.incrementAndGet());
        //取余实现轮询
        ServiceInstance instance = instances.get(pos % instances.size());

        return new DefaultResponse(instance);
    }
}

解决问题

  1. 通过代理生成http进行远程调用
  2. 过eureka clienteureka server根据appName(FeignClient注解的value)获取服务提供者信息,并通过refresh保证信息及时性
  3. 默认通过轮询的方式
  4. 启动时,并通过周期性任务以及状态监听器保证数据变更时能够及时通知到eureka server
  5. 通过定时(默认10s)向eureka server发送心跳消息(renew)
  6. 通过定时(默认30s)从eureka server获取服务信息,然后根据appName更新本地缓存(refresh)
  7. 如果没超过最大重试次数,则定时重试,间隔时间每次增加1.5倍,但不超过最大重试时间(默认5s)。如果服务端返回的header里设置了Retry-After,则根据该header指定的值来设置间隔时间,且不超过配置的最大间隔

参考

https://github.com/Netflix/eureka/wiki/Eureka-at-a-glance

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容