手撕Gateway源码,今日撕工作流程、负载均衡源码

Spring Cloud Gateway源码剖析

通过前面的学习,我们知道SpringCloud Gateway是一个微服务网关,主要实现不同功能服务路由,关于SpringCloud Gateway的实战使用我们就告一段落,我们接下来深入学习SpringCloud Gateway源码。

2.1 Gateway工作流程源码剖析

2.1.1 Gateway工作流程分析

image.png

前面我们已经学习过Gateway的工作流程,如上工作流程图,我们回顾一下工作流程:

1:所有都将由ReactorHttpHandlerAdapter.apply()方法拦截处理,此时会封装请求对象和响应对象,并传递到HttpWebHandlerAdapter.handle()方法。

2:HttpWebHandlerAdapter.handle(),将request和response封装成上下文对象ServerWebExchange,方法通过getDelegate()获取全局异常处理器ExceptionHandlingWebHandler执行全局异常处理

3:ExceptionHandlingWebHandler执行完成后,调用DispatcherHandler.handle(),循环所有handlerMappings查找处理当前请求的Handler

4:找到Handler后调用DispatcherHandler.invokeHandler()执行找到的Handler,此时会调用FilteringWebHandler.handle()

5:DefaultGatewayFilterChain.filter()是关键流程,所有过滤器都会在这里执行,比如服务查找、负载均衡、远程调用等,都在这一块。

上面工作流程我们都是基于说的层面,接下来我们一层一层分析Gateway源码,深入学习Gateway。

2.1.2 Gateway工作流程源码

我们首先来看一下Gateway拦截处理所有请求的方法handle():

/****
*处理所有请求
****/
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
    if (this.forwardedHeaderTransformer != null) {
        request = this.forwardedHeaderTransformer.apply(request);
    }
    //创建网关上下文对象
    ServerWebExchange exchange = createExchange(request, response);

    LogFormatUtils.traceDebug(logger, traceOn ->
            exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
                    (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));

    //getDelegate()获取当前的Handler
    return getDelegate().handle(exchange)
            .doOnSuccess(aVoid -> logResponse(exchange))
            .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
            .then(Mono.defer(response::setComplete));
}

上面getDelegate()方法源码如下:

/**
* Return the wrapped delegate.
* 返回WebHandler:处理web请求的对象
*/
public WebHandler getDelegate() {
    return this.delegate;
}

我们进行Debug测试如下:


image.png

当前返回的WebHandler是ExceptionHandlingWebHandler,而ExceptionHandlingWebHandler的delegate是FilteringWebHandler,而FilteringWebHandler的delegate是delegateDispatcherHandler,所有的delegate的handle()方法都会依次执行,我们可以把断点放到DispatcherHandler.handler()方法上:

image.png

handler()方法会调用所有handlerMappings的getHandler(exchange)方法,而getHandler(exchange)方法会调用getHandlerInternal(exchange)方法:

image.png

getHandlerInternal(exchange)该方法由各个HandlerMapping自行实现,我们可以观察下断言处理的RoutePredicateHandlerMappinggetHandlerInternal(exchange)方法会调用lookupRoute方法,该方法用于返回对应的路由信息:

image.png

这里的路由匹配其实就是我们项目中对应路由配置的一个一个服务的信息,这些服务信息可以帮我们找到我们要调用的真实服务:

image.png

每个Route对象如下:

image.png

Route的DEBUG数据如下:

image.png

找到对应Route后会返回指定的FilterWebHandler,如下代码:

image.png

FilterWebHandler主要包含了所有的过滤器,过滤器按照一定顺序排序,主要是order值,越小越靠前排,过滤器中主要将请求交给指定真实服务处理了,debug测试如下:

image.png

这里有RouteToRequestUrlFilterForwardRoutingFilter以及LoadBalancerClientFilter等多个过滤器。

2.1.3 请求处理

在上面FilterWebHandler中有2个过滤器,分别为RouteToRequestUrlFilterForwardRoutingFilter

RouteToRequestUrlFilter:用于根据匹配的 Route,计算请求地址得到 lb://hailtaxi-order/order/list

ForwardRoutingFilter:转发路由网关过滤器。其根据 forward:// 前缀( Scheme )过滤处理,将请求转发到当前网关实例本地接口。

2.1.3.1 RouteToRequestUrlFilter真实服务查找

RouteToRequestUrlFilter源码如下:

/***
 * 处理uri过滤器
 * @param exchange
 * @param chain
 * @return
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    //获取当前的route
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    if (route == null) {
        return chain.filter(exchange);
    }
    log.trace("RouteToRequestUrlFilter start");
    //得到uri  = http://localhost:8001/order/list?token=123
    URI uri = exchange.getRequest().getURI();
    boolean encoded = containsEncodedParts(uri);
    URI routeUri = route.getUri();

    if (hasAnotherScheme(routeUri)) {
        // this is a special url, save scheme to special attribute
        // replace routeUri with schemeSpecificPart
        exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,
                routeUri.getScheme());
        routeUri = URI.create(routeUri.getSchemeSpecificPart());
    }

    if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
        // Load balanced URIs should always have a host. If the host is null it is
        // most
        // likely because the host name was invalid (for example included an
        // underscore)
        throw new IllegalStateException("Invalid host: " + routeUri.toString());
    }

    //将uri换成 lb://hailtaxi-order/order/list?token=123 
    URI mergedUrl = UriComponentsBuilder.fromUri(uri)
            // .uri(routeUri)
            .scheme(routeUri.getScheme()).host(routeUri.getHost())
            .port(routeUri.getPort()).build(encoded).toUri();
    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
    return chain.filter(exchange);
}

debug调试结果如下:

image.png

从上面调试结果我们可以看到所选择的Route以及uri和routeUri和mergedUrl,该过滤器其实就是将用户请求的地址换成服务地址,换成服务地址可以用来做负载均衡。

2.1.3.2 NettyRoutingFilter远程调用

SpringCloud在实现对后端服务远程调用是基于Netty发送Http请求实现,核心代码在NettyRoutingFilter.filter()中,其中核心代码为send()方法,代码如下:

Flux<HttpClientResponse> responseFlux = httpClientWithTimeoutFrom(route)
        // 头信息处理
        .headers(headers -> {
            headers.add(httpHeaders);
            // Will either be set below, or later by Netty
            headers.remove(HttpHeaders.HOST);
            if (preserveHost) {
                String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                headers.add(HttpHeaders.HOST, host);
            }
            // 执行发送,基于HTTP协议
        }).request(method).uri(url).send((req, nettyOutbound) -> {
            if (log.isTraceEnabled()) {
                nettyOutbound
                        .withConnection(connection -> log.trace("outbound route: "
                                + connection.channel().id().asShortText()
                                + ", inbound: " + exchange.getLogPrefix()));
            }
            return nettyOutbound.send(request.getBody()
                    .map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
                            .getNativeBuffer()));
        }).
        // 响应结果
        responseConnection((res, connection) -> {

            // Defer committing the response until all route filters have run
            // Put client response as ServerWebExchange attribute and write
            // response later NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
            // 获取响应结果
            ServerHttpResponse response = exchange.getResponse();
            // put headers and status so filters can modify the response
            HttpHeaders headers = new HttpHeaders();

            res.responseHeaders().forEach(
                    entry -> headers.add(entry.getKey(), entry.getValue()));

            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
                exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
                        contentTypeValue);
            }

            setResponseStatus(res, response);

            // make sure headers filters run after setting status so it is
            // available in response
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                    getHeadersFilters(), headers, exchange, Type.RESPONSE);

            if (!filteredResponseHeaders
                    .containsKey(HttpHeaders.TRANSFER_ENCODING)
                    && filteredResponseHeaders
                            .containsKey(HttpHeaders.CONTENT_LENGTH)) {
                // It is not valid to have both the transfer-encoding header and
                // the content-length header.
                // Remove the transfer-encoding header in the response if the
                // content-length header is present.
                response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }

            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
                    filteredResponseHeaders.keySet());

            response.getHeaders().putAll(filteredResponseHeaders);

            return Mono.just(res);
        });

Duration responseTimeout = getResponseTimeout(route);

上面send方法最终会调用ChannelOperations>send()方法,而该方法其实是基于了Netty实现数据发送,核心代码如下:

image.png
2.1.3.3 Netty特性

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,他的并发性能得到了很大提高,对比于BIO(Blocking I/O,阻塞IO),隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。Netty 是一个广泛使用的 Java 网络编程框架。

传输极快

Netty的传输快其实也是依赖了NIO的一个特性——零拷贝。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。
Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。

image.png

良好的封装

Netty无论是性能还是封装性都远远超越传统Socket编程。

image.png

Channel:表示一个连接,可以理解为每一个请求,就是一个Channel。

ChannelHandler:核心处理业务就在这里,用于处理业务请求。

ChannelHandlerContext:用于传输业务数据。

ChannelPipeline:用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。

ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便你对整段字节缓存进行读写,也支持get/set,方便你对其中每一个字节进行读写,他的数据结构如下图所示:

image.png

2.2 Gateway负载均衡源码剖析

前面源码剖析主要剖析了Gateway的工作流程,我们接下来剖析Gateway的负载均衡流程。在最后的过滤器集合中有LoadBalancerClientFilter过滤器,该过滤器是用于实现负载均衡。

2.2.1 地址转换

LoadBalancerClientFilter过滤器首先会将用户请求地址转换成真实服务地址,也就是IP:端口号,源码如下:

/***
 * 负载均衡过滤
 * @param exchange
 * @param chain
 * @return
 */
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    //负载均衡的URL = lb://hailtaxi-order/order/list?token=123
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
    if (url == null
            || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
        return chain.filter(exchange);
    }
    // preserve the original url
    addOriginalRequestUrl(exchange, url);

    if (log.isTraceEnabled()) {
        log.trace("LoadBalancerClientFilter url before: " + url);
    }
    
    //服务选择
    final ServiceInstance instance = choose(exchange);

    if (instance == null) {
        throw NotFoundException.create(properties.isUse404(),
                "Unable to find instance for " + url.getHost());
    }
    //用户提交的URI = http://localhost:8001/order/list?token=123
    URI uri = exchange.getRequest().getURI();

    // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
    // if the loadbalancer doesn't provide one.
    String overrideScheme = instance.isSecure() ? "https" : "http";
    if (schemePrefix != null) {
        overrideScheme = url.getScheme();
    }
    //真实服务的URL =http://192.168.211.1:18182/order/list?token=123
    URI requestUrl = loadBalancer.reconstructURI(
            new DelegatingServiceInstance(instance, overrideScheme), uri);

    if (log.isTraceEnabled()) {
        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
    }

    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
    return chain.filter(exchange);
}

2.2.2 负载均衡服务选择

上面代码的关键是choose(exchange)的调用,该方法调用其实就是选择指定服务,这里涉及到负载均衡服务轮询调用算法等,我们可以跟踪进去查看方法执行流程。

image.png

Gateway自身已经集成Ribbon,所以看到的对象是RibbonLoadBalancerClient,我们跟踪进去接着查看:

!
image.png

上面方法会依次调用到getInstance()方法,该方法会返回所有可用实例,有可能有多个实例,如果有多个实例就涉及到负载均衡算法,方法调用如下图:

image.png

此时调用getServer()方法,再调用BaseLoadBalancer.chooseServer(),这里是根据指定算法获取对应实例,代码如下:

image.png

BaseLoadBalancer是属于Ribbon的算法,我们可以通过如下依赖包了解,并且该算法默认用的是RoundRobinRule,也就是随机算法,如下代码:

image.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容