gateway实战

  • 1.gateway基础介绍
  • 2.gateway跨域处理
  • 3.gateway动态路由
  • 4.gateway自定义局部拦截器和全局拦截器
  • 5.Weblux启动和gateway处理request源码分析
  • 6.自定义拦截器滑动窗口限流
  • 7.集成nacos灰度发布
  • 8.集成oauth2.0统一授权

1.gateway 介绍

spring cloud gatewya 是专门为微服务框架提供简单的api路由关键组件,它建立在 Spring Boot 2.x、 Spring WebFlux 和 Project Reactor之上 需要Spring Boot和Spring Webflux提供的Netty运行时。不能在传统的Servlet容器中工作,也不能以WAR的形式构建。

通常的作用如下:

  • 协议转换,路由转发
  • 流量聚合,对流量进行监控,日志输出
  • 作为整个系统的前端工程,对流量进行控制,有限流的作用
  • 作为系统的前端边界,外部流量只能通过网关才能访问系统
  • 可以在网关层做权限的判断
  • 可以在网关层做缓存

2.gateway 重要概念

路由(route):路由信息由ID,目的url,一组断言工厂和一组Filter组成,断言为true说明请求的url和配置的路由匹配
断言(predicates):gateway中断言函数类型是spring5.0框架中的ServerWebExchange,允许开发自定义规则匹配来自Http request的任何信息
过滤器(filter):标准的Spring webFilter 分为两种Gateway Filter和Global Filter

3.gateway 工作原理

image.png

1.客户端向 Spring Cloud Gateway 发出请求。
2.如果 Gateway Handler Mapping 找到与请求相匹配的路由,将其发送到 Gateway Web Handler。
3.Gateway Web Handler再通过指定的 过滤器链 来将请求发送到我们实际的服务执行业务逻辑,然后返回。
4.过滤器之间用虚线分开是因为过滤器可能会在发送代理请求之前(“pre”)或之后(“post”)执行业务逻辑。

4.简单案例

引入pom配置

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

4.1 快捷配置与完全展开配置

配置application文件

4.1.1 快捷配置

spring:
  cloud:
    gateway:
      routes:
      - id: after_route
        uri: https://example.org
        predicates:
        - Cookie=mycookie,mycookievalue

4.1.2 完全展开配置配置

spring:
  cloud:
    gateway:
      routes:
      - id: after_route
        uri: https://example.org
        predicates:
        - name: Cookie
          args:
            name: mycookie
            regexp: mycookievalue

4.2 路由断言predicates详解

predicates 的类型型有以下几种:

断言类型 说明
After 时间过滤,设定一个时间之后才能访问
Before 时间过滤,设定一个时间之前才能访问
Between 时间过滤,设定两个时间之间才能访问
Cookie Cookie 匹配过滤,cookie名和值过滤访问
Header 请求头过滤,请求头参数名与值过滤访问
Host 域名地址过滤
Method 请求方式过滤,get post
Path 路径匹配过滤
Query 请求参数过滤
RemoteAddr 主机ip过滤
Weight 权重过滤

常用配置规则如下

spring:
  cloud:
    gateway:
      routes:
      - id: after_route
        uri: http://localhost:8001/
        predicates:
       - After=2019-06-20T17:42:47.789-07:00[America/Denver]
#        时间之前路由
#      - Before=2019-06-20T17:42:47.789-07:00[America/Denver]

#        时间之间路由
#      - Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-                 07:00[America/Denver]

#        Cookie 匹配,cookei名为chocolate,值与ch.p正则表达式匹配的cookie
#      - Cookie=chocolate, ch.p

#       请求具有名称,X-Request-Id其值与\d+正则表达式匹配(具有一个或多个数字的值),则此路由匹配。
#      - Header=X-Request-Id, \d+

#       如果请求的Host标头具有值www.somehost.org或www.anotherhost.org,则此路由将匹配
#      - Host=**.somehost.org,**.anotherhost.org

#       所有get方法将匹配此规则
#      - Method=GET

#       
#      - Path=/foo/**,/bar/**

#       查询条件中,参数名包含baz的请求配置此规则
#      - Query=baz

4.3 gateway内置filter

filter 的类型常用的有以下几种:

filter类型 说明
AddRequestHeader 添加参赛到下游请求的header信息
AddRequestHeadersIfNotPresent 添加参赛到下游请求的header信息,多个逗号隔开,与 AddRequestHeader 不同的是,它只在header 信息不存在的情况下才会这样做。否则,客户端请求中的原始值将被发送
AddRequestParameter 添加请求参数到下游请求中
AddResponseHeader 返回结果中添加参数
CircuitBreaker gateway 断路器
CacheRequestBody 请求体缓存过滤器

中文官网地址

gateway 有很多内置filters ,可以在官网了解更多

5.Gateway实战

5.1 跨域

5.1.1 全局跨域处理

1.配置文件配置

spring:
  cloud:
    gateway:
      globalcors:
        cors-configurations:
          '[/**]':
            allowedOrigins: "https://docs.spring.io"
            allowedMethods:
            - GET

2.代码配置

@Configuration
public class CorsConfig {
    @Bean
    public CorsWebFilter corsFilter() {
        // 初始化cors配置对象
        CorsConfiguration config = new CorsConfiguration();
        //设置允许哪些url访问跨域资源,*表示全部允许
        config.addAllowedOrigin("*");
        //允许访问的头信息,*表示全部
        config.addAllowedHeader("*");
        //允许所有请求方法(GET,POST等)访问跨域资源
        config.addAllowedMethod("*");
        // 预检请求的缓存时间(秒),即在这个时间内,对于相同的跨域请求不会再预检了
        config.setMaxAge(18000L);
        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource(new PathPatternParser());
        source.registerCorsConfiguration("/**", config);
        return new CorsWebFilter(source);
    }
}

5.1.2 路由跨域处理

spring:
  cloud:
    gateway:
      routes:
      - id: cors_route
        uri: https://example.org
        predicates:
        - Path=/service/**
        metadata:
          cors
            allowedOrigins: '*'
            allowedMethods:
              - GET
              - POST
            allowedHeaders: '*'
            maxAge: 30

5.2 动态路由

5.2.1 静态路由

gateway中,在配置文件中写死配置路由策略,即为静态路由,无法实时调整路由策略,需要修改配置文件,但是生产有时候需要调整路由策略或者增加,减少路由对象,但是又不想频繁的更改配置文件,而且配置文件没有办法进行系统管理,这时候就需要动态路由
配置文件静态配置如下

spring:
  cloud:
    gateway:
      routes:
      - id: nameRoot
        uri: https://www.provder.com
        predicates:
        - Path=/name/test/**
        filters:
        - StripPrefix=2

5.2.2 动态路由

实现路由最重要的三点
1.RouteDefinition 路由对象,封装了路由id,url,断言,filter等信息
2.RouteDefinitionRepository 路由信息缓存器,目前支持自带支持2种inMemoryRouteDefinitionRepository和RedisRouteDefinitionRepository,但是RedisRouteDefinitionRepository需要配置gateway.redis-route-definition-repository.enabled=true才能开启使用
3.ApplicationEventPublisher 路由刷新事件发布

下面是具体实现

/**
 * @author guangjie.liao
 * @Date: 2022/10/24 19:09
 * @Description: 动态路由实现
 */
@Slf4j
public class GateWayDynamicRoute implements ApplicationEventPublisherAware {
    /**
     * 路由缓存方式
     * 可以是InMemory 和 Redis(3.1.x版本后支持),在路由集群部署下建议使用Redis,
     * 使用InMemory 需要通知到各个节点才能全部刷新
     */
    private RouteDefinitionRepository repository;
    /**
     * 路由刷新事件
     */
    private ApplicationEventPublisher publisher;
    /**
     * 路由持久化
     * 通过读取mysql持久化的路由信息我们可以在后台配置化路由信息
     */
    private RouteRepository routeRepository;


    public GateWayDynamicRoute(RouteDefinitionRepository repository,
                               ApplicationEventPublisher publisher,
                               RouteRepository routeRepository){
        this.repository = repository;
        this.publisher = publisher;
        this.routeRepository = routeRepository;
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }

    /**
     * 刷新路由
     * @return
     */
    public Mono<Void> refresh() {
        this.loadRoute();
        this.publisher.publishEvent(new RefreshRoutesEvent(this));
        return Mono.empty();
    }

    /**
     * 加载所有路由
     */
    public void loadRoute(){
        List<RouteDefinition> routeDefinitions = getRouteDefinition();
        //每次加载前把缓存的全部清空
        routeDefinitions.stream().forEach(routeDefinition -> {
            repository.delete(Mono.just(routeDefinition.getId())).subscribe();
        });
        routeDefinitions.stream().forEach(routeDefinition -> {
            repository.save(Mono.just(routeDefinition)).subscribe();
        });
        log.info("动态加载路由:{}",routeDefinitions.size());
    }

    /**
     * 构建 RouteDefinition
     * RouteDefinition 为路由对象,包含断言和filter
     * @return
     */
    private List<RouteDefinition> getRouteDefinition(){
        List<RouteDefinition> routeDefinitions = new ArrayList<>();

        List<GatewayRoute> routeList = routeRepository.selectAllRoute();
        if (CollectionUtils.isEmpty(routeList)){
            return routeDefinitions;
        }
        routeList.stream().forEach(route->{
            RouteDefinition routeDef =new RouteDefinition();
            routeDef.setPredicates(getPredicateDefs(route));
            routeDef.setFilters(getFilters(route));

            routeDef.setId(route.getRouteName());
            routeDef.setUri(getServerUri(route));
            routeDefinitions.add(routeDef);
        });
        return routeDefinitions;
    }

    /**
     * 构建断言
     * @param route
     * @return
     */
    private List<PredicateDefinition> getPredicateDefs(GatewayRoute route){
        List<PredicateDefinition> predicateDefinitions = new ArrayList<>();
        PredicateDefinition predicatePath = new PredicateDefinition();
        Map<String, String> predicatePathParams = new HashMap<>(8);
        predicatePath.setName("Path");
        predicatePathParams.put("name", route.getRouteName());
        predicatePathParams.put("pattern", route.getPath());
        predicatePathParams.put("pathPattern", route.getPath());
        predicatePath.setArgs(predicatePathParams);
        predicateDefinitions.add(predicatePath);
        return predicateDefinitions;
    }

    /**
     * 构建 filter
     * @param route
     * @return
     */
    private List<FilterDefinition> getFilters(GatewayRoute route){
        List<FilterDefinition> filterDefinitions = new ArrayList<>();

        FilterDefinition retryDefinition = new FilterDefinition();
        Map<String, String> retryParams = new HashMap<>(8);
        retryDefinition.setName("Retry");
        retryParams.put("exceptions", "java.lang.Exception");
        retryDefinition.setArgs(retryParams);

        FilterDefinition stripPrefixDefinition = new FilterDefinition();
        Map<String, String> stripPrefixParams = new HashMap<>(8);
        stripPrefixDefinition.setName("StripPrefix");
        stripPrefixParams.put(NameUtils.generateName(0), route.getStripPrefix()+"");
        stripPrefixDefinition.setArgs(stripPrefixParams);

        filterDefinitions.add(retryDefinition);
        filterDefinitions.add(stripPrefixDefinition);
        return filterDefinitions;
    }

    /**
     * 构建路由地址
     * @param route
     * @return
     */
    private URI getServerUri(GatewayRoute route){
        String urlStr = route.getUrl() + route.getServiceId() ;
        URI uri = UriComponentsBuilder.fromUriString(urlStr).build().toUri();
        return uri;
    }
}

5.3 动态路由实现原理

image.png

5.3.1 Gateway的服务启动

5.3.1.1 SpringApplication#run方法

SpringApplication.run方法是springboot项目服务启动的入口,在这里会初始化很多配置,如下代码,在这里我们主要关注refreshContext 方法,因为这个方法深入跟踪会看到我们gateway处理客户端请求的HttpWebHandlerAdapter,handlerMapping,webHandler,filter以及GatewayAutoConfiguration是如何初始化的。这里会有相关springboot服务启动过程的内容,所有有另外一篇springboot 启动过程介绍整个启动过程。下面看完这个run方法,就进入refreshContext 方法的介绍。

public ConfigurableApplicationContext run(String... args) {
        //省略部分代码
        try {
             //省略部分代码
             refreshContext(context);
             // 刷新后操作
             afterRefresh(context, applicationArguments);
              //省略部分代码
              // 事件广播 启动完成了
              listeners.started(context);
              //执行程序启动后需要执行的任务
              callRunners(context, applicationArguments);
          }
        //省略部分代码
        return context;
    }

5.3.1.3 SpringApplication#refreshContext方法

refreshContext方法同样定义在ApplicationContext类中,如下

private void refreshContext(ConfigurableApplicationContext context) {
    if (this.registerShutdownHook) {
        //向JVM运行时注册一个关闭挂钩,在JVM关闭时关闭此上下文时回调此方法
        shutdownHook.registerApplicationContext(context);
    }
    refresh(context);
}

5.3.1.4 SpringApplication#refresh方法

下一步就进入到了这个refresh方法中,也定义在ApplicationContext中,如下

protected void refresh(ConfigurableApplicationContext applicationContext) {
    applicationContext.refresh();
}

5.3.1.5 ConfigurableApplicationContext #refresh方法

ConfigurableApplicationContext 有三个实现类,分别是

  • AbstractApplicationContext
    它实现了ConfigurableApplicationContext接口,提供了refresh方法的具体实现
  • ReactiveWebServerApplicationContext
    WebFlux容器实现,它实现了ConfigurableApplicationContext接口,同时继承了AbstractApplicationContext ,它提供的refresh方法也是调用了super.refresh()
  • ServletWebServerApplicationContext
    SpringMvc容器实现,它实现了ConfigurableApplicationContext接口,同时继承了AbstractApplicationContext ,它提供的refresh方法也是调用了super.refresh()
image.png

所以上面的refresh方法就是调用了AbstractApplicationContext 的refresh方法,下面进入这个方法

5.3.1.6 AbstractApplicationContext #refresh方法

@Override
    public void refresh() throws BeansException, IllegalStateException {
        synchronized (this.startupShutdownMonitor) {
            //省略部分代码
            try {
                // 创建web容器
                onRefresh();
                //注册监听器
                registerListeners();
                //完成bean初始化
                finishBeanFactoryInitialization(beanFactory);
                //刷新,启动web容器
                finishRefresh();
            }
            catch (BeansException ex) {
                //省略部分代码
            }
                //省略部分代码
        }
    }

5.3.1.7 AbstractApplicationContext #onRefresh方法

AbstractApplicationContext 里面的onRefresh是个空的方法,具体的实现在其子类里面,由于gateway使用的是webflux组件,所以这里的onRefresh走的是ReactiveWebServerApplicationContext的onRefresh方法,如下

protected void onRefresh() throws BeansException {
    // For subclasses: do nothing by default.
}

5.3.1.8 ReactiveWebServerApplicationContext#onRefresh方法

protected void onRefresh() {
    super.onRefresh();
    try {
        createWebServer();
    }
    catch (Throwable ex) {
        throw new ApplicationContextException("Unable to start reactive web server", ex);
    }
}

5.3.1.9 ReactiveWebServerApplicationContext#createWebServer方法

private void createWebServer() {
    WebServerManager serverManager = this.serverManager;
    if (serverManager == null) {
        StartupStep createWebServer = this.getApplicationStartup().start("spring.boot.webserver.create");
        //获取创建web容器的工具类名字
        String webServerFactoryBeanName = getWebServerFactoryBeanName();
        // 通过bean的名称获取具体实例
        ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
        createWebServer.tag("factory", webServerFactory.getClass().toString());
        boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
        //注入ReactiveWebServerFactory 工具类和getHttpHandler 方法构建一个WebServerManager对象,
        //web容器的启动关闭都由它管理
        this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
        getBeanFactory().registerSingleton("webServerGracefulShutdown",
                new WebServerGracefulShutdownLifecycle(this.serverManager.getWebServer()));
        getBeanFactory().registerSingleton("webServerStartStop",
                    new WebServerStartStopLifecycle(this.serverManager));
        createWebServer.end();
    }
        initPropertySources();
}

说明

  • ReactiveWebServerFactory
    它有三个实现如下图,getWebServerFactoryBeanName()放里面获ReactiveWebServerFactory 所有实现类,返回数组中的第一个,即TomcatReactiveWebServerFactory,web容器具体的初始化内容都在各自的工具类中,如tomcat就在TomcatReactiveWebServerFactory中,netty就在NettyReactiveWebServerFactory等等。


    image.png
  • getHttpHandler

  1. getHttpHandler方法获取HttpHandler接口下面实现类,即使HttpWebHandlerAdapter
  2. getHttpHandler()方法定义在ReactiveWebServerApplicationContext,如下代码,但是在这里通过Supplier<HttpHandler >将这个方法传递到WebServerManager函数中,在调用Supplier的get方法的地方真正的执行getHttpHandler()。

5.3.1.10 ReactiveWebServerApplicationContext#createWebServer方法

protected HttpHandler getHttpHandler() {
    // 找到HttpHandler接口下面所有实现类
    String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
    //没有找到抛异常
    if (beanNames.length == 0) {
        throw new ApplicationContextException(
                "Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");
    }
    //大于1个也抛异常
    if (beanNames.length > 1) {
        throw new ApplicationContextException(
                "Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "
                            + StringUtils.arrayToCommaDelimitedString(beanNames));
    }
       //有且只有一个的时候返回
    return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
}

5.3.1.11 WebServerManager

WebServerManager 中部分代码

    WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
            Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
        this.applicationContext = applicationContext;
        Assert.notNull(factory, "Factory must not be null");
        //这个handlerSupplier就是上面传递进来的getHttpHandler() 方法,把他传递到这个 
        //DelayedInitializationHttpHandler内部类中
        this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
        //构建web容器,在这里传入了handler对象,并且这个getWebServer方法中还初始化了一个TomcatHttpHandlerAdapter对象,它继承了ServletHttpHandlerAdapter适配器
        this.webServer = factory.getWebServer(this.handler);
    }
    //启动web容器的方法
    void start() {
        this.handler.initializeHandler();
        this.webServer.start();
        this.applicationContext
                .publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
    }

    static final class DelayedInitializationHttpHandler implements HttpHandler {
        private final Supplier<HttpHandler> handlerSupplier;
        private final boolean lazyInit;
        private volatile HttpHandler delegate = this::handleUninitialized;
        private DelayedInitializationHttpHandler(Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
            this.handlerSupplier = handlerSupplier;
            this.lazyInit = lazyInit;
        }
        //省略部分代码
        @Override
        public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
            return this.delegate.handle(request, response);
        }
        //this.handlerSupplier.get() 真正调用getHttpHandler()地方,到这里HttpHandler被初始化
        void initializeHandler() {
            this.delegate = this.lazyInit ? new LazyHttpHandler(this.handlerSupplier) : this.handlerSupplier.get();
        }
        //省略部分代码
    }

ReactiveWebServerFactory#getWebServer方法
ReactiveWebServerFactory接口的getWebServer方法有多个实现类,如下图,WebFlux使用的是TomcatReactiveWebServerFactory。

image.png

以下是TomcatReactiveWebServerFactory实现的getWebServer方法

@Override
    public WebServer getWebServer(HttpHandler httpHandler) {
        if (this.disableMBeanRegistry) {
            Registry.disableRegistry();
        }
        //初始化tomcat对象,并配置tomcat连接信息
        Tomcat tomcat = new Tomcat();
        File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");
        tomcat.setBaseDir(baseDir.getAbsolutePath());
        for (LifecycleListener listener : this.serverLifecycleListeners) {
            tomcat.getServer().addLifecycleListener(listener);
        }
        Connector connector = new Connector(this.protocol);
        connector.setThrowOnFailure(true);
        tomcat.getService().addConnector(connector);
        customizeConnector(connector);
        tomcat.setConnector(connector);
        tomcat.getHost().setAutoDeploy(false);
        configureEngine(tomcat.getEngine());
        for (Connector additionalConnector : this.additionalTomcatConnectors) {
            tomcat.getService().addConnector(additionalConnector);
        }
        //初始http请求处理适配器,并将httpHandler传进去,TomcatHttpHandlerAdapter 继承了ServletHttpHandlerAdapter适配器。
        TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler);
        prepareContext(tomcat.getHost(), servlet);
        return getTomcatWebServer(tomcat);
    }

在WebServerManager的构造函数执行完后,HttpWebHandlerAdapter被初始化,AbstractApplicationContext #refresh方法真个链路也就结束了,下一个是AbstractApplicationContext #finishBeanFactoryInitialization

5.3.1.12 AbstractApplicationContext#finishBeanFactoryInitialization

finishBeanFactoryInitialization是所有bean初始化的入口包括自动装配的配置类,在这里对这个类的初始化不过多展开,最主看看GatewayAutoConfiguration,WebFluxConfigurationSupport,HttpHandlerAutoConfiguration这几个类初始化的内容, 看看在gateway请求执行过程中涉及到的节点,了解了这些类的初始化节点在梳理处理过程更不会显得那么突兀,不知道怎么来的。

5.3.1.13 GatewayAutoConfiguration

  • 1.RouteLocatorBuilder 将配置的路由信息转化成Route对象的工具类
  • 2.RouteLocator 定义路由获取方法的接口
  • 3.RouteDefinitionRouteLocator 将路由配置信息转化成Rout对象的具体实现
  • 4.RouteDefinitionRepository 定义路由信息存储的接口,实现可以是基于内存InMemoryRouteDefinitionRepository,基于redis存储的RedisRouteDefinitionRepository(在GatewayRedisAutoConfiguration里面初始化),基于配置文件的PropertiesRouteDefinitionLocator
  • 5.RouteRefreshListener 路由事件监听
  • 6.RoutePredicateHandlerMapping 匹配路由信息,校验断言,判断路由是否可用
  • 7.FilteringWebHandler 过滤器链,处理filtter
  • 8.Filter 具体filter如RouteToRequestUrlFilter,NettyRoutingFilter等等

源码如下

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@AutoConfigureBefore({ HttpHandlerAutoConfiguration.class, WebFluxAutoConfiguration.class })
@AutoConfigureAfter({ GatewayReactiveLoadBalancerClientAutoConfiguration.class,
        GatewayClassPathWarningAutoConfiguration.class })
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {
    //省略部分代码
    @Bean
    public RouteLocatorBuilder routeLocatorBuilder(ConfigurableApplicationContext context) {
        return new RouteLocatorBuilder(context);
    }
        
    @Bean
    @ConditionalOnMissingBean(RouteDefinitionRepository.class)
    public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
        return new InMemoryRouteDefinitionRepository();
    }

    @Bean
    public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
            List<GatewayFilterFactory> gatewayFilters, List<RoutePredicateFactory> predicates,
            RouteDefinitionLocator routeDefinitionLocator, ConfigurationService configurationService) {
        return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates, gatewayFilters, properties,
                configurationService);
    }

    @Bean
    @ConditionalOnClass(name = "org.springframework.cloud.client.discovery.event.HeartbeatMonitor")
    public RouteRefreshListener routeRefreshListener(ApplicationEventPublisher publisher) {
        return new RouteRefreshListener(publisher);
    }

    @Bean
    public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
        return new FilteringWebHandler(globalFilters);
    }
    @Bean
    @ConditionalOnMissingBean
    public RoutePredicateHandlerMapping routePredicateHandlerMapping(FilteringWebHandler webHandler,
            RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) {
        return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);
    }
    //将route信息转换成请求地址
    @Bean
    @ConditionalOnEnabledGlobalFilter
    public RouteToRequestUrlFilter routeToRequestUrlFilter() {
        return new RouteToRequestUrlFilter();
    }
}

5.3.1.14 WebFluxConfigurationSupport

WebFluxConfigurationSupport的初始化由子类DelegatingWebFluxConfiguration实现自动初始化,它主要初始化的有

  • 1.DispatcherHandler 主要遍历处理已经初始化好的HandlerMapping,这个在request请求中会调用到

    1. WebExceptionHandler 异常处理类
  • 3.RequestMappingHandlerMapping 请求映射处理类

  • 4.ResponseBodyResultHandler 请求返回结果处理类等等。

源码如下

public class WebFluxConfigurationSupport implements ApplicationContextAware {
    @Bean
    public DispatcherHandler webHandler() {
        return new DispatcherHandler();
    }
    @Bean
    @Order(0)
    public WebExceptionHandler responseStatusExceptionHandler() {
        return new WebFluxResponseStatusExceptionHandler();
    }

    protected RequestMappingHandlerMapping createRequestMappingHandlerMapping() {
        return new RequestMappingHandlerMapping();
    }
    @Bean
    public RequestMappingHandlerMapping requestMappingHandlerMapping(
            @Qualifier("webFluxContentTypeResolver") RequestedContentTypeResolver contentTypeResolver) {

        RequestMappingHandlerMapping mapping = createRequestMappingHandlerMapping();
        mapping.setOrder(0);
        mapping.setContentTypeResolver(contentTypeResolver);
        PathMatchConfigurer configurer = getPathMatchConfigurer();
        configureAbstractHandlerMapping(mapping, configurer);
        Map<String, Predicate<Class<?>>> pathPrefixes = configurer.getPathPrefixes();
        if (pathPrefixes != null) {
            mapping.setPathPrefixes(pathPrefixes);
        }

        return mapping;
    }
    @Bean
    public ResponseEntityResultHandler responseEntityResultHandler(
            @Qualifier("webFluxAdapterRegistry") ReactiveAdapterRegistry reactiveAdapterRegistry,
            ServerCodecConfigurer serverCodecConfigurer,
            @Qualifier("webFluxContentTypeResolver") RequestedContentTypeResolver contentTypeResolver) {

        return new ResponseEntityResultHandler(serverCodecConfigurer.getWriters(),
                contentTypeResolver, reactiveAdapterRegistry);
    }
}

5.3.1.15 HttpHandlerAutoConfiguration

这个类最主要是初始化了一个HttpHandler类并且将一个初始化好的WebHandler放到HttpHandler对象里面,如下代码,在applicationContext方法中做构建HttpHandler的一些前置工作,如获取实例化好的WebHandler对象,在build方法中实例化HttpWebHandlerAdapter对象,同时将初始化好的DispatcherHandler放进去

@Bean
public HttpHandler httpHandler(ObjectProvider<WebFluxProperties> propsProvider) {
    HttpHandler httpHandler = WebHttpHandlerBuilder.applicationContext(this.applicationContext).build();
    WebFluxProperties properties = propsProvider.getIfAvailable();
    if (properties != null && StringUtils.hasText(properties.getBasePath())) {
        Map<String, HttpHandler> handlersMap = Collections.singletonMap(properties.getBasePath(), httpHandler);
        return new ContextPathCompositeHandler(handlersMap);
    }
    return httpHandler;
}

WebHttpHandlerBuilder

public static WebHttpHandlerBuilder applicationContext(ApplicationContext context) {
  //初始化WebHttpHandlerBuilder 对象,并且传入实例化好的WebHandler(DispatcherHandler)对象
  WebHttpHandlerBuilder builder = new WebHttpHandlerBuilder(
                context.getBean(WEB_HANDLER_BEAN_NAME, WebHandler.class), context);
  //初始化所有WebFilter
  List<WebFilter> webFilters = context
                .getBeanProvider(WebFilter.class)
                .orderedStream()
                .collect(Collectors.toList());
        builder.filters(filters -> filters.addAll(webFilters));
  //省略部分代码
}

public HttpHandler build() {
   //将已经初始化好的webHandler,和上面方法获取的所有filters放入FilteringWebHandler中
   WebHandler decorated = new FilteringWebHandler(this.webHandler, this.filters);
   decorated = new ExceptionHandlingWebHandler(decorated,  this.exceptionHandlers);
   //将上面实例化好的DispatcherHandler对象放到HttpWebHandlerAdapter对象里面
   HttpWebHandlerAdapter adapted = new HttpWebHandlerAdapter(decorated);
}

看完以上代码到这里,我们了解到了

    1. AbstractApplicationContext #refresh方法中的onRefresh()初始化了Web容器,并创建WebServerManager作为容器启动,关闭的管理类,在创建WebServerManager对象时,传入了一个this::getHttpHandler方法,这个方法的目的是给WebServerManager对象初始化一个HttpHandler类,但是这个方法不会立马执行,因为它是一个Supplier<HttpHandler>参数,在调用get方法真正执行,这个get方法在WebServerManager#start方法内调用initializeHandler()方法执行
  • 2.AbstractApplicationContext #refresh方法中的finishBeanFactoryInitialization(beanFactory)方法初始了所有的配置类这其中有GatewayAutoConfiguration,WebFluxConfigurationSupport和HttpHandlerAutoConfiguration

  • 3.上面三者中第一个初始化的是GatewayAutoConfiguration类,它主要配置了gateway的路由信息转化,存储,刷新和路由处理的HandlerMapping,WebHandler还有Filter等信息,其次是WebFluxConfigurationSupport,它主要初始化了一个WebHandler(DispatcherHandler),它的作用是遍历所有handler Mapping进行处理,最后是HttpHandlerAutoConfiguration 初始化了HttpWebHandlerAdapter对象,并且将DispatcherHandler放进去。

5.3.1.16 AbstractApplicationContext #finishRefresh()

在这个finishRefres方法中最终会调用WebServerManager的start方法去启动容器,如下

void start() {
    this.handler.initializeHandler();
    this.webServer.start();
    this.applicationContext
                .publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
}

void initializeHandler() {
    //真正调用AbstractApplicationContext #getHttpHandler方法的地方,之所有在这里调是因为到这里的时候上面HttpWebHandlerAdapter才初始化完成了
    this.delegate = this.lazyInit ? new LazyHttpHandler(this.handlerSupplier) : this.handlerSupplier.get();
}

5.3.2 Gateway的request请求

5.3.2.1 ServletHttpHandlerAdapter#service

ServletHttpHandlerAdapter 实现Servlet接口,在service方法中将ServletRequest和ServletResponse转化成了ServletServerHttpRequest和ServerHttpResponse,并调用了httpHandler的handler方法,实际上ServletHttpHandlerAdapter 是通过在上面讲到的TomcatHttpHandlerAdapter 被调用的,因为TomcatHttpHandlerAdapter 继承了它,并且初始化Web容器的时候创建的是TomcatHttpHandlerAdapter 对象

@Override
public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException {
  //省略部分代码
  ServletServerHttpRequest httpRequest;
  try {
    httpRequest = createRequest(((HttpServletRequest) request), asyncContext);
     //省略部分代码
  }
  ServerHttpResponse httpResponse = createResponse(((HttpServletResponse) response), asyncContext, httpRequest);
  //省略部分代码
  this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber);
}

这个httpHandler就是初始化Web容器管理类WebServerManager构造函数中factory.getWebServer(this.handler)时传入的DelayedInitializationHttpHandler,它通过TomcatReactiveWebServerFactory类的getWebServer方法中new TomcatHttpHandlerAdapter(httpHandler)传入,而DelayedInitializationHttpHandler的delegate对象正是handlerSupplier.get()获取的HttpWebHandlerAdapter对象。

5.3.2.2 HttpWebHandlerAdapter#handle

public HttpWebHandlerAdapter(WebHandler delegate) {
    super(delegate);
}
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
    //省略部分代码
    ServerWebExchange exchange = createExchange(request, response);
    //省略部分代码
    //调用父类getDelegate()获取WebHandler 对象并调用handle方法
    return getDelegate().handle(exchange)
                .doOnSuccess(aVoid -> logResponse(exchange))
                .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
                .then(Mono.defer(response::setComplete));
}

HttpWebHandlerAdapter 的构造函数我们在看HttpHandlerAutoConfiguration配置类时,在初始化HttpWebHandlerAdapter 对象时,调用了WebHttpHandlerBuilder的applicationContext方法和build方法将DispatcherHandler传入了HttpWebHandlerAdapter 的构造函数,所以这里的这个getDelegate()实际上就是获取DispatcherHandler对象并调用

5.3.2.3 DispatcherHandler#handle

public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {
    @Nullable
    private List<HandlerMapping> handlerMappings;
    @Nullable
    private List<HandlerAdapter> handlerAdapters;

    @Nullable
    private List<HandlerResultHandler> resultHandlers;
    protected void initStrategies(ApplicationContext context) {
       //获取所有HandlerMapping接口已经实例化好的实现类
        Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerMapping.class, true, false);

        ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
        AnnotationAwareOrderComparator.sort(mappings);
        this.handlerMappings = Collections.unmodifiableList(mappings);
        //省略部分代码
    }


    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
            return handlePreFlight(exchange);
        }
        //遍历所有handlerMapping,并调用其getHandler方法,
        //在获得返回的handler对象后,放入invokeHandler方法执行
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }

    private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
        if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {
            return Mono.empty();  // CORS rejection
        }
        if (this.handlerAdapters != null) {
            for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
                //选用能够处理handler的适配器调用handler对象的handle方法
                if (handlerAdapter.supports(handler)) {
                    return handlerAdapter.handle(exchange, handler);
                }
            }
        }
        return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
    }
}

handle方法内遍历handlerMappings,调用getHandler实际上是通过RoutePredicateHandlerMapping对象调用了其父类AbstractHandlerMapping的getHandler方法

5.3.2.4 AbstractHandlerMapping#getHandler

@Override
public Mono<Object> getHandler(ServerWebExchange exchange) {
    return getHandlerInternal(exchange).map(handler -> {
        //省略部分代码
        ServerHttpRequest request = exchange.getRequest();
        //跨域处理
        if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
            CorsConfiguration config = (this.corsConfigurationSource != null ?
                    this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
            CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
            config = (config != null ? config.combine(handlerConfig) : handlerConfig);
            if (config != null) {
                config.validateAllowCredentials();
            }
            if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
                return NO_OP_HANDLER;
            }
        }
        return handler;
    });
}

在AbstractHandlerMapping中定义了一个抽象的getHandlerInternal方法,他的具体实现在子类RoutePredicateHandlerMapping中,它主要是获取路由对象Route放到exchange中,并返回一个FilteringWebHandler对象。

5.3.2.5 RoutePredicateHandlerMapping#getHandlerInternal

public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
  private final RouteLocator routeLocator;
  private final FilteringWebHandler webHandler;
  @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        //省略部分代码
        //获取路由Route对象
        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }
                    //路由Route对象放入到exchange对象中
                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                   //返回FilteringWebHandler 对象
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }

    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        //调用routeLocator获取Route对象
        return this.routeLocator.getRoutes()
                .concatMap(route -> Mono.just(route).filterWhen(r -> {
                    // add the current route we are testing
                    exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                    //校验路由是否满足断言要求
                    return r.getPredicate().apply(exchange);
                })
                        // instead of immediately stopping main flux due to error, log and
                        // swallow it
                        .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
                        .onErrorResume(e -> Mono.empty()))
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                .next()
                // TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    //校验路由,这个方法默认是个空方法,需要校验的话需要子类覆盖这个方法实现
                    validateRoute(route, exchange);
                    return route;
                });
    }
}

5.3.2.6 RouteDefinitionRouteLocator#getRoutes

public class RouteDefinitionRouteLocator implements RouteLocator {
    private final RouteDefinitionLocator routeDefinitionLocator;

    @Override
    public Flux<Route> getRoutes() {
        //获取路由RouteDefinition对象,并通过convertToRoute方法转换成Route对象
        Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);
        if (!gatewayProperties.isFailOnRouteDefinitionError()) {
            // instead of letting error bubble up, continue
            routes = routes.onErrorContinue((error, obj) -> {
                if (logger.isWarnEnabled()) {
                    logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId()
                            + " will be ignored. Definition has invalid configs, " + error.getMessage());
                }
            });
        }
        return routes.map(route -> {
            if (logger.isDebugEnabled()) {
                logger.debug("RouteDefinition matched: " + route.getId());
            }
            return route;
        });
    }
    //将RouteDefinition对象转换成Route对象
    private Route convertToRoute(RouteDefinition routeDefinition) {
        AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
        List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
        return Route.async(routeDefinition).asyncPredicate(predicate).replaceFilters(gatewayFilters).build();
    }
}

5.3.2.7 RouteDefinitionLocator#getRouteDefinitions

RouteDefinitionLocator接口的getRouteDefinitions方法有多个类的实现,如下图
[图片上传失败...(image-4a21bc-1692157918199)]
RouteDefinitionRepository继承RouteDefinitionLocator接口,RedisRouteDefinitionRepository和InMemoryRouteDefinitionRepository实现RouteDefinitionRepository接口,它两就是我们在配置或数据库读取到的路由配置经过getRouteDefinitions方法转换成RouteDefinition对象后的存储地方,基于内存或redis

5.3.2.8 DispatcherHandler#invokeHandler

上面从DispatcherHandler#handle进入一直到RouteDefinitionRouteLocator的getRoutes方法这个方法链路结束。返回到DispatcherHandler的invokeHandler方法,这个方法的参数handler是RoutePredicateHandlerMapping的getHandlerInternal方法执行后返回的FilteringWebHandler 对象

private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
        if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {
            return Mono.empty();  // CORS rejection
        }
        if (this.handlerAdapters != null) {
            //遍历所有适配器,获取能够执行FilteringWebHandler 的适配器
            for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
                if (handlerAdapter.supports(handler)) {
                    return handlerAdapter.handle(exchange, handler);
                }
            }
        }
        return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
    }

5.3.2.9 FilteringWebHandler#handle

public class FilteringWebHandler extends WebHandlerDecorator {
    private final List<GatewayFilter> globalFilters;
    //构造函数传入所有GatewayFilter
    public FilteringWebHandler(List<GlobalFilter> globalFilters) {
        this.globalFilters = loadFilters(globalFilters);
    }
    //将所有GlobalFilter对象转GatewayFilterAdapter对象,利用GatewayFilterAdapter对象执行filter方法
    private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
        return filters.stream().map(filter -> {
            GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
            if (filter instanceof Ordered) {
                int order = ((Ordered) filter).getOrder();
                return new OrderedGatewayFilter(gatewayFilter, order);
            }
            return gatewayFilter;
        }).collect(Collectors.toList());
    }

    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        //将Route路由配置的Filter加入到gatewayFilters集合里面
        List<GatewayFilter> gatewayFilters = route.getFilters();

        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        //排序
        AnnotationAwareOrderComparator.sort(combined);
        //省略部分代码
        //创建DefaultGatewayFilterChain对象,传入所有filter,调用fiter方法
        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }

  private static class DefaultGatewayFilterChain implements GatewayFilterChain {
        private final int index;
        private final List<GatewayFilter> filters;
        DefaultGatewayFilterChain(List<GatewayFilter> filters) {
            this.filters = filters;
            this.index = 0;
        }
        private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
            this.filters = parent.getFilters();
            this.index = index;
        }
        public List<GatewayFilter> getFilters() {
            return filters;
        }
        //fitlter方法,利用DefaultGatewayFilterChain 实现递归调用所有GatewayFilterAdapter的filter方法
        @Override
        public Mono<Void> filter(ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < filters.size()) {
                    GatewayFilter filter = filters.get(this.index);
                    DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
                    return filter.filter(exchange, chain);
                }
                else {
                    return Mono.empty(); // complete
                }
            });
        }

    }
 private static class GatewayFilterAdapter implements GatewayFilter {
        private final GlobalFilter delegate;
        GatewayFilterAdapter(GlobalFilter delegate) {
            this.delegate = delegate;
        }
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            return this.delegate.filter(exchange, chain);
        }
        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{");
            sb.append("delegate=").append(delegate);
            sb.append('}');
            return sb.toString();
        }
    }
}

5.3.2.10 GlobalFilter

GlobalFilter 的实现有很多,这里介绍几个常用的实现。

  • 1.NettyRoutingFilter 利用HttpClientConnect发出请求
  • 2.NettyWriteResponseFilter 请求结果处理
  • 3.ReactiveLoadBalancerClientFilter 请求负载均衡处理
  • 4.RouteToRequestUrlFilter 将路由对象转换成真正请求的url

5.4 自定义拦截器

5.4.1 自定义局部过滤器

代码

/**
 * @author: guangjie.liao
 * @Date: 2023/8/16 13:48
 * @Description: 局部过滤的命名固定为 【前缀+GatewayFilterFactory】
 */
@Component
public class PartGatewayFilterFactory extends AbstractGatewayFilterFactory<PartGatewayFilterFactory.Config> {

    private static String FAIL_URL = "http://localhost:8001/test/fail";
    /**
     * 重新父类apply方法,使用Config接收配置参数
     * @param config
     * @return
     */
    @Override
    public GatewayFilter apply(Config config) {
        //这里可以做具体的拦截处理,如限流,规则校验
        return (exchange, chain) -> {
            ServerHttpRequest request = exchange.getRequest().mutate().build();
            URI uri = exchange.getRequest().getURI();
            String query = uri.getRawQuery();
            //请求地址中如果不包括token惨,就跳转到失败地址
            if (query.indexOf("token") == -1){
                request = exchange.getRequest().mutate().uri(URI.create(FAIL_URL)).build();
                return chain.filter(exchange.mutate().request(request).build());
            }
            return chain.filter(exchange.mutate().request(request).build());
        };
    }

    /**
     * 指定配置参数接收顺序
     * @return
     */
    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("arg1","arg2","arg3","arg4","arg5");
    }

    /**
     * 定义配置参数接收字段
     */
    @Data
    public static class Config{
        private Object arg1;
        private Object arg2;
        private Object arg3;
        private Object arg4;
        private Object arg5;
    }
}

使用配置

spring:
  cloud:
    gateway:
      routes:
      - id: microServiceGateway
        # lb 表示从服务注册中心获取服务名称
        uri: lb://microServiceCustomer
        #uri: http://localhost:8001
        predicates:
        - Path=/test/**
        #自定义Filter
        filters:
          #   Part对应PartGatewayFilterFactory 类前缀
          - name: Part
            #args对应PreGatewayFilterFactory 内部类Config的字段
            args:
             args1: 1
             args2: 2
             args3: 3
             args4: 4
             args5: 5

5.4.2 自定义全局过滤器

/**
 * @author: guangjie.liao
 * @Date: 2022/11/7 18:46
 * @Description: 滑动窗口限流
 */
@Component
public class SlidingWindowRateLimiter implements GatewayFilter {

    @Autowired
    private RedisTemplate redisTemplate;
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String requestPath = request.getPath().toString();
        String ip = request.getRemoteAddress().getHostString();

        String key = ip + ":" + requestPath;
        if (!isActionAllow(key,System.currentTimeMillis(),5,10)){
            exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
            exchange.getResponse().setComplete();
        }
        return chain.filter(exchange);
    }
    /**
     *
     * @param key 键值Key
     * @param obj 此对象在set中不能重复,如有重复会更新score
     * @param timeWin 时间窗口
     * @param maxCount 每次时间窗口容忍的最大请求数量
     * @return
     */
    public boolean isActionAllow(String key, Object obj, int timeWin, int maxCount) {
        if (StringUtils.isEmpty(key) || null == obj || timeWin == 0 || maxCount == 0) {
            return false;
        }
        long curTime = System.currentTimeMillis();
        ZSetOperations opsForZSet = redisTemplate.opsForZSet();
        opsForZSet.add(key, curTime, curTime);
        opsForZSet.removeRangeByScore(key, 0, curTime - (timeWin * 1000));
        Long count = opsForZSet.zCard(key);
        redisTemplate.expire(key, timeWin, TimeUnit.SECONDS);
        return count <= maxCount;
    }
}

5.5 集成Nacos实现灰度发布

5.5.1 nacos配置灰度环境服务版本号和测试用户

添加gray_white.yaml文件配置

gray:
    users:
        - test
        - admin
    productVersion: 1
    grayscaleVersion: 2

5.5.2 自定义负载均衡策略

@Slf4j
public class GrayscaleServiceInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    private GrayscaleWhiteListUser grayWhiteListUser;

    /**
     * @param serviceInstanceListSupplierProvider a provider of
     * {@link ServiceInstanceListSupplier} that will be used to get available instances
     */
    public GrayscaleServiceInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
                                                GrayscaleWhiteListUser grayWhiteListUser) {
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        this.grayWhiteListUser = grayWhiteListUser;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        DefaultRequest req = (DefaultRequest) request;
        RequestDataContext context = (RequestDataContext) req.getContext();
        RequestData requestData = context.getClientRequest();
        List<String> usernames = requestData.getHeaders().get("username");
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next()
                .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances,usernames));
    }

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
                                                              List<ServiceInstance> serviceInstances,
                                                              List<String> usernames) {
        Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances,usernames);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances,List<String> usernames) {
        if (instances.isEmpty()) {
            return new EmptyResponse();
        }
        List<ServiceInstance> grayscaleServers = new ArrayList<>();
        List<ServiceInstance> productServers = new ArrayList<>();
        for (ServiceInstance serverInfo : instances) {
            //获取nacos配置元素据
            NacosServiceInstance nacosServer = (NacosServiceInstance) serverInfo;
            String version = nacosServer.getMetadata().get("version");
            //根据版本号区分灰度还是生产
            if (!StringUtils.isEmpty(version) && version.equals(grayWhiteListUser.getGrayscaleVersion())) {
                grayscaleServers.add(nacosServer);
            }else{
                productServers.add(nacosServer);
            }
        }
        //请求头配置白名单用户,没有配置则访问生产环境
        if (!CollectionUtils.isEmpty(usernames)){
            String username = usernames.get(0);
            //配置的灰度环境服务不为空并且当前配置用户在白名单中
            if (!CollectionUtils.isEmpty(grayscaleServers) && grayWhiteListUser.getUsers().contains(username)){
                return getDefaultResponse(grayscaleServers);
            }
        }
        return getDefaultResponse(productServers);
    }

    private DefaultResponse getDefaultResponse(List<ServiceInstance> servers){
        //打乱随机取一个,也可以设置其他策略
        Collections.shuffle(servers);
        ServiceInstance server = servers.get(0);
        return new DefaultResponse(server);
    }

}

5.5.3 配置负载均衡策略初始化

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class GrayscaleConfiguration {

    @Value("${spring.application.name}")
    private String serverName;

    @Autowired
    private GrayscaleWhiteListUser grayWhiteListUser;

    @Bean
    @ConditionalOnMissingBean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(LoadBalancerClientFactory loadBalancerClientFactory) {
        return new GrayscaleServiceInstanceLoadBalancer(loadBalancerClientFactory.getLazyProvider(serverName, ServiceInstanceListSupplier.class),grayWhiteListUser);
    }
}

5.5.4 启动类配置负载均衡策略

@EnableDiscoveryClient
@SpringBootApplication(scanBasePackages = "com.lgj.**")
@LoadBalancerClients(defaultConfiguration = GrayscaleServiceInstanceLoadBalancer.class)
public class GatewayServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(GatewayServerApplication.class, args);
    }
}

5.6 集成oauth2 统一鉴权认证

5.6.1 引入oauth2.0

<dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-security</artifactId>
     <version>2.2.4.RELEASE</version>
</dependency>
<dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-oauth2</artifactId>
     <version>2.2.4.RELEASE</version>
</dependency>

5.6.2 初始化TokenStore配置

@Configuration
public class AuthenticationConfig {
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Bean
    public TokenStore tokenStore() {
        return new RedisTokenStore(redisConnectionFactory);
    }
}

5.6.3 配置全局拦截器拦截token校验

@Component
public class AuthenticationFilter implements GlobalFilter, Ordered{
    @Autowired
    private TokenStore tokenStore;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String url = exchange.getRequest().getURI().toString();
        //过滤请求授权服务的接口
        if (url.indexOf("/auth") > -1){
            return chain.filter(exchange);
        }
        OAuth2AccessToken auth2AccessToken = tokenStore.readAccessToken(getToken(exchange));
        if (auth2AccessToken == null){
            throw new BusinessException(ResultCode.UN_AUTHORIZED);
        }
        //扩展信息
        //Map<String,Object> param = auth2AccessToken.getAdditionalInformation();
        return chain.filter(exchange);
    }

    /**
     * 获取token
     */
    private String getToken(ServerWebExchange exchange) {
        String tokenStr = exchange.getRequest().getHeaders().getFirst("Authorization");
        if (StringUtils.isBlank(tokenStr)) {
            return null;
        }
        String token = tokenStr.split(" ")[1];
        if (StringUtils.isBlank(token)) {
            return null;
        }
        return token;
    }


    @Override
    public int getOrder() {
        return -1;
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容