- 1.gateway基础介绍
- 2.gateway跨域处理
- 3.gateway动态路由
- 4.gateway自定义局部拦截器和全局拦截器
- 5.Weblux启动和gateway处理request源码分析
- 6.自定义拦截器滑动窗口限流
- 7.集成nacos灰度发布
- 8.集成oauth2.0统一授权
1.gateway 介绍
spring cloud gatewya 是专门为微服务框架提供简单的api路由关键组件,它建立在 Spring Boot 2.x、 Spring WebFlux 和 Project Reactor之上 需要Spring Boot和Spring Webflux提供的Netty运行时。不能在传统的Servlet容器中工作,也不能以WAR的形式构建。
通常的作用如下:
- 协议转换,路由转发
- 流量聚合,对流量进行监控,日志输出
- 作为整个系统的前端工程,对流量进行控制,有限流的作用
- 作为系统的前端边界,外部流量只能通过网关才能访问系统
- 可以在网关层做权限的判断
- 可以在网关层做缓存
2.gateway 重要概念
路由(route):路由信息由ID,目的url,一组断言工厂和一组Filter组成,断言为true说明请求的url和配置的路由匹配
断言(predicates):gateway中断言函数类型是spring5.0框架中的ServerWebExchange,允许开发自定义规则匹配来自Http request的任何信息
过滤器(filter):标准的Spring webFilter 分为两种Gateway Filter和Global Filter
3.gateway 工作原理
1.客户端向 Spring Cloud Gateway 发出请求。
2.如果 Gateway Handler Mapping 找到与请求相匹配的路由,将其发送到 Gateway Web Handler。
3.Gateway Web Handler再通过指定的 过滤器链 来将请求发送到我们实际的服务执行业务逻辑,然后返回。
4.过滤器之间用虚线分开是因为过滤器可能会在发送代理请求之前(“pre”)或之后(“post”)执行业务逻辑。
4.简单案例
引入pom配置
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
4.1 快捷配置与完全展开配置
配置application文件
4.1.1 快捷配置
spring:
cloud:
gateway:
routes:
- id: after_route
uri: https://example.org
predicates:
- Cookie=mycookie,mycookievalue
4.1.2 完全展开配置配置
spring:
cloud:
gateway:
routes:
- id: after_route
uri: https://example.org
predicates:
- name: Cookie
args:
name: mycookie
regexp: mycookievalue
4.2 路由断言predicates详解
predicates 的类型型有以下几种:
断言类型 | 说明 |
---|---|
After | 时间过滤,设定一个时间之后才能访问 |
Before | 时间过滤,设定一个时间之前才能访问 |
Between | 时间过滤,设定两个时间之间才能访问 |
Cookie | Cookie 匹配过滤,cookie名和值过滤访问 |
Header | 请求头过滤,请求头参数名与值过滤访问 |
Host | 域名地址过滤 |
Method | 请求方式过滤,get post |
Path | 路径匹配过滤 |
Query | 请求参数过滤 |
RemoteAddr | 主机ip过滤 |
Weight | 权重过滤 |
常用配置规则如下
spring:
cloud:
gateway:
routes:
- id: after_route
uri: http://localhost:8001/
predicates:
- After=2019-06-20T17:42:47.789-07:00[America/Denver]
# 时间之前路由
# - Before=2019-06-20T17:42:47.789-07:00[America/Denver]
# 时间之间路由
# - Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789- 07:00[America/Denver]
# Cookie 匹配,cookei名为chocolate,值与ch.p正则表达式匹配的cookie
# - Cookie=chocolate, ch.p
# 请求具有名称,X-Request-Id其值与\d+正则表达式匹配(具有一个或多个数字的值),则此路由匹配。
# - Header=X-Request-Id, \d+
# 如果请求的Host标头具有值www.somehost.org或www.anotherhost.org,则此路由将匹配
# - Host=**.somehost.org,**.anotherhost.org
# 所有get方法将匹配此规则
# - Method=GET
#
# - Path=/foo/**,/bar/**
# 查询条件中,参数名包含baz的请求配置此规则
# - Query=baz
4.3 gateway内置filter
filter 的类型常用的有以下几种:
filter类型 | 说明 |
---|---|
AddRequestHeader | 添加参赛到下游请求的header信息 |
AddRequestHeadersIfNotPresent | 添加参赛到下游请求的header信息,多个逗号隔开,与 AddRequestHeader 不同的是,它只在header 信息不存在的情况下才会这样做。否则,客户端请求中的原始值将被发送 |
AddRequestParameter | 添加请求参数到下游请求中 |
AddResponseHeader | 返回结果中添加参数 |
CircuitBreaker | gateway 断路器 |
CacheRequestBody | 请求体缓存过滤器 |
gateway 有很多内置filters ,可以在官网了解更多
5.Gateway实战
5.1 跨域
5.1.1 全局跨域处理
1.配置文件配置
spring:
cloud:
gateway:
globalcors:
cors-configurations:
'[/**]':
allowedOrigins: "https://docs.spring.io"
allowedMethods:
- GET
2.代码配置
@Configuration
public class CorsConfig {
@Bean
public CorsWebFilter corsFilter() {
// 初始化cors配置对象
CorsConfiguration config = new CorsConfiguration();
//设置允许哪些url访问跨域资源,*表示全部允许
config.addAllowedOrigin("*");
//允许访问的头信息,*表示全部
config.addAllowedHeader("*");
//允许所有请求方法(GET,POST等)访问跨域资源
config.addAllowedMethod("*");
// 预检请求的缓存时间(秒),即在这个时间内,对于相同的跨域请求不会再预检了
config.setMaxAge(18000L);
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource(new PathPatternParser());
source.registerCorsConfiguration("/**", config);
return new CorsWebFilter(source);
}
}
5.1.2 路由跨域处理
spring:
cloud:
gateway:
routes:
- id: cors_route
uri: https://example.org
predicates:
- Path=/service/**
metadata:
cors
allowedOrigins: '*'
allowedMethods:
- GET
- POST
allowedHeaders: '*'
maxAge: 30
5.2 动态路由
5.2.1 静态路由
gateway中,在配置文件中写死配置路由策略,即为静态路由,无法实时调整路由策略,需要修改配置文件,但是生产有时候需要调整路由策略或者增加,减少路由对象,但是又不想频繁的更改配置文件,而且配置文件没有办法进行系统管理,这时候就需要动态路由
配置文件静态配置如下
spring:
cloud:
gateway:
routes:
- id: nameRoot
uri: https://www.provder.com
predicates:
- Path=/name/test/**
filters:
- StripPrefix=2
5.2.2 动态路由
实现路由最重要的三点
1.RouteDefinition 路由对象,封装了路由id,url,断言,filter等信息
2.RouteDefinitionRepository 路由信息缓存器,目前支持自带支持2种inMemoryRouteDefinitionRepository和RedisRouteDefinitionRepository,但是RedisRouteDefinitionRepository需要配置gateway.redis-route-definition-repository.enabled=true才能开启使用
3.ApplicationEventPublisher 路由刷新事件发布
下面是具体实现
/**
* @author guangjie.liao
* @Date: 2022/10/24 19:09
* @Description: 动态路由实现
*/
@Slf4j
public class GateWayDynamicRoute implements ApplicationEventPublisherAware {
/**
* 路由缓存方式
* 可以是InMemory 和 Redis(3.1.x版本后支持),在路由集群部署下建议使用Redis,
* 使用InMemory 需要通知到各个节点才能全部刷新
*/
private RouteDefinitionRepository repository;
/**
* 路由刷新事件
*/
private ApplicationEventPublisher publisher;
/**
* 路由持久化
* 通过读取mysql持久化的路由信息我们可以在后台配置化路由信息
*/
private RouteRepository routeRepository;
public GateWayDynamicRoute(RouteDefinitionRepository repository,
ApplicationEventPublisher publisher,
RouteRepository routeRepository){
this.repository = repository;
this.publisher = publisher;
this.routeRepository = routeRepository;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
/**
* 刷新路由
* @return
*/
public Mono<Void> refresh() {
this.loadRoute();
this.publisher.publishEvent(new RefreshRoutesEvent(this));
return Mono.empty();
}
/**
* 加载所有路由
*/
public void loadRoute(){
List<RouteDefinition> routeDefinitions = getRouteDefinition();
//每次加载前把缓存的全部清空
routeDefinitions.stream().forEach(routeDefinition -> {
repository.delete(Mono.just(routeDefinition.getId())).subscribe();
});
routeDefinitions.stream().forEach(routeDefinition -> {
repository.save(Mono.just(routeDefinition)).subscribe();
});
log.info("动态加载路由:{}",routeDefinitions.size());
}
/**
* 构建 RouteDefinition
* RouteDefinition 为路由对象,包含断言和filter
* @return
*/
private List<RouteDefinition> getRouteDefinition(){
List<RouteDefinition> routeDefinitions = new ArrayList<>();
List<GatewayRoute> routeList = routeRepository.selectAllRoute();
if (CollectionUtils.isEmpty(routeList)){
return routeDefinitions;
}
routeList.stream().forEach(route->{
RouteDefinition routeDef =new RouteDefinition();
routeDef.setPredicates(getPredicateDefs(route));
routeDef.setFilters(getFilters(route));
routeDef.setId(route.getRouteName());
routeDef.setUri(getServerUri(route));
routeDefinitions.add(routeDef);
});
return routeDefinitions;
}
/**
* 构建断言
* @param route
* @return
*/
private List<PredicateDefinition> getPredicateDefs(GatewayRoute route){
List<PredicateDefinition> predicateDefinitions = new ArrayList<>();
PredicateDefinition predicatePath = new PredicateDefinition();
Map<String, String> predicatePathParams = new HashMap<>(8);
predicatePath.setName("Path");
predicatePathParams.put("name", route.getRouteName());
predicatePathParams.put("pattern", route.getPath());
predicatePathParams.put("pathPattern", route.getPath());
predicatePath.setArgs(predicatePathParams);
predicateDefinitions.add(predicatePath);
return predicateDefinitions;
}
/**
* 构建 filter
* @param route
* @return
*/
private List<FilterDefinition> getFilters(GatewayRoute route){
List<FilterDefinition> filterDefinitions = new ArrayList<>();
FilterDefinition retryDefinition = new FilterDefinition();
Map<String, String> retryParams = new HashMap<>(8);
retryDefinition.setName("Retry");
retryParams.put("exceptions", "java.lang.Exception");
retryDefinition.setArgs(retryParams);
FilterDefinition stripPrefixDefinition = new FilterDefinition();
Map<String, String> stripPrefixParams = new HashMap<>(8);
stripPrefixDefinition.setName("StripPrefix");
stripPrefixParams.put(NameUtils.generateName(0), route.getStripPrefix()+"");
stripPrefixDefinition.setArgs(stripPrefixParams);
filterDefinitions.add(retryDefinition);
filterDefinitions.add(stripPrefixDefinition);
return filterDefinitions;
}
/**
* 构建路由地址
* @param route
* @return
*/
private URI getServerUri(GatewayRoute route){
String urlStr = route.getUrl() + route.getServiceId() ;
URI uri = UriComponentsBuilder.fromUriString(urlStr).build().toUri();
return uri;
}
}
5.3 动态路由实现原理
5.3.1 Gateway的服务启动
5.3.1.1 SpringApplication#run方法
SpringApplication.run方法是springboot项目服务启动的入口,在这里会初始化很多配置,如下代码,在这里我们主要关注refreshContext 方法,因为这个方法深入跟踪会看到我们gateway处理客户端请求的HttpWebHandlerAdapter,handlerMapping,webHandler,filter以及GatewayAutoConfiguration是如何初始化的。这里会有相关springboot服务启动过程的内容,所有有另外一篇springboot 启动过程介绍整个启动过程。下面看完这个run方法,就进入refreshContext 方法的介绍。
public ConfigurableApplicationContext run(String... args) {
//省略部分代码
try {
//省略部分代码
refreshContext(context);
// 刷新后操作
afterRefresh(context, applicationArguments);
//省略部分代码
// 事件广播 启动完成了
listeners.started(context);
//执行程序启动后需要执行的任务
callRunners(context, applicationArguments);
}
//省略部分代码
return context;
}
5.3.1.3 SpringApplication#refreshContext方法
refreshContext方法同样定义在ApplicationContext类中,如下
private void refreshContext(ConfigurableApplicationContext context) {
if (this.registerShutdownHook) {
//向JVM运行时注册一个关闭挂钩,在JVM关闭时关闭此上下文时回调此方法
shutdownHook.registerApplicationContext(context);
}
refresh(context);
}
5.3.1.4 SpringApplication#refresh方法
下一步就进入到了这个refresh方法中,也定义在ApplicationContext中,如下
protected void refresh(ConfigurableApplicationContext applicationContext) {
applicationContext.refresh();
}
5.3.1.5 ConfigurableApplicationContext #refresh方法
ConfigurableApplicationContext 有三个实现类,分别是
- AbstractApplicationContext
它实现了ConfigurableApplicationContext接口,提供了refresh方法的具体实现 - ReactiveWebServerApplicationContext
WebFlux容器实现,它实现了ConfigurableApplicationContext接口,同时继承了AbstractApplicationContext ,它提供的refresh方法也是调用了super.refresh() - ServletWebServerApplicationContext
SpringMvc容器实现,它实现了ConfigurableApplicationContext接口,同时继承了AbstractApplicationContext ,它提供的refresh方法也是调用了super.refresh()
所以上面的refresh方法就是调用了AbstractApplicationContext 的refresh方法,下面进入这个方法
5.3.1.6 AbstractApplicationContext #refresh方法
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
//省略部分代码
try {
// 创建web容器
onRefresh();
//注册监听器
registerListeners();
//完成bean初始化
finishBeanFactoryInitialization(beanFactory);
//刷新,启动web容器
finishRefresh();
}
catch (BeansException ex) {
//省略部分代码
}
//省略部分代码
}
}
5.3.1.7 AbstractApplicationContext #onRefresh方法
AbstractApplicationContext 里面的onRefresh是个空的方法,具体的实现在其子类里面,由于gateway使用的是webflux组件,所以这里的onRefresh走的是ReactiveWebServerApplicationContext的onRefresh方法,如下
protected void onRefresh() throws BeansException {
// For subclasses: do nothing by default.
}
5.3.1.8 ReactiveWebServerApplicationContext#onRefresh方法
protected void onRefresh() {
super.onRefresh();
try {
createWebServer();
}
catch (Throwable ex) {
throw new ApplicationContextException("Unable to start reactive web server", ex);
}
}
5.3.1.9 ReactiveWebServerApplicationContext#createWebServer方法
private void createWebServer() {
WebServerManager serverManager = this.serverManager;
if (serverManager == null) {
StartupStep createWebServer = this.getApplicationStartup().start("spring.boot.webserver.create");
//获取创建web容器的工具类名字
String webServerFactoryBeanName = getWebServerFactoryBeanName();
// 通过bean的名称获取具体实例
ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
createWebServer.tag("factory", webServerFactory.getClass().toString());
boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
//注入ReactiveWebServerFactory 工具类和getHttpHandler 方法构建一个WebServerManager对象,
//web容器的启动关闭都由它管理
this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
getBeanFactory().registerSingleton("webServerGracefulShutdown",
new WebServerGracefulShutdownLifecycle(this.serverManager.getWebServer()));
getBeanFactory().registerSingleton("webServerStartStop",
new WebServerStartStopLifecycle(this.serverManager));
createWebServer.end();
}
initPropertySources();
}
说明
-
ReactiveWebServerFactory
它有三个实现如下图,getWebServerFactoryBeanName()放里面获ReactiveWebServerFactory 所有实现类,返回数组中的第一个,即TomcatReactiveWebServerFactory,web容器具体的初始化内容都在各自的工具类中,如tomcat就在TomcatReactiveWebServerFactory中,netty就在NettyReactiveWebServerFactory等等。
getHttpHandler
- getHttpHandler方法获取HttpHandler接口下面实现类,即使HttpWebHandlerAdapter
- getHttpHandler()方法定义在ReactiveWebServerApplicationContext,如下代码,但是在这里通过Supplier<HttpHandler >将这个方法传递到WebServerManager函数中,在调用Supplier的get方法的地方真正的执行getHttpHandler()。
5.3.1.10 ReactiveWebServerApplicationContext#createWebServer方法
protected HttpHandler getHttpHandler() {
// 找到HttpHandler接口下面所有实现类
String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
//没有找到抛异常
if (beanNames.length == 0) {
throw new ApplicationContextException(
"Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");
}
//大于1个也抛异常
if (beanNames.length > 1) {
throw new ApplicationContextException(
"Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "
+ StringUtils.arrayToCommaDelimitedString(beanNames));
}
//有且只有一个的时候返回
return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
}
5.3.1.11 WebServerManager
WebServerManager 中部分代码
WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
this.applicationContext = applicationContext;
Assert.notNull(factory, "Factory must not be null");
//这个handlerSupplier就是上面传递进来的getHttpHandler() 方法,把他传递到这个
//DelayedInitializationHttpHandler内部类中
this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
//构建web容器,在这里传入了handler对象,并且这个getWebServer方法中还初始化了一个TomcatHttpHandlerAdapter对象,它继承了ServletHttpHandlerAdapter适配器
this.webServer = factory.getWebServer(this.handler);
}
//启动web容器的方法
void start() {
this.handler.initializeHandler();
this.webServer.start();
this.applicationContext
.publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
}
static final class DelayedInitializationHttpHandler implements HttpHandler {
private final Supplier<HttpHandler> handlerSupplier;
private final boolean lazyInit;
private volatile HttpHandler delegate = this::handleUninitialized;
private DelayedInitializationHttpHandler(Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
this.handlerSupplier = handlerSupplier;
this.lazyInit = lazyInit;
}
//省略部分代码
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return this.delegate.handle(request, response);
}
//this.handlerSupplier.get() 真正调用getHttpHandler()地方,到这里HttpHandler被初始化
void initializeHandler() {
this.delegate = this.lazyInit ? new LazyHttpHandler(this.handlerSupplier) : this.handlerSupplier.get();
}
//省略部分代码
}
ReactiveWebServerFactory#getWebServer方法
ReactiveWebServerFactory接口的getWebServer方法有多个实现类,如下图,WebFlux使用的是TomcatReactiveWebServerFactory。
以下是TomcatReactiveWebServerFactory实现的getWebServer方法
@Override
public WebServer getWebServer(HttpHandler httpHandler) {
if (this.disableMBeanRegistry) {
Registry.disableRegistry();
}
//初始化tomcat对象,并配置tomcat连接信息
Tomcat tomcat = new Tomcat();
File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");
tomcat.setBaseDir(baseDir.getAbsolutePath());
for (LifecycleListener listener : this.serverLifecycleListeners) {
tomcat.getServer().addLifecycleListener(listener);
}
Connector connector = new Connector(this.protocol);
connector.setThrowOnFailure(true);
tomcat.getService().addConnector(connector);
customizeConnector(connector);
tomcat.setConnector(connector);
tomcat.getHost().setAutoDeploy(false);
configureEngine(tomcat.getEngine());
for (Connector additionalConnector : this.additionalTomcatConnectors) {
tomcat.getService().addConnector(additionalConnector);
}
//初始http请求处理适配器,并将httpHandler传进去,TomcatHttpHandlerAdapter 继承了ServletHttpHandlerAdapter适配器。
TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler);
prepareContext(tomcat.getHost(), servlet);
return getTomcatWebServer(tomcat);
}
在WebServerManager的构造函数执行完后,HttpWebHandlerAdapter被初始化,AbstractApplicationContext #refresh方法真个链路也就结束了,下一个是AbstractApplicationContext #finishBeanFactoryInitialization
5.3.1.12 AbstractApplicationContext#finishBeanFactoryInitialization
finishBeanFactoryInitialization是所有bean初始化的入口包括自动装配的配置类,在这里对这个类的初始化不过多展开,最主看看GatewayAutoConfiguration,WebFluxConfigurationSupport,HttpHandlerAutoConfiguration这几个类初始化的内容, 看看在gateway请求执行过程中涉及到的节点,了解了这些类的初始化节点在梳理处理过程更不会显得那么突兀,不知道怎么来的。
5.3.1.13 GatewayAutoConfiguration
- 1.RouteLocatorBuilder 将配置的路由信息转化成Route对象的工具类
- 2.RouteLocator 定义路由获取方法的接口
- 3.RouteDefinitionRouteLocator 将路由配置信息转化成Rout对象的具体实现
- 4.RouteDefinitionRepository 定义路由信息存储的接口,实现可以是基于内存InMemoryRouteDefinitionRepository,基于redis存储的RedisRouteDefinitionRepository(在GatewayRedisAutoConfiguration里面初始化),基于配置文件的PropertiesRouteDefinitionLocator
- 5.RouteRefreshListener 路由事件监听
- 6.RoutePredicateHandlerMapping 匹配路由信息,校验断言,判断路由是否可用
- 7.FilteringWebHandler 过滤器链,处理filtter
- 8.Filter 具体filter如RouteToRequestUrlFilter,NettyRoutingFilter等等
源码如下
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@AutoConfigureBefore({ HttpHandlerAutoConfiguration.class, WebFluxAutoConfiguration.class })
@AutoConfigureAfter({ GatewayReactiveLoadBalancerClientAutoConfiguration.class,
GatewayClassPathWarningAutoConfiguration.class })
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {
//省略部分代码
@Bean
public RouteLocatorBuilder routeLocatorBuilder(ConfigurableApplicationContext context) {
return new RouteLocatorBuilder(context);
}
@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
return new InMemoryRouteDefinitionRepository();
}
@Bean
public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
List<GatewayFilterFactory> gatewayFilters, List<RoutePredicateFactory> predicates,
RouteDefinitionLocator routeDefinitionLocator, ConfigurationService configurationService) {
return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates, gatewayFilters, properties,
configurationService);
}
@Bean
@ConditionalOnClass(name = "org.springframework.cloud.client.discovery.event.HeartbeatMonitor")
public RouteRefreshListener routeRefreshListener(ApplicationEventPublisher publisher) {
return new RouteRefreshListener(publisher);
}
@Bean
public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
return new FilteringWebHandler(globalFilters);
}
@Bean
@ConditionalOnMissingBean
public RoutePredicateHandlerMapping routePredicateHandlerMapping(FilteringWebHandler webHandler,
RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) {
return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);
}
//将route信息转换成请求地址
@Bean
@ConditionalOnEnabledGlobalFilter
public RouteToRequestUrlFilter routeToRequestUrlFilter() {
return new RouteToRequestUrlFilter();
}
}
5.3.1.14 WebFluxConfigurationSupport
WebFluxConfigurationSupport的初始化由子类DelegatingWebFluxConfiguration实现自动初始化,它主要初始化的有
1.DispatcherHandler 主要遍历处理已经初始化好的HandlerMapping,这个在request请求中会调用到
- WebExceptionHandler 异常处理类
3.RequestMappingHandlerMapping 请求映射处理类
4.ResponseBodyResultHandler 请求返回结果处理类等等。
源码如下
public class WebFluxConfigurationSupport implements ApplicationContextAware {
@Bean
public DispatcherHandler webHandler() {
return new DispatcherHandler();
}
@Bean
@Order(0)
public WebExceptionHandler responseStatusExceptionHandler() {
return new WebFluxResponseStatusExceptionHandler();
}
protected RequestMappingHandlerMapping createRequestMappingHandlerMapping() {
return new RequestMappingHandlerMapping();
}
@Bean
public RequestMappingHandlerMapping requestMappingHandlerMapping(
@Qualifier("webFluxContentTypeResolver") RequestedContentTypeResolver contentTypeResolver) {
RequestMappingHandlerMapping mapping = createRequestMappingHandlerMapping();
mapping.setOrder(0);
mapping.setContentTypeResolver(contentTypeResolver);
PathMatchConfigurer configurer = getPathMatchConfigurer();
configureAbstractHandlerMapping(mapping, configurer);
Map<String, Predicate<Class<?>>> pathPrefixes = configurer.getPathPrefixes();
if (pathPrefixes != null) {
mapping.setPathPrefixes(pathPrefixes);
}
return mapping;
}
@Bean
public ResponseEntityResultHandler responseEntityResultHandler(
@Qualifier("webFluxAdapterRegistry") ReactiveAdapterRegistry reactiveAdapterRegistry,
ServerCodecConfigurer serverCodecConfigurer,
@Qualifier("webFluxContentTypeResolver") RequestedContentTypeResolver contentTypeResolver) {
return new ResponseEntityResultHandler(serverCodecConfigurer.getWriters(),
contentTypeResolver, reactiveAdapterRegistry);
}
}
5.3.1.15 HttpHandlerAutoConfiguration
这个类最主要是初始化了一个HttpHandler类并且将一个初始化好的WebHandler放到HttpHandler对象里面,如下代码,在applicationContext方法中做构建HttpHandler的一些前置工作,如获取实例化好的WebHandler对象,在build方法中实例化HttpWebHandlerAdapter对象,同时将初始化好的DispatcherHandler放进去
@Bean
public HttpHandler httpHandler(ObjectProvider<WebFluxProperties> propsProvider) {
HttpHandler httpHandler = WebHttpHandlerBuilder.applicationContext(this.applicationContext).build();
WebFluxProperties properties = propsProvider.getIfAvailable();
if (properties != null && StringUtils.hasText(properties.getBasePath())) {
Map<String, HttpHandler> handlersMap = Collections.singletonMap(properties.getBasePath(), httpHandler);
return new ContextPathCompositeHandler(handlersMap);
}
return httpHandler;
}
WebHttpHandlerBuilder
public static WebHttpHandlerBuilder applicationContext(ApplicationContext context) {
//初始化WebHttpHandlerBuilder 对象,并且传入实例化好的WebHandler(DispatcherHandler)对象
WebHttpHandlerBuilder builder = new WebHttpHandlerBuilder(
context.getBean(WEB_HANDLER_BEAN_NAME, WebHandler.class), context);
//初始化所有WebFilter
List<WebFilter> webFilters = context
.getBeanProvider(WebFilter.class)
.orderedStream()
.collect(Collectors.toList());
builder.filters(filters -> filters.addAll(webFilters));
//省略部分代码
}
public HttpHandler build() {
//将已经初始化好的webHandler,和上面方法获取的所有filters放入FilteringWebHandler中
WebHandler decorated = new FilteringWebHandler(this.webHandler, this.filters);
decorated = new ExceptionHandlingWebHandler(decorated, this.exceptionHandlers);
//将上面实例化好的DispatcherHandler对象放到HttpWebHandlerAdapter对象里面
HttpWebHandlerAdapter adapted = new HttpWebHandlerAdapter(decorated);
}
看完以上代码到这里,我们了解到了
- AbstractApplicationContext #refresh方法中的onRefresh()初始化了Web容器,并创建WebServerManager作为容器启动,关闭的管理类,在创建WebServerManager对象时,传入了一个this::getHttpHandler方法,这个方法的目的是给WebServerManager对象初始化一个HttpHandler类,但是这个方法不会立马执行,因为它是一个Supplier<HttpHandler>参数,在调用get方法真正执行,这个get方法在WebServerManager#start方法内调用initializeHandler()方法执行
2.AbstractApplicationContext #refresh方法中的finishBeanFactoryInitialization(beanFactory)方法初始了所有的配置类这其中有GatewayAutoConfiguration,WebFluxConfigurationSupport和HttpHandlerAutoConfiguration
3.上面三者中第一个初始化的是GatewayAutoConfiguration类,它主要配置了gateway的路由信息转化,存储,刷新和路由处理的HandlerMapping,WebHandler还有Filter等信息,其次是WebFluxConfigurationSupport,它主要初始化了一个WebHandler(DispatcherHandler),它的作用是遍历所有handler Mapping进行处理,最后是HttpHandlerAutoConfiguration 初始化了HttpWebHandlerAdapter对象,并且将DispatcherHandler放进去。
5.3.1.16 AbstractApplicationContext #finishRefresh()
在这个finishRefres方法中最终会调用WebServerManager的start方法去启动容器,如下
void start() {
this.handler.initializeHandler();
this.webServer.start();
this.applicationContext
.publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
}
void initializeHandler() {
//真正调用AbstractApplicationContext #getHttpHandler方法的地方,之所有在这里调是因为到这里的时候上面HttpWebHandlerAdapter才初始化完成了
this.delegate = this.lazyInit ? new LazyHttpHandler(this.handlerSupplier) : this.handlerSupplier.get();
}
5.3.2 Gateway的request请求
5.3.2.1 ServletHttpHandlerAdapter#service
ServletHttpHandlerAdapter 实现Servlet接口,在service方法中将ServletRequest和ServletResponse转化成了ServletServerHttpRequest和ServerHttpResponse,并调用了httpHandler的handler方法,实际上ServletHttpHandlerAdapter 是通过在上面讲到的TomcatHttpHandlerAdapter 被调用的,因为TomcatHttpHandlerAdapter 继承了它,并且初始化Web容器的时候创建的是TomcatHttpHandlerAdapter 对象
@Override
public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException {
//省略部分代码
ServletServerHttpRequest httpRequest;
try {
httpRequest = createRequest(((HttpServletRequest) request), asyncContext);
//省略部分代码
}
ServerHttpResponse httpResponse = createResponse(((HttpServletResponse) response), asyncContext, httpRequest);
//省略部分代码
this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber);
}
这个httpHandler就是初始化Web容器管理类WebServerManager构造函数中factory.getWebServer(this.handler)时传入的DelayedInitializationHttpHandler,它通过TomcatReactiveWebServerFactory类的getWebServer方法中new TomcatHttpHandlerAdapter(httpHandler)传入,而DelayedInitializationHttpHandler的delegate对象正是handlerSupplier.get()获取的HttpWebHandlerAdapter对象。
5.3.2.2 HttpWebHandlerAdapter#handle
public HttpWebHandlerAdapter(WebHandler delegate) {
super(delegate);
}
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
//省略部分代码
ServerWebExchange exchange = createExchange(request, response);
//省略部分代码
//调用父类getDelegate()获取WebHandler 对象并调用handle方法
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
HttpWebHandlerAdapter 的构造函数我们在看HttpHandlerAutoConfiguration配置类时,在初始化HttpWebHandlerAdapter 对象时,调用了WebHttpHandlerBuilder的applicationContext方法和build方法将DispatcherHandler传入了HttpWebHandlerAdapter 的构造函数,所以这里的这个getDelegate()实际上就是获取DispatcherHandler对象并调用
5.3.2.3 DispatcherHandler#handle
public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {
@Nullable
private List<HandlerMapping> handlerMappings;
@Nullable
private List<HandlerAdapter> handlerAdapters;
@Nullable
private List<HandlerResultHandler> resultHandlers;
protected void initStrategies(ApplicationContext context) {
//获取所有HandlerMapping接口已经实例化好的实现类
Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerMapping.class, true, false);
ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
AnnotationAwareOrderComparator.sort(mappings);
this.handlerMappings = Collections.unmodifiableList(mappings);
//省略部分代码
}
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
//遍历所有handlerMapping,并调用其getHandler方法,
//在获得返回的handler对象后,放入invokeHandler方法执行
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {
return Mono.empty(); // CORS rejection
}
if (this.handlerAdapters != null) {
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
//选用能够处理handler的适配器调用handler对象的handle方法
if (handlerAdapter.supports(handler)) {
return handlerAdapter.handle(exchange, handler);
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
}
handle方法内遍历handlerMappings,调用getHandler实际上是通过RoutePredicateHandlerMapping对象调用了其父类AbstractHandlerMapping的getHandler方法
5.3.2.4 AbstractHandlerMapping#getHandler
@Override
public Mono<Object> getHandler(ServerWebExchange exchange) {
return getHandlerInternal(exchange).map(handler -> {
//省略部分代码
ServerHttpRequest request = exchange.getRequest();
//跨域处理
if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
CorsConfiguration config = (this.corsConfigurationSource != null ?
this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
config = (config != null ? config.combine(handlerConfig) : handlerConfig);
if (config != null) {
config.validateAllowCredentials();
}
if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
return NO_OP_HANDLER;
}
}
return handler;
});
}
在AbstractHandlerMapping中定义了一个抽象的getHandlerInternal方法,他的具体实现在子类RoutePredicateHandlerMapping中,它主要是获取路由对象Route放到exchange中,并返回一个FilteringWebHandler对象。
5.3.2.5 RoutePredicateHandlerMapping#getHandlerInternal
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
private final RouteLocator routeLocator;
private final FilteringWebHandler webHandler;
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
//省略部分代码
//获取路由Route对象
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
//路由Route对象放入到exchange对象中
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
//返回FilteringWebHandler 对象
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
}
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
//调用routeLocator获取Route对象
return this.routeLocator.getRoutes()
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
//校验路由是否满足断言要求
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
//校验路由,这个方法默认是个空方法,需要校验的话需要子类覆盖这个方法实现
validateRoute(route, exchange);
return route;
});
}
}
5.3.2.6 RouteDefinitionRouteLocator#getRoutes
public class RouteDefinitionRouteLocator implements RouteLocator {
private final RouteDefinitionLocator routeDefinitionLocator;
@Override
public Flux<Route> getRoutes() {
//获取路由RouteDefinition对象,并通过convertToRoute方法转换成Route对象
Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);
if (!gatewayProperties.isFailOnRouteDefinitionError()) {
// instead of letting error bubble up, continue
routes = routes.onErrorContinue((error, obj) -> {
if (logger.isWarnEnabled()) {
logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId()
+ " will be ignored. Definition has invalid configs, " + error.getMessage());
}
});
}
return routes.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition matched: " + route.getId());
}
return route;
});
}
//将RouteDefinition对象转换成Route对象
private Route convertToRoute(RouteDefinition routeDefinition) {
AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
return Route.async(routeDefinition).asyncPredicate(predicate).replaceFilters(gatewayFilters).build();
}
}
5.3.2.7 RouteDefinitionLocator#getRouteDefinitions
RouteDefinitionLocator接口的getRouteDefinitions方法有多个类的实现,如下图
[图片上传失败...(image-4a21bc-1692157918199)]
RouteDefinitionRepository继承RouteDefinitionLocator接口,RedisRouteDefinitionRepository和InMemoryRouteDefinitionRepository实现RouteDefinitionRepository接口,它两就是我们在配置或数据库读取到的路由配置经过getRouteDefinitions方法转换成RouteDefinition对象后的存储地方,基于内存或redis
5.3.2.8 DispatcherHandler#invokeHandler
上面从DispatcherHandler#handle进入一直到RouteDefinitionRouteLocator的getRoutes方法这个方法链路结束。返回到DispatcherHandler的invokeHandler方法,这个方法的参数handler是RoutePredicateHandlerMapping的getHandlerInternal方法执行后返回的FilteringWebHandler 对象
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {
return Mono.empty(); // CORS rejection
}
if (this.handlerAdapters != null) {
//遍历所有适配器,获取能够执行FilteringWebHandler 的适配器
for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
if (handlerAdapter.supports(handler)) {
return handlerAdapter.handle(exchange, handler);
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
5.3.2.9 FilteringWebHandler#handle
public class FilteringWebHandler extends WebHandlerDecorator {
private final List<GatewayFilter> globalFilters;
//构造函数传入所有GatewayFilter
public FilteringWebHandler(List<GlobalFilter> globalFilters) {
this.globalFilters = loadFilters(globalFilters);
}
//将所有GlobalFilter对象转GatewayFilterAdapter对象,利用GatewayFilterAdapter对象执行filter方法
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
return filters.stream().map(filter -> {
GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
if (filter instanceof Ordered) {
int order = ((Ordered) filter).getOrder();
return new OrderedGatewayFilter(gatewayFilter, order);
}
return gatewayFilter;
}).collect(Collectors.toList());
}
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
//将Route路由配置的Filter加入到gatewayFilters集合里面
List<GatewayFilter> gatewayFilters = route.getFilters();
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
//排序
AnnotationAwareOrderComparator.sort(combined);
//省略部分代码
//创建DefaultGatewayFilterChain对象,传入所有filter,调用fiter方法
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
private final int index;
private final List<GatewayFilter> filters;
DefaultGatewayFilterChain(List<GatewayFilter> filters) {
this.filters = filters;
this.index = 0;
}
private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
this.filters = parent.getFilters();
this.index = index;
}
public List<GatewayFilter> getFilters() {
return filters;
}
//fitlter方法,利用DefaultGatewayFilterChain 实现递归调用所有GatewayFilterAdapter的filter方法
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
}
private static class GatewayFilterAdapter implements GatewayFilter {
private final GlobalFilter delegate;
GatewayFilterAdapter(GlobalFilter delegate) {
this.delegate = delegate;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return this.delegate.filter(exchange, chain);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{");
sb.append("delegate=").append(delegate);
sb.append('}');
return sb.toString();
}
}
}
5.3.2.10 GlobalFilter
GlobalFilter 的实现有很多,这里介绍几个常用的实现。
- 1.NettyRoutingFilter 利用HttpClientConnect发出请求
- 2.NettyWriteResponseFilter 请求结果处理
- 3.ReactiveLoadBalancerClientFilter 请求负载均衡处理
- 4.RouteToRequestUrlFilter 将路由对象转换成真正请求的url
5.4 自定义拦截器
5.4.1 自定义局部过滤器
代码
/**
* @author: guangjie.liao
* @Date: 2023/8/16 13:48
* @Description: 局部过滤的命名固定为 【前缀+GatewayFilterFactory】
*/
@Component
public class PartGatewayFilterFactory extends AbstractGatewayFilterFactory<PartGatewayFilterFactory.Config> {
private static String FAIL_URL = "http://localhost:8001/test/fail";
/**
* 重新父类apply方法,使用Config接收配置参数
* @param config
* @return
*/
@Override
public GatewayFilter apply(Config config) {
//这里可以做具体的拦截处理,如限流,规则校验
return (exchange, chain) -> {
ServerHttpRequest request = exchange.getRequest().mutate().build();
URI uri = exchange.getRequest().getURI();
String query = uri.getRawQuery();
//请求地址中如果不包括token惨,就跳转到失败地址
if (query.indexOf("token") == -1){
request = exchange.getRequest().mutate().uri(URI.create(FAIL_URL)).build();
return chain.filter(exchange.mutate().request(request).build());
}
return chain.filter(exchange.mutate().request(request).build());
};
}
/**
* 指定配置参数接收顺序
* @return
*/
@Override
public List<String> shortcutFieldOrder() {
return Arrays.asList("arg1","arg2","arg3","arg4","arg5");
}
/**
* 定义配置参数接收字段
*/
@Data
public static class Config{
private Object arg1;
private Object arg2;
private Object arg3;
private Object arg4;
private Object arg5;
}
}
使用配置
spring:
cloud:
gateway:
routes:
- id: microServiceGateway
# lb 表示从服务注册中心获取服务名称
uri: lb://microServiceCustomer
#uri: http://localhost:8001
predicates:
- Path=/test/**
#自定义Filter
filters:
# Part对应PartGatewayFilterFactory 类前缀
- name: Part
#args对应PreGatewayFilterFactory 内部类Config的字段
args:
args1: 1
args2: 2
args3: 3
args4: 4
args5: 5
5.4.2 自定义全局过滤器
/**
* @author: guangjie.liao
* @Date: 2022/11/7 18:46
* @Description: 滑动窗口限流
*/
@Component
public class SlidingWindowRateLimiter implements GatewayFilter {
@Autowired
private RedisTemplate redisTemplate;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String requestPath = request.getPath().toString();
String ip = request.getRemoteAddress().getHostString();
String key = ip + ":" + requestPath;
if (!isActionAllow(key,System.currentTimeMillis(),5,10)){
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
exchange.getResponse().setComplete();
}
return chain.filter(exchange);
}
/**
*
* @param key 键值Key
* @param obj 此对象在set中不能重复,如有重复会更新score
* @param timeWin 时间窗口
* @param maxCount 每次时间窗口容忍的最大请求数量
* @return
*/
public boolean isActionAllow(String key, Object obj, int timeWin, int maxCount) {
if (StringUtils.isEmpty(key) || null == obj || timeWin == 0 || maxCount == 0) {
return false;
}
long curTime = System.currentTimeMillis();
ZSetOperations opsForZSet = redisTemplate.opsForZSet();
opsForZSet.add(key, curTime, curTime);
opsForZSet.removeRangeByScore(key, 0, curTime - (timeWin * 1000));
Long count = opsForZSet.zCard(key);
redisTemplate.expire(key, timeWin, TimeUnit.SECONDS);
return count <= maxCount;
}
}
5.5 集成Nacos实现灰度发布
5.5.1 nacos配置灰度环境服务版本号和测试用户
添加gray_white.yaml文件配置
gray:
users:
- test
- admin
productVersion: 1
grayscaleVersion: 2
5.5.2 自定义负载均衡策略
@Slf4j
public class GrayscaleServiceInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
private GrayscaleWhiteListUser grayWhiteListUser;
/**
* @param serviceInstanceListSupplierProvider a provider of
* {@link ServiceInstanceListSupplier} that will be used to get available instances
*/
public GrayscaleServiceInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
GrayscaleWhiteListUser grayWhiteListUser) {
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
this.grayWhiteListUser = grayWhiteListUser;
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
DefaultRequest req = (DefaultRequest) request;
RequestDataContext context = (RequestDataContext) req.getContext();
RequestData requestData = context.getClientRequest();
List<String> usernames = requestData.getHeaders().get("username");
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances,usernames));
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances,
List<String> usernames) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances,usernames);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances,List<String> usernames) {
if (instances.isEmpty()) {
return new EmptyResponse();
}
List<ServiceInstance> grayscaleServers = new ArrayList<>();
List<ServiceInstance> productServers = new ArrayList<>();
for (ServiceInstance serverInfo : instances) {
//获取nacos配置元素据
NacosServiceInstance nacosServer = (NacosServiceInstance) serverInfo;
String version = nacosServer.getMetadata().get("version");
//根据版本号区分灰度还是生产
if (!StringUtils.isEmpty(version) && version.equals(grayWhiteListUser.getGrayscaleVersion())) {
grayscaleServers.add(nacosServer);
}else{
productServers.add(nacosServer);
}
}
//请求头配置白名单用户,没有配置则访问生产环境
if (!CollectionUtils.isEmpty(usernames)){
String username = usernames.get(0);
//配置的灰度环境服务不为空并且当前配置用户在白名单中
if (!CollectionUtils.isEmpty(grayscaleServers) && grayWhiteListUser.getUsers().contains(username)){
return getDefaultResponse(grayscaleServers);
}
}
return getDefaultResponse(productServers);
}
private DefaultResponse getDefaultResponse(List<ServiceInstance> servers){
//打乱随机取一个,也可以设置其他策略
Collections.shuffle(servers);
ServiceInstance server = servers.get(0);
return new DefaultResponse(server);
}
}
5.5.3 配置负载均衡策略初始化
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class GrayscaleConfiguration {
@Value("${spring.application.name}")
private String serverName;
@Autowired
private GrayscaleWhiteListUser grayWhiteListUser;
@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(LoadBalancerClientFactory loadBalancerClientFactory) {
return new GrayscaleServiceInstanceLoadBalancer(loadBalancerClientFactory.getLazyProvider(serverName, ServiceInstanceListSupplier.class),grayWhiteListUser);
}
}
5.5.4 启动类配置负载均衡策略
@EnableDiscoveryClient
@SpringBootApplication(scanBasePackages = "com.lgj.**")
@LoadBalancerClients(defaultConfiguration = GrayscaleServiceInstanceLoadBalancer.class)
public class GatewayServerApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayServerApplication.class, args);
}
}
5.6 集成oauth2 统一鉴权认证
5.6.1 引入oauth2.0
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-security</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-oauth2</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
5.6.2 初始化TokenStore配置
@Configuration
public class AuthenticationConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public TokenStore tokenStore() {
return new RedisTokenStore(redisConnectionFactory);
}
}
5.6.3 配置全局拦截器拦截token校验
@Component
public class AuthenticationFilter implements GlobalFilter, Ordered{
@Autowired
private TokenStore tokenStore;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String url = exchange.getRequest().getURI().toString();
//过滤请求授权服务的接口
if (url.indexOf("/auth") > -1){
return chain.filter(exchange);
}
OAuth2AccessToken auth2AccessToken = tokenStore.readAccessToken(getToken(exchange));
if (auth2AccessToken == null){
throw new BusinessException(ResultCode.UN_AUTHORIZED);
}
//扩展信息
//Map<String,Object> param = auth2AccessToken.getAdditionalInformation();
return chain.filter(exchange);
}
/**
* 获取token
*/
private String getToken(ServerWebExchange exchange) {
String tokenStr = exchange.getRequest().getHeaders().getFirst("Authorization");
if (StringUtils.isBlank(tokenStr)) {
return null;
}
String token = tokenStr.split(" ")[1];
if (StringUtils.isBlank(token)) {
return null;
}
return token;
}
@Override
public int getOrder() {
return -1;
}
}