Ribbon负载均衡

最近对负载均衡的实现原理比较感兴趣,于是阅读了其相关的源码并通过作图记录下整个流程,对于自己感兴趣的某一个模块我会探究下去,后面我也会带着当初我不懂的问题去寻找答案,包括我之前写博客的方式也是带是着问题去找答案,这样会有一个很好的方向。

Q:
1.ribbon负载均衡如何获取服务列表
2.负载均衡如何保持数据更新
3.负载均衡的策略
4.Spring RestTemplate的使用

负载均衡uml摘要图

通过enrekaClient获取服务列表

负载均衡策略

心跳

关于负载均衡的使用,网上已有比较全面的资料,在这里主要记录,Spring RestTemplate 是如何结合负载均衡使用的

开启负载均衡

    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }

例子,调用服务EUREKACLIENT的接口

/**
 * 沈敏杰
 * 18/6/8 下午3:37
 * 测试服务
 */
@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    public String send(String name) {
        return restTemplate.postForObject("http://EUREKACLIENT/helloController/hi?name=" + name, null, String.class);
    }

}

通过使用声明Bean并且通过注解@LoadBalanced开启负载均衡,实际上RestTemplate能做到负载均衡是通过添加拦截器,将url的服务,通过ILoadBalancer获取该服务的实际地址。我们通过@LoadBalanced开启负载均衡,RestTemplate注册拦截器。

LoadBalancerAutoConfiguration.java

    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {

        /**
         * 声明负载均衡拦截器 Bean
         */
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        /**
         * 创建RestTemplate 定制器
         * restTemplate添加拦截器
         */
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }

以上为开启负载均衡,restTemplate添加拦截器后发起请求:

restTemplate.postForObject("http://EUREKACLIENT/helloController/hi?name=" + name, null, String.class);

我们先通过这个进行查找源码分析
RestTemplate.java:

    @Override
    @Nullable
    public <T> T postForObject(String url, @Nullable Object request, Class<T> responseType,
            Object... uriVariables) throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
        return execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables);
    }

    @Override
    @Nullable
    public <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {

        URI expanded = getUriTemplateHandler().expand(url, uriVariables);
        return doExecute(expanded, method, requestCallback, responseExtractor);
    }

    @Nullable
    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
                              @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

        Assert.notNull(url, "URI is required");
        Assert.notNull(method, "HttpMethod is required");
        ClientHttpResponse response = null;
        try {
            //创建请求对象
            ClientHttpRequest request = createRequest(url, method);
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
            response = request.execute();
            handleResponse(url, method, response);
            return (responseExtractor != null ? responseExtractor.extractData(response) : null);
        }
        catch (IOException ex) {
            String resource = url.toString();
            String query = url.getRawQuery();
            resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
            throw new ResourceAccessException("I/O error on " + method.name() +
                    " request for \"" + resource + "\": " + ex.getMessage(), ex);
        }
        finally {
            if (response != null) {
                response.close();
            }
        }
    }

当我们发起一个请求的时候,会执行doExecute,然后创建一个http请求对象ClientHttpRequest,ClientHttpRequest是一个接口

/**
 * Represents a client-side HTTP request.
 * Created via an implementation of the {@link ClientHttpRequestFactory}.
 *
 * <p>A {@code ClientHttpRequest} can be {@linkplain #execute() executed},
 * receiving a {@link ClientHttpResponse} which can be read from.
 *
 * @author Arjen Poutsma
 * @since 3.0
 * @see ClientHttpRequestFactory#createRequest(java.net.URI, HttpMethod)
 */
public interface ClientHttpRequest extends HttpRequest, HttpOutputMessage {

    /**
     * Execute this request, resulting in a {@link ClientHttpResponse} that can be read.
     * @return the response result of the execution
     * @throws IOException in case of I/O errors
     */
    ClientHttpResponse execute() throws IOException;

}

我们看到注释:

Created via an implementation of the {@link ClientHttpRequestFactory}.

ClientHttpRequest是通过ClientHttpRequestFactory创建的,看回RestTemplate是如何创建ClientHttpRequest的

/**
     * Create a new {@link ClientHttpRequest} via this template's {@link ClientHttpRequestFactory}.
     * @param url the URL to connect to
     * @param method the HTTP method to execute (GET, POST, etc)
     * @return the created request
     * @throws IOException in case of I/O errors
     * @see #getRequestFactory()
     * @see ClientHttpRequestFactory#createRequest(URI, HttpMethod)
     */
    protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
        ClientHttpRequest request = getRequestFactory().createRequest(url, method);
        if (logger.isDebugEnabled()) {
            logger.debug("Created " + method.name() + " request for \"" + url + "\"");
        }
        return request;
    }

    /**
     * Overridden to expose an {@link InterceptingClientHttpRequestFactory}
     * if necessary.
     * @see #getInterceptors()
     */
    @Override
    public ClientHttpRequestFactory getRequestFactory() {
        //获取当前对象持有的拦截器列表
        List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
        //如果拦截器列表不为空,创建InterceptingClientHttpRequestFactory并返回
        if (!CollectionUtils.isEmpty(interceptors)) {
            ClientHttpRequestFactory factory = this.interceptingRequestFactory;
            if (factory == null) {
                factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
                this.interceptingRequestFactory = factory;
            }
            return factory;
        }
        else {
            return super.getRequestFactory();
        }
    }

在RestTample获取ClientHttpRequestFactory工厂类,根据当前是否持有拦截器,如果有拦截器则返回InterceptingClientHttpRequestFactory http请求工厂类:

/**
     * {@link ClientHttpRequestFactory} wrapper with support for {@link ClientHttpRequestInterceptor}s.
     *
     * @author Arjen Poutsma
     * @since 3.1
     * @see ClientHttpRequestFactory
     * @see ClientHttpRequestInterceptor
     */
    public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {

        private final List<ClientHttpRequestInterceptor> interceptors;


        /**
         * Create a new instance of the {@code InterceptingClientHttpRequestFactory} with the given parameters.
         * @param requestFactory the request factory to wrap
         * @param interceptors the interceptors that are to be applied (can be {@code null})
         */
        public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
                                                    @Nullable List<ClientHttpRequestInterceptor> interceptors) {

            super(requestFactory);
            this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());
        }


        @Override
        protected org.springframework.http.client.ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
            //创建具有拦截器请求类
            return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
        }

    }

获取到ClientHttpRequest请求对象后,发请求实际是执行execute()

response = request.execute();

InterceptingClientHttpRequest 实际上不是没有直接实现execute(),经过一层一层查找,实现execute()的是AbstractClientHttpRequest

public abstract class AbstractClientHttpRequest implements ClientHttpRequest {

    private final HttpHeaders headers = new HttpHeaders();

    private boolean executed = false;


    @Override
    public final HttpHeaders getHeaders() {
        return (this.executed ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
    }

    @Override
    public final OutputStream getBody() throws IOException {
        assertNotExecuted();
        return getBodyInternal(this.headers);
    }

    @Override
    public final ClientHttpResponse execute() throws IOException {
        assertNotExecuted();
        ClientHttpResponse result = executeInternal(this.headers);
        this.executed = true;
        return result;
    }

    /**
     * Assert that this request has not been {@linkplain #execute() executed} yet.
     * @throws IllegalStateException if this request has been executed
     */
    protected void assertNotExecuted() {
        Assert.state(!this.executed, "ClientHttpRequest already executed");
    }


    /**
     * Abstract template method that returns the body.
     * @param headers the HTTP headers
     * @return the body output stream
     */
    protected abstract OutputStream getBodyInternal(HttpHeaders headers) throws IOException;

    /**
     * Abstract template method that writes the given headers and content to the HTTP request.
     * @param headers the HTTP headers
     * @return the response object for the executed request
     */
    protected abstract ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException;

}

execute()会调用AbstractClientHttpRequest自己的一个抽象方法executeInternal(HttpHeaders headers),AbstractBufferingClientHttpRequest继承成AbstractClientHttpRequest,实现了executeInternal(HttpHeaders headers)

abstract class AbstractBufferingClientHttpRequest extends AbstractClientHttpRequest {

    private ByteArrayOutputStream bufferedOutput = new ByteArrayOutputStream(1024);


    @Override
    protected OutputStream getBodyInternal(HttpHeaders headers) throws IOException {
        return this.bufferedOutput;
    }

    @Override
    protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
        byte[] bytes = this.bufferedOutput.toByteArray();
        if (headers.getContentLength() < 0) {
            headers.setContentLength(bytes.length);
        }
        ClientHttpResponse result = executeInternal(headers, bytes);
        this.bufferedOutput = new ByteArrayOutputStream(0);
        return result;
    }

    /**
     * Abstract template method that writes the given headers and content to the HTTP request.
     * @param headers the HTTP headers
     * @param bufferedOutput the body content
     * @return the response object for the executed request
     */
    protected abstract ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput)
            throws IOException;


}

最后我们看到了executeInternal(HttpHeaders headers)还调用了一个抽象方法,executeInternal(HttpHeaders headers, byte[] bufferedOutput),这个就是最后拦截器请求对象实现的方法

class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
        
        ....

        @Override
        protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
            org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution requestExecution = new org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution();
            return requestExecution.execute(this, bufferedOutput);
        }
    }

在executeInternal(HttpHeaders headers, byte[] bufferedOutput)中,声明一个执行器,InterceptingRequestExecution,InterceptingRequestExecution是InterceptingClientHttpRequest中的一个内部类

private class InterceptingRequestExecution implements ClientHttpRequestExecution {

        private final Iterator<ClientHttpRequestInterceptor> iterator;

        public InterceptingRequestExecution() {
            //根据拦截器列表,创建迭代器
            this.iterator = interceptors.iterator();
        }

        @Override
        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            //通过迭代器获取拦截器
            if (this.iterator.hasNext()) {
                //通过拦截器,执行请求
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            }
            else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                org.springframework.http.client.ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                        streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                    }
                    else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
                return delegate.execute();
            }
        }
    }

InterceptingRequestExecution获取外部类的拦截器列表,通过该列表创建迭代器,从而获取拦截器,开头我们看到,@LoadBalanced开启负载均衡,实际上就是RestTamplate添加拦截器,以致发请求的时候会执行拦截器

 nextInterceptor.intercept(request, body, this);

实际执行是LoadBalancerInterceptor,我们看一下LoadBalancerInterceptor的代码:

/**
 * @author Spencer Gibb
 * @author Dave Syer
 * @author Ryan Baxter
 * @author William Tran
 */
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容