一、Spring GateWay
Spring Cloud提供了两套方便我们编写网关的中间件,分别是zuul和Spring GateWay,在zuul1的IO模型是使用BIO(图1-1)。而zuul2对IO模型使用NIO进行了重构(图1-2)。而Spring GateWay的IO模型是使用NIO。而在Netflix发布zuul2的时候Spring Cloud已经开始不集成到Spring Cloud中,因为Spring Cloud 等着zuul2集成太久,才有了Spring Gateway。Spring GateWay的架构是基于Spring webflux的基础上开发的。而对webflux的RP中涉及的Back Pressure、Stream、asynchronous好处不多说哈哈。
二、Spring GateWay 转发请求过程
2.1转发请求过程
在Spring mvc是通过HandlerMapping解析请求链接,然后根据请求链接找到执行这个请求Controller类 。而在Spring GateWay中也是使用HandlerMapping对请求的链接进行解析匹配对应的Route进行代理转发到对应的服务。图2-1为整个请求的流程,用户请求先通过DispatcherHandler找到对应GateWwayHandlerMapping,再通过GateWwayHandlerMapping解析匹配到对应的Handler。Handler处理完后,再经过Filter,最终到Proxied Service.
2.2转发请求过程代码分析
1.请求先由DispatcherHanlder进行处理,DispatcherHanlder初始化的时候会从IOC中查找实现HandlerMapping接口的实现类。然后保存到内部变量handlerMappings中,DispatcerHandler调用Handler方法迭代handler
Mappings中的HandlerMapping,
2.这里只讲解RoutePredicateHandlerMapping,因此然后调用RoutePredicateHandlerMapping中的获取路由的方法,当RoutePredicateHandlerMapping获取到对应的路由的时候会将Route存储到ServerWebExchanges的属性中,然后返回实现了WebHandler接口的FilteringWebHandler。FilteringWebHandler是一个存放过滤器的Handler。
3.最后DispatcherHanlder通过SimpleHandlerAdapter适配器的方式调用FilteringWebHandler的handler方法,FilteringWebHandler调用所有的过滤器,包括proxy filter。通过proxyFilter请求被代理的服务。处理完毕后,并将Response响应回去。
通过流程的分析,这里我们可以学习到适配的设计模式的使用。GateWay中的HandlerMapping有RouterFunctionMapping、RoutePredicateHandlerMapping等。这些HandlerMapping返回的结果都是不一样的。那么DispatcherHanlder的后续处理也不会一样。那么可以通过适配器的方式,根据HandlerMapping返回的结果进行适配调用。
三、 HandlerMapping \ RouteLocator\GlobalFilter(WebHandler)
3.1 HandlerMapping
图3-1为handler类关系图。这里主要涉及到Spring GateWay相关类的探讨。如:Spring Webflux使用到的RouteFuntionMapping和SimpleUrlHandlerMapping等不做探讨。
3.1.1AbstractHandlerMapping
HandlerMapping和Ordered接口主要定义了获取getHandler和当前hanler加载顺序。AbstractHandlerMapping在getHanlder封装了CORS处理。因为所有Handler都可能会涉及到CORS的处理,抽象到AbstractHandlerMapping处理,再提供了getHandlerInternal让子类实现具体的查找Handler的方法。
public Mono<Object> getHandler(ServerWebExchange exchange) {
return this.getHandlerInternal(exchange).map((handler) -> {
if (CorsUtils.isCorsRequest(exchange.getRequest())) {
CorsConfiguration configA = this.globalCorsConfigSource.getCorsConfiguration(exchange);
CorsConfiguration configB = this.getCorsConfiguration(handler, exchange);
CorsConfiguration config = configA != null ? configA.combine(configB) : configB;
if (!this.getCorsProcessor().process(config, exchange) || CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return REQUEST_HANDLED_HANDLER;
}
}
return handler;
});
}
protected abstract Mono<?> getHandlerInternal(ServerWebExchange var1);
我们可以通过研究这部分的代码学习到如果所有的接口的实现类都需要在接口定义的方法前或者后执行一些共性的操作使,我们可以通过抽象类实现这些共性的操作,再通过定义抽象方法,由子类实现特有的实现。而这种模式叫做模板模式。
3.1.2 RoutePredicateHandlerMapping
RoutePredicateHandlerMapping是处理获取路由的hanlder。Route
PredicateHandlerMapping中的RouteLocator是存储了我们启动的时候加载的路由对象信息。获取路由的时候,调用RoutePredicateHanlderMapping的getHandlerInternal方法从RouteLocator获取路由存放在ServerWebExchange中,返回webFilter。
private final RouteLocator routeLocator;
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getClass().getSimpleName());
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
}
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator
.getRoutes()
//individually filter routes so that filterWhen error delaying is not a problem
.concatMap(route -> Mono
.just(route)
.filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
//instead of immediately stopping main flux due to error, log and swallow it
.doOnError(e -> logger.error("Error applying predicate for route: "+route.getId(), e))
.onErrorResume(e -> Mono.empty())
)
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
//TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
/* TODO: trace logging
if (logger.isTraceEnabled()) {
logger.trace("RouteDefinition did not match: " + routeDefinition.getId());
}*/
RoutePredicateHandlerMapping的创建是在GatewayAutoConfiguration进行自动创建。创建的时候指定webHandler和routeLocator。webHandler就是一个封装了Global Filter的对象,而routeLocator就是保存了所有Route的对象。
@Bean
public RoutePredicateHandlerMapping routePredicateHandlerMapping(
FilteringWebHandler webHandler, RouteLocator routeLocator,
GlobalCorsProperties globalCorsProperties) {
return new RoutePredicateHandlerMapping(webHandler, routeLocator,
globalCorsProperties);
}
3.2 RouteLocator
RouteLocator主要作用是提供获取路由的类型。我们在分析Route
PredicateHandlerMapping的时候,知道RoutePredicateHandlerMapping获取路由是通过RouteLocator进行获取的。那么我们这里分析下RouteLocator加载路由。
3.2.1 Route组成
Route主要为三部分:
- Proxy:代理的信息包括被代理的uri。
- Predicate:包含接受请求的方法、path等信息
- Filter:Route自定义的过滤器。
3.2.2 RouteLocator加载过程
最总的 RouteLocator是CachingRoutelocator。加载过程是自上而下进行创建。
- Route加载来源分为三种来源:
第一种是通过RouteLocatorBuilder方式,在创建RouteLocator的方法上通过注入RouteLocatorBuilder来创建路由。如:
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
RouteLocator routeLocator = builder.routes()
.route("get",r->r.path("/security/**").uri("lb://security-server"))
.build();
return routeLocator;
}
第二种方式是通过Properties文件进行创建路由。Properties路由的创建包括:PropertiesRouteDefinitionLocator和DiscoveryClientRouteDefinitionLocator.
- PropertiesRouteDefinitionLocator 通过代码创建Route
- DiscoveryClientRouteDefinitionLocator 是通过配置文件中lb://security-serve 指定的服务名称,在Eurek查找Service中的uri进行创建Route。
spring:
cloud:
gateway:
routes:
- id: cookie_route
uri: http://example.org
predicates:
- Cookie=chocolate, ch.p
第三种方式是通过MYSQL或者Reids、内存(InMemoryRouteDefinitionRepository)方式创建路由。实现RouteDefinitionRepository接口实现接口中的方式。InMemoryRouteDefinitionRepository为默认方式。
public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {
private final Map<String, RouteDefinition> routes = synchronizedMap(new LinkedHashMap<String, RouteDefinition>());
@Override
public Mono<Void> save(Mono<RouteDefinition> route) {
return route.flatMap( r -> {
routes.put(r.getId(), r);
return Mono.empty();
});
}
@Override
public Mono<Void> delete(Mono<String> routeId) {
return routeId.flatMap(id -> {
if (routes.containsKey(id)) {
routes.remove(id);
return Mono.empty();
}
return Mono.defer(() -> Mono.error(new NotFoundException("RouteDefinition not found: "+routeId)));
});
}
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return Flux.fromIterable(routes.values());
}
}
3.2 GlobalFilter
3.2.1全局Filter的加载过程
Filter我们区分为全局Filter和RouteFilter
- Globle Filter是上图所列的Filter。而这些全局的Filter是在GatewayAutoConfiguration中通过@Bean的方式进行创建。
- Route Filter是在创建的Route的时候指定。这种路由属于自定义的路由。
3.2.3NettyRoutingFilter
在转发过程分析中我们知道最终的代理请求是通过一个Proxy Filter进行请求Proxy Service,那么这个Proxy Filter就是NettyRoutingFilter。通过下面的源码我们可以看到在proxyRequest.sendHeaders() .send(request.getBody().map(dataBuffer -> ((NettyDataBuffer) dataBuffer).getNativeBuffer()));
中请求Proxy Service.
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);
ServerHttpRequest request = exchange.getRequest();
final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
final String url = requestUrl.toString();
HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(),
exchange);
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
Mono<HttpClientResponse> responseMono = this.httpClient.request(method, url, req -> {
final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach)
.headers(httpHeaders)
.chunkedTransfer(chunkedTransfer)
.failOnServerError(false)
.failOnClientError(false);
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
proxyRequest.header(HttpHeaders.HOST, host);
}
return proxyRequest.sendHeaders() //I shouldn't need this
.send(request.getBody().map(dataBuffer ->
((NettyDataBuffer) dataBuffer).getNativeBuffer()));
});
if (properties.getResponseTimeout() != null) {
responseMono.timeout(properties.getResponseTimeout(),
Mono.error(new TimeoutException("Response took longer than timeout: " +
properties.getResponseTimeout())));
}
return responseMono.doOnNext(res -> {
ServerHttpResponse response = exchange.getResponse();
// put headers and status so filters can modify the response
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
if (headers.getContentType() != null) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, headers.getContentType());
}
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE);
response.getHeaders().putAll(filteredResponseHeaders);
HttpStatus status = HttpStatus.resolve(res.status().code());
if (status != null) {
response.setStatusCode(status);
} else if (response instanceof AbstractServerHttpResponse) {
// https://jira.spring.io/browse/SPR-16748
((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code());
} else {
throw new IllegalStateException("Unable to set status code on response: " +res.status().code()+", "+response.getClass());
}
// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
}).then(chain.filter(exchange));
}