SpringCloudAlibaba集成Gateway动态路由Nacos服务

官方git:https://github.com/spring-cloud/spring-cloud-gateway
Spring Cloud Gateway网关是用来代替zuul1.x作为微服务架构中的网关组件,zuul1.x是最早的网关组件,由于使用单线程阻塞式链接,所以性能有问题,gateway是搭建在webflux框架之上的响应式网关服务,底层使用Netty框架作为通讯框架。zuul2.x也使用了Netty。性能上gateway是zuul1.x的1.5~2倍,与zuul2.x相当。
我们使用的是spring-cloud-alibaba的2021.1版本,还踩了一个小小的坑,下面说。

什么是gateway

我个人理解gateway就是一个业务nginx,它支持请求转发,负载均衡,统一埋点,限流降级、安全认证等等很多功能,gateway收集所有请求根据路由规则转发请求,使用统一的过滤器处理请求参数等,是不是听起来有点像nginx,但是比nginx拥有更多功能。

gateway三大组件

image.png

如何使用

springcloud 提供了gateway的springboot版本启动器,我们可以很方便的创建一个网关项目
首先创建一个springboot项目,引入依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>

修改application.yml配置

server:
  port: 9000
spring:
  application:
    name: gateway-web
  cloud:
    gateway:
      discovery:
        routes:
        - id: demo-1
          uri: 127.0.0.1:9004
          predicates:
            - Path=/demo/** ##基于Path匹配的路由规则 还有其他的路由规则

集成Nacos 实现动态路由

引入依赖

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- Spring Cloud 2020 中重磅推荐的负载均衡器 Spring Cloud LoadBalancer 不引用无法实现动态路由  -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>

修改application.yml配置

server:
  port: 9000
spring:
  application:
    name: gateway-web
  cloud:
    gateway:
      discovery:
        routes:
        - id: demo-1
          uri: lb://demo-web   ##'lb://'代表发现注册中心的服务 'demo-web'服务提供方的name 
          predicates:
            - Path=/demo/**
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        password: nacos
        username: nacos

启动类增加服务发现注解

@SpringBootApplication
@EnableDiscoveryClient
public class GatewayWebApplication {

    public static void main(String[] args) {
        SpringApplication.run(GatewayWebApplication.class, args);
    }

}

可以自nacos中发现该gateway服务


image.png

基于约定的服务动态路由 自动配置routes

修改application.yml配置

server:
  port: 9000
spring:
  application:
    name: gateway-web
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true ##开启自动发现
          lower-case-service-id: true ##转小写
#      routes:
#        - id: demo-1
#          uri: lb://demo-web
#          predicates:
#            - Path=/demo/**
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        password: nacos
        username: nacos

注意这里在访问服务的时候需要添加一级以服务名称为路径的资源
比如:http://127.0.0.1:9004/demo/userinfo
要访问 http://127.0.0.1:9004/demo-web/demo/userinfo demo-web为服务提供方的name

image.png

2021.1版本踩坑

网上很多教程都没有提示需要引入spring-cloud-starter-loadbalance,可能文章都比较老,loadbalance是2020版本提供的负载均衡器,这里是默认使用的,所以不引用就没办反实现动态路由,连个日志都没有,害我debug半天。
说几个在服务转发中几个比较重要的组件:

DispatcherHandler 入口
@Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }
RoutePredicateHandlerMapping 负责获取所有路由规则
    @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // don't handle requests on management port if set and different than server port
        if (this.managementPortType == DIFFERENT && this.managementPort != null
                && exchange.getRequest().getURI().getPort() == this.managementPort) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }

FilteringWebHandler 执行过滤器链
    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        List<GatewayFilter> gatewayFilters = route.getFilters();

        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        // TODO: needed or cached?
        AnnotationAwareOrderComparator.sort(combined);

        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: " + combined);
        }

        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }
RouteToRequestUrlFilter 负责根据路由规则转换url
@Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        if (route == null) {
            return chain.filter(exchange);
        }
        log.trace("RouteToRequestUrlFilter start");
        URI uri = exchange.getRequest().getURI();
        boolean encoded = containsEncodedParts(uri);
        URI routeUri = route.getUri();

        if (hasAnotherScheme(routeUri)) {
            // this is a special url, save scheme to special attribute
            // replace routeUri with schemeSpecificPart
            exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
            routeUri = URI.create(routeUri.getSchemeSpecificPart());
        }

        if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
            // Load balanced URIs should always have a host. If the host is null it is
            // most
            // likely because the host name was invalid (for example included an
            // underscore)
            throw new IllegalStateException("Invalid host: " + routeUri.toString());
        }

        URI mergedUrl = UriComponentsBuilder.fromUri(uri)
                // .uri(routeUri)
                .scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();
        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
        return chain.filter(exchange);
    }

ReactiveLoadBalancerClientFilter 负责对远程分布式服务做负载均衡

以前使用的是LoadBalancerClientFilter

@Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
        if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
            return chain.filter(exchange);
        }
        // preserve the original url
        addOriginalRequestUrl(exchange, url);

        if (log.isTraceEnabled()) {
            log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
        }

        URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String serviceId = requestUri.getHost();
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext(
                new RequestData(exchange.getRequest()), getHint(serviceId, loadBalancerProperties.getHint())));
        return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {

            if (!response.hasServer()) {
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
                throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
            }

            ServiceInstance retrievedInstance = response.getServer();

            URI uri = exchange.getRequest().getURI();

            // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
            // if the loadbalancer doesn't provide one.
            String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
            if (schemePrefix != null) {
                overrideScheme = url.getScheme();
            }

            DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance,
                    overrideScheme);

            URI requestUrl = reconstructURI(serviceInstance, uri);

            if (log.isTraceEnabled()) {
                log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
            }
            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
            exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
        }).then(chain.filter(exchange))
                .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.FAILED, throwable, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
                .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.SUCCESS, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
                                new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
    }

NettyRoutingFilter 负责通讯 使用HttpClient进行请求的发送
@Override
    @SuppressWarnings("Duplicates")
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
            return chain.filter(exchange);
        }
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();

        final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        final String url = requestUrl.toASCIIString();

        HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);

        boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

        Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
            headers.add(httpHeaders);
            // Will either be set below, or later by Netty
            headers.remove(HttpHeaders.HOST);
            if (preserveHost) {
                String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                headers.add(HttpHeaders.HOST, host);
            }
        }).request(method).uri(url).send((req, nettyOutbound) -> {
            if (log.isTraceEnabled()) {
                nettyOutbound.withConnection(connection -> log.trace("outbound route: "
                        + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix()));
            }
            return nettyOutbound.send(request.getBody().map(this::getByteBuf));
        }).responseConnection((res, connection) -> {

            // Defer committing the response until all route filters have run
            // Put client response as ServerWebExchange attribute and write
            // response later NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

            ServerHttpResponse response = exchange.getResponse();
            // put headers and status so filters can modify the response
            HttpHeaders headers = new HttpHeaders();

            res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
                exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
            }

            setResponseStatus(res, response);

            // make sure headers filters run after setting status so it is
            // available in response
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
                    Type.RESPONSE);

            if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
                    && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                // It is not valid to have both the transfer-encoding header and
                // the content-length header.
                // Remove the transfer-encoding header in the response if the
                // content-length header is present.
                response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }

            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());

            response.getHeaders().putAll(filteredResponseHeaders);

            return Mono.just(res);
        });

        Duration responseTimeout = getResponseTimeout(route);
        if (responseTimeout != null) {
            responseFlux = responseFlux
                    .timeout(responseTimeout,
                            Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
                    .onErrorMap(TimeoutException.class,
                            th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
        }

        return responseFlux.then(chain.filter(exchange));
    }
NettyWriteResponseFilter 处理response
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
        // until the NettyRoutingFilter is run
        // @formatter:off
        return chain.filter(exchange)
                .doOnError(throwable -> cleanup(exchange))
                .then(Mono.defer(() -> {
                    Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

                    if (connection == null) {
                        return Mono.empty();
                    }
                    if (log.isTraceEnabled()) {
                        log.trace("NettyWriteResponseFilter start inbound: "
                                + connection.channel().id().asShortText() + ", outbound: "
                                + exchange.getLogPrefix());
                    }
                    ServerHttpResponse response = exchange.getResponse();

                    // TODO: needed?
                    final Flux<DataBuffer> body = connection
                            .inbound()
                            .receive()
                            .retain()
                            .map(byteBuf -> wrap(byteBuf, response));

                    MediaType contentType = null;
                    try {
                        contentType = response.getHeaders().getContentType();
                    }
                    catch (Exception e) {
                        if (log.isTraceEnabled()) {
                            log.trace("invalid media type", e);
                        }
                    }
                    return (isStreamingMediaType(contentType)
                            ? response.writeAndFlushWith(body.map(Flux::just))
                            : response.writeWith(body));
                })).doOnCancel(() -> cleanup(exchange));
        // @formatter:on
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容