Ribbon结合Nacos的使用

本次是ribbon结合Nacos的源码解析,Nacos的版本为2.2.8,ribbon和getway的源码还是相对简单的
上一篇文章总结了LoadBalanced注解,但是没有贴出来被LoadBalanced标注的RestTemplate如何使用

        String url = "http://user-system/order/findOrderByUserId/"+id;
        R result = restTemplate.getForObject(url,R.class);

其中user-system就是微服务的名称,如果restTemplate没有被LoadBalanced标注,那么访问是会报错误的,
关键点就是restTemplate添加过滤器,本片文章分析添加了过滤器的restTemplate是如何把服务名称解析为ip地址,包括Nacos自带的路由规则.

R result = restTemplate.getForObject(url,R.class);
调用链
--org.springframework.web.client.RestTemplate#getForObject
--org.springframework.web.client.RestTemplate#execute
--org.springframework.web.client.RestTemplate#doExecute
doExecute的部分方法

        try {
//创建ClientHttpRequest 
            ClientHttpRequest request = createRequest(url, method);
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
            response = request.execute();
            handleResponse(url, method, response);
            return (responseExtractor != null ? responseExtractor.extractData(response) : null);
        }

1.createRequest(url, method)方法返回值


image.png

2.response = request.execute();方法
调用链
--org.springframework.http.client.AbstractBufferingClientHttpRequest#executeInternal(HttpHeaders)
--org.springframework.http.client.InterceptingClientHttpRequest#executeInternal
--org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution#execute

        @Override
        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
//this.iterator = interceptors.iterator() 拦截器的迭代器,if (this.iterator.hasNext())这个代码证明该execute方法
//肯定会被递归的调用
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
//调用拦截器的intercept方法,这里的拦截器是LoadBalancerInterceptor
                return nextInterceptor.intercept(request, body, this);
            }
            else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                        streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                    }
                    else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
                return delegate.execute();
            }
        }

2.1LoadBalancerInterceptor#intercept方法

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null,
                "Request URI does not contain a valid hostname: " + originalUri);
//这里是将服务名称转ip的方法
        return this.loadBalancer.execute(serviceName,
                this.requestFactory.createRequest(request, body, execution));
    }

2.1.1分析return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
首先分析this.requestFactory.createRequest(request, body, execution),因为返回的是函数性接口(LoadBalancerRequest),这里不会马上的调用

    public LoadBalancerRequest<ClientHttpResponse> createRequest(
            final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) {
        return instance -> {
            HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
                    this.loadBalancer);
            if (this.transformers != null) {
                for (LoadBalancerRequestTransformer transformer : this.transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest,
                            instance);
                }
            }
//还记得上面org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution#execute
//方法中if (this.iterator.hasNext())的判断吗,这里就是递归的调用
            return execution.execute(serviceRequest, body);
        };
    }

2.1.2 this.loadBalancer.execute(serviceName,this.requestFactory.createRequest(request, body, execution)); 中的execute方法
org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#execute

    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
            throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
//关注这里,会调用的rule.choose(key); rule是注入到容器的NacosRule
        Server server = getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));
//这里会调用函数性接口(LoadBalancerRequest)
        return execute(serviceId, ribbonServer, request);
    }

2.1.2.1通过 getServer(loadBalancer, hint);最终调用到NacosRule的choose方法


image.png

2.1.2.2 NacosRule#choose

    @Override
    public Server choose(Object key) {
        try {
            String clusterName = this.nacosDiscoveryProperties.getClusterName();
            String group = this.nacosDiscoveryProperties.getGroup();
            DynamicServerListLoadBalancer loadBalancer = (DynamicServerListLoadBalancer) getLoadBalancer();
            String name = loadBalancer.getName();
            NamingService namingService = nacosServiceManager
                    .getNamingService(nacosDiscoveryProperties.getNacosProperties());
//获取服务名称对应的实例集合,包含了ip端口等信息
            List<Instance> instances = namingService.selectInstances(name, group, true);
            if (CollectionUtils.isEmpty(instances)) {
                LOGGER.warn("no instance in service {}", name);
                return null;
            }
            List<Instance> instancesToChoose = instances;
//这里就是通过nacos通过ClusterName标签实现就近访问的代码
//同一个ClusterName标签首先会互相调用,如果没有则调用其他服务
            if (StringUtils.isNotBlank(clusterName)) {
                List<Instance> sameClusterInstances = instances.stream()
                        .filter(instance -> Objects.equals(clusterName,
                                instance.getClusterName()))
                        .collect(Collectors.toList());
                if (!CollectionUtils.isEmpty(sameClusterInstances)) {
                    instancesToChoose = sameClusterInstances;
                }
                else {
                    LOGGER.warn(
                            "A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}",
                            name, clusterName, instances);
                }
            }
//通过权重随机调用
            Instance instance = ExtendBalancer.getHostByRandomWeight2(instancesToChoose);
            return new NacosServer(instance);
        }
        catch (Exception e) {
            LOGGER.warn("NacosRule error", e);
            return null;
        }
    }

返回到2
拦截器调用结束后,进入else方法,调用下游服务

 @Override
        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
//this.iterator = interceptors.iterator() 拦截器的迭代器,if (this.iterator.hasNext())这个代码证明该execute方法
//肯定会被递归的调用
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
//调用拦截器的intercept方法,这里的拦截器是LoadBalancerInterceptor
                return nextInterceptor.intercept(request, body, this);
            }
            else {
//拦截器链调用结束后
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                        streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                    }
                    else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
//调用下游服务
                return delegate.execute();
            }
        }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容

  • 简介 Spring Cloud Ribbon是一个基于HTTP和TCP的客户端负载均衡工具,它基于Netflix ...
    Chandler_珏瑜阅读 251,036评论 22 183
  • 一. 核心接口 ILoadBalancer Ribbon通过ILoadBalancer接口对外提供统一的选择服务...
    aiwen2017阅读 5,589评论 0 1
  • 本文作者:陈刚,叩丁狼高级讲师。原创文章,转载请注明出处。 本文章会通过断点跟踪的方式来解读 Ribbon 源码 ...
    叩丁狼教育阅读 1,982评论 0 3
  • 断断续续看Ribbon的源码差不多也有7-8天了,总算告一段落。本文记录了这些天对源码的阅读过程与一些分析理解,如...
    程序猿DD阅读 6,541评论 6 11
  • 本人小白,这个文章是本人的阅读笔记,不是权威解读,需要自己甄别对错 大致流程,具体代码没深入分析,大致跟读了下源码...
    逐月沐风阅读 448评论 0 0