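We all know that Ribbon is a load-balancing component, but how does it obtain the service list from the registry? This post answers that question. Let's start with a simple example that uses the Ribbon client to do load balancing.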
@Autowired // Ribbon client
private LoadBalancerClient loadBalancerClient;
@GetMapping("/hello2")
public String hello2() {
RestTemplate restTemplate = new RestTemplate();
// Ask Ribbon for one instance of the target service
ServiceInstance serviceInstance = loadBalancerClient.choose("server-provider");
String url = String.format("http://%s:%d/hello", serviceInstance.getHost(), serviceInstance.getPort());
String template = restTemplate.getForObject(url, String.class);
return template;
}
The line ServiceInstance serviceInstance = loadBalancerClient.choose("server-provider"); returns the single service instance picked by the load-balancing algorithm. Let's step into this method.
/**
*
* @author Ryan Baxter
*/
public interface ServiceInstanceChooser {
/**
* Chooses a ServiceInstance from the LoadBalancer for the specified service.
* @param serviceId The service ID to look up the LoadBalancer.
* @return A ServiceInstance that matches the serviceId.
*/
ServiceInstance choose(String serviceId);
}
- Go to RibbonLoadBalancerClient
....
@Override
public ServiceInstance choose(String serviceId) {
// delegates to the overload below
return choose(serviceId, null);
}
/**
* New: Select a server using a 'key'.
* @param serviceId of the service to choose an instance for
* @param hint to specify the service instance
* @return the selected {@link ServiceInstance}
*/
public ServiceInstance choose(String serviceId, Object hint) {
Server server = getServer(getLoadBalancer(serviceId), hint);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
Server server = getServer(getLoadBalancer(serviceId), hint);
- getLoadBalancer(serviceId): let's look at this method. It obtains a LoadBalancer.
public class RibbonLoadBalancerClient implements LoadBalancerClient {
private SpringClientFactory clientFactory;
public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
}
- SpringClientFactory
/**
* Get the load balancer associated with the name.
* @param name name to search by
* @return {@link ILoadBalancer} instance
* @throws RuntimeException if any error occurs
*/
public ILoadBalancer getLoadBalancer(String name) {
return getInstance(name, ILoadBalancer.class);
}
@Override
public <C> C getInstance(String name, Class<C> type) {
C instance = super.getInstance(name, type);
if (instance != null) {
return instance;
}
IClientConfig config = getInstance(name, IClientConfig.class);
return instantiateWithConfig(getContext(name), type, config);
}
- C instance = super.getInstance(name, type);
C instance = super.getInstance(name, type); Following this call upward, it fetches an instance (here an ILoadBalancer) from the Spring container. We won't trace it any further. Instead, let's look at the ILoadBalancer interface.
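To make the idea of "one instance per client name" more concrete, here is a minimal sketch. NamedInstanceRegistry and its getInstance method are hypothetical names invented for this illustration. The real SpringClientFactory looks beans up in a Spring context rather than in a plain map, so treat this only as a rough mental model.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a per-client instance registry; not Ribbon's API.
class NamedInstanceRegistry {
    // client name -> (type -> instance)
    private final Map<String, Map<Class<?>, Object>> contexts = new ConcurrentHashMap<>();

    // Return the cached instance for this client name, creating it on first use.
    <C> C getInstance(String name, Class<C> type, Supplier<C> factory) {
        Map<Class<?>, Object> context =
                contexts.computeIfAbsent(name, n -> new ConcurrentHashMap<>());
        Object instance = context.computeIfAbsent(type, t -> factory.get());
        return type.cast(instance);
    }
}
```

Asking twice for the same name and type returns the same object, while a different client name gets its own instance.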
- ILoadBalancer
public interface ILoadBalancer {
/**
* Initial list of servers.
* This API also serves to add additional ones at a later time
* The same logical server (host:port) could essentially be added multiple times
* (helpful in cases where you want to give more "weightage" perhaps ..)
*
* @param newServers new servers to add
*/
public void addServers(List<Server> newServers);
/**
* Choose a server from load balancer.
*
* @param key An object that the load balancer may use to determine which server to return. null if
* the load balancer does not use this parameter.
* @return server chosen
*/
public Server chooseServer(Object key);
/**
* To be called by the clients of the load balancer to notify that a Server is down
* else, the LB will think its still Alive until the next Ping cycle - potentially
* (assuming that the LB Impl does a ping)
*
* @param server Server to mark as down
*/
public void markServerDown(Server server);
/**
* @deprecated 2016-01-20 This method is deprecated in favor of the
* cleaner {@link #getReachableServers} (equivalent to availableOnly=true)
* and {@link #getAllServers} API (equivalent to availableOnly=false).
*
* Get the current list of servers.
*
* @param availableOnly if true, only live and available servers should be returned
*/
@Deprecated
public List<Server> getServerList(boolean availableOnly);
/**
* @return Only the servers that are up and reachable.
*/
public List<Server> getReachableServers();
/**
* @return All known servers, both reachable and unreachable.
*/
public List<Server> getAllServers();
}
One implementation of this interface is DynamicServerListLoadBalancer.
C instance = super.getInstance(name, type); If we debug this line, we find that instance is a DynamicServerListLoadBalancer. In other words, the object returned here is a DynamicServerListLoadBalancer.
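To get a feel for what the interface methods are for, here is a toy load balancer. It is only a sketch: SimpleLoadBalancer is a hypothetical class, servers are plain "host:port" strings, and the selection rule is a plain round-robin chosen for illustration, not necessarily what Ribbon uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified load balancer; not Ribbon's implementation.
class SimpleLoadBalancer {
    private final List<String> allServers = new CopyOnWriteArrayList<>();
    private final List<String> downServers = new CopyOnWriteArrayList<>();
    private final AtomicInteger counter = new AtomicInteger();

    void addServers(List<String> newServers) {
        allServers.addAll(newServers);
    }

    // Called by a client that noticed the server is not responding.
    void markServerDown(String server) {
        downServers.add(server);
    }

    List<String> getReachableServers() {
        List<String> reachable = new ArrayList<>(allServers);
        reachable.removeAll(downServers);
        return reachable;
    }

    // Round-robin over reachable servers; returns null when none is available.
    String chooseServer() {
        List<String> reachable = getReachableServers();
        if (reachable.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(counter.getAndIncrement(), reachable.size());
        return reachable.get(index);
    }
}
```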
- DynamicServerListLoadBalancer
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);
boolean isSecure = false;
boolean useTunnel = false;
// to keep track of modification of server lists
protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
volatile ServerList<T> serverListImpl;
volatile ServerListFilter<T> filter;
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers(); // refresh the server list
}
};
@Deprecated
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter) {
this(
clientConfig,
rule,
ping,
serverList,
filter,
new PollingServerListUpdater()
);
}
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
initWithNiwsConfig(clientConfig);
}
- restOfInit(clientConfig); // initWithNiwsConfig(clientConfig);
initWithNiwsConfig(clientConfig); this method also ends up calling restOfInit(clientConfig);
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature(); // start the background task that keeps refreshing the server list
updateListOfServers(); // refresh the server list once, right now
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
- enableAndInitLearnNewServersFeature();
/**
* Feature that lets us add new instances (from AMIs) to the list of
* existing servers that the LB will use Call this method if you want this
* feature enabled
*/
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction); // start the background refresh
}
Now let's look at serverListUpdater.start(updateAction);. Earlier in the class there is the following code:
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers(); //
}
};
// serverListUpdater.start(updateAction); // this call ends up invoking the doUpdate() callback defined here
- serverListUpdater.start(updateAction); -> this resolves to the start method of PollingServerListUpdater.
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate(); // the callback
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable, // the task to run on every polling cycle
initialDelayMs, // delay before the first run
refreshIntervalMs, // interval between server list refreshes
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
At this point it is clear that Ribbon also polls the service list on a timer, and that the polling runs on a background thread.
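The polling pattern above can be reduced to a small sketch. SimplePollingUpdater is a hypothetical class written for this post, not Ribbon's PollingServerListUpdater. It only assumes the standard ScheduledExecutorService from the JDK and keeps the same three ideas: start only once, swallow failures of a single cycle, and repeat with a fixed delay.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, stripped-down poller that mirrors the structure shown above.
class SimplePollingUpdater {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "server-list-poller");
                t.setDaemon(true); // do not keep the JVM alive just for polling
                return t;
            });
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    private ScheduledFuture<?> future;

    SimplePollingUpdater(long initialDelayMs, long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Start polling once; any later call is a no-op.
    synchronized void start(Runnable updateAction) {
        if (!active.compareAndSet(false, true)) {
            return;
        }
        future = executor.scheduleWithFixedDelay(() -> {
            try {
                updateAction.run(); // the callback
            } catch (Exception e) {
                // An uncaught exception would cancel the schedule, so log and go on.
                System.err.println("Failed one update cycle: " + e);
            }
        }, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (active.compareAndSet(true, false)) {
            if (future != null) {
                future.cancel(true);
            }
            executor.shutdownNow();
        }
    }
}
```

Catching the exception inside the task matters: with scheduleWithFixedDelay, a task that throws suppresses all subsequent executions.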
Next, let's analyze updateListOfServers(), the method that actually fetches the service list.
- updateListOfServers();
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
// fetch the server list
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
// save it to the local cache
updateAllServerList(servers);
}
Here we can see that once Ribbon has fetched the service list, it stores it in a local cache via updateAllServerList(servers);.
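The fetch, filter, cache sequence can be sketched in a few lines. CachedServerList is a hypothetical class: its Supplier stands in for ServerList, its UnaryOperator stands in for ServerListFilter, and servers are plain "host:port" strings. The point of the sketch is that readers only ever see the cached copy, which changes only when refresh() runs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical refresh helper: fetch -> filter -> replace the local cache.
class CachedServerList {
    private final Supplier<List<String>> source;      // stands in for ServerList
    private final UnaryOperator<List<String>> filter; // stands in for ServerListFilter, may be null
    private volatile List<String> cache = Collections.emptyList();

    CachedServerList(Supplier<List<String>> source, UnaryOperator<List<String>> filter) {
        this.source = source;
        this.filter = filter;
    }

    // Pull a fresh list from the source and swap it into the cache.
    void refresh() {
        List<String> servers = new ArrayList<>();
        if (source != null) {
            servers = source.get();
            if (filter != null) {
                servers = filter.apply(servers);
            }
        }
        cache = Collections.unmodifiableList(new ArrayList<>(servers));
    }

    // Callers never hit the source directly; they read the cached snapshot.
    List<String> getServers() {
        return cache;
    }
}
```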
servers = serverListImpl.getUpdatedListOfServers();
serverListImpl -> DiscoveryEnabledNIWSServerList
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
- obtainServersViaDiscovery();
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
// This makes it clear: Ribbon fetches the server list through EurekaClient, i.e. it still relies on Eureka's own service discovery.
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
// fetch the service instances
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
// return the server list
return serverList;
}
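Stripped of the port-override details, the method above does two things: it keeps only instances whose status is UP, and it optionally stops at the first VIP address that yields any server. The sketch below shows just that logic. UpInstanceSelector, Instance and the Map-based registry are hypothetical stand-ins for EurekaClient and InstanceInfo, invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Eureka's InstanceInfo and registry lookup.
class UpInstanceSelector {
    enum Status { UP, DOWN, OUT_OF_SERVICE }

    static final class Instance {
        final String hostPort;
        final Status status;

        Instance(String hostPort, Status status) {
            this.hostPort = hostPort;
            this.status = status;
        }
    }

    // registry maps a VIP address to the instances registered under it.
    static List<String> select(Map<String, List<Instance>> registry,
                               String vipAddresses,
                               boolean prioritizeFirstVip) {
        List<String> servers = new ArrayList<>();
        if (vipAddresses == null) {
            return servers;
        }
        for (String vip : vipAddresses.split(",")) {
            List<Instance> instances = registry.getOrDefault(vip, Collections.emptyList());
            for (Instance instance : instances) {
                if (instance.status == Status.UP) { // skip anything that is not UP
                    servers.add(instance.hostPort);
                }
            }
            if (!servers.isEmpty() && prioritizeFirstVip) {
                break; // the current VIP has servers, so ignore the remaining ones
            }
        }
        return servers;
    }
}
```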
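That completes the analysis of how Ribbon gets the service list from the registry. Ribbon starts a scheduled task in the background that uses the Eureka client's own service discovery to fetch the service list, then saves the list in a local cache. When a client consumes a service, the load-balancing algorithm picks one instance from that list, and RestTemplate sends the HTTP request to the chosen service provider, which gives us load-balanced remote communication.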
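This also explains a problem that comes up often in Spring Cloud development: when a service runs as a cluster and an instance is added or goes down, the change is not reflected in the system right away. There are two reasons. 1. The Eureka client performs service discovery through a background scheduled task and keeps the result in a local cache. 2. Ribbon, in turn, uses its own background scheduled task to query the service list through the Eureka client's service discovery, and caches it locally as well. Together, these two layers introduce a delay. Eureka itself is also an AP system, so if you plan to use Eureka as your registry, first be clear about whether your system has timeliness requirements. If it does, Eureka is not a good fit, and you can consider ZooKeeper (CP), Nacos, and similar options.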
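A rough way to reason about this delay: each polling cache can lag behind its upstream by up to one full interval, so chained caches add up in the worst case. The sketch below only does that arithmetic. StalenessEstimate is a hypothetical helper, and the 30-second figures in the usage note are assumed values for illustration, so check the intervals configured in your own system. It also ignores any extra delay on the registry side.

```java
// Back-of-the-envelope bound on how stale a client-side server list can be.
class StalenessEstimate {
    // Sum the polling intervals of every cache layer between registry and caller.
    static long worstCaseDelayMs(long... pollingIntervalsMs) {
        long total = 0;
        for (long interval : pollingIntervalsMs) {
            total += interval;
        }
        return total;
    }
}
```

For example, assuming the Eureka client refreshes every 30 seconds and Ribbon also refreshes every 30 seconds, StalenessEstimate.worstCaseDelayMs(30_000, 30_000) gives 60000 ms, so a change could take up to about a minute to reach the load balancer.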
Now that we have the service list, the next post will analyze Ribbon's load-balancing strategies.