修改请求报文、响应报文是API网关框架的基础功能,然而在Spring Cloud Gateway中修改报文体似乎并不是一件容易的事,本文以3.0.3
版本为例,讲讲在Spring Cloud Gateway如何优雅的修改请求报文、响应报文。
一、官方方法
在Spring Cloud Gateway官方文档中,有如下方法,可供参考:
1.1 修改请求报文
@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
return builder.routes()
.route("rewrite_request_obj", r -> r.host("*.rewriterequestobj.org")
.filters(f -> f.prefixPath("/httpbin")
.modifyRequestBody(String.class, Hello.class, MediaType.APPLICATION_JSON_VALUE,
(exchange, s) -> return Mono.just(new Hello(s.toUpperCase())))).uri(uri))
.build();
}
static class Hello {
String message;
public Hello() { }
public Hello(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
1.2 修改响应报文
@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
return builder.routes()
.route("rewrite_response_upper", r -> r.host("*.rewriteresponseupper.org")
.filters(f -> f.prefixPath("/httpbin")
.modifyResponseBody(String.class, String.class,
(exchange, s) -> Mono.just(s.toUpperCase()))).uri(uri))
.build();
}
当然,这种方式有其局限性:
- 只能写死在生成Route的地方,一旦API变多,或者是动态路由,不太优雅
- 无法在自定义的Global Filter、Gateway Filter中直接调用
二、优雅实现
一开始,当我接触Spring Cloud Gateway时,想自己通过实现Global Filter实现修改请求报文、响应报文,摸不着头脑。一个看似很简单的问题,在zuul1中只需要修改两下变量,就可以轻松改掉。换了异步非阻塞的Spring Cloud Gateway,仿若掉入了天坑,想修改一次,没有100行代码,办不了这个事情。
看互联网上有很多文章,代码不仅冗余、复杂、不够优雅、易读性差,还不能够支持HTTP 1.1、Gzip,总给人一种hacky实现的感觉。这就让我顿时疑惑了起来,一个堂堂的Gateway网关,修改请求报文、响应报文居然要这么麻烦。
后来,随着阅读官方文档、官方源码的不断深入,我理解了其实Spring Cloud Gateway的初衷,似乎并不是想做一个网关“框架”,而更像是做一个开箱即用的网关应用程序,任何网关相关的参数,均可通过参数配置实现,无需自行编码,或者使用轻量级的函数式编程语句。确实,这很好,对于微服务网关,足够了。但是,如果要深度定制网关的功能,就会感到十分为难,一个封装十足彻底的工具,要想不动引用包源码的情况下,从外层修改它,犹如把一个豪华法拉利改装成特斯拉,使用网上的hacky办法,总给人一种,里外里套了两层的感觉。
2.1 实现原理
为了解决不够优雅的问题,通过借鉴Spring Cloud Gateway 如下类的 原生的rewrite方法,重新实现Config的响应式参数传递,从而实现在Filter中修改请求报文、响应报文的函数式编程,一劳永逸。
org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory
org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory
通过该方式实现修改body体,相较于网络上的通用方法,好处如下:
- 代码统一封装,不用牵一发动全身;
- 函数式编程,实现优雅;
- 支持gzip、chunked等HTTP特性;
- 请求、响应的修改,都还在Filter中修改;
值得注意的是,需要对Mono或Flux的异常进行捕获,捕获方式不一定是try catch的方式,而是.just(xxx).doOnError()
2.2 基础封装
在工程中,创建3个类,放到基础目录下,用于调用,如果Spring Cloud Gateway更新了请求、响应相关的代码,只需更新如下代码即可。
RewriteConfig.java
import org.springframework.cloud.gateway.filter.factory.rewrite.RewriteFunction;
import java.util.Map;
public class RewriteConfig {
private Class inClass;
private Class outClass;
private Map<String, Object> inHints;
private Map<String, Object> outHints;
private String newContentType;
private String contentType;
private RewriteFunction rewriteFunction;
public Class getInClass() {
return inClass;
}
public RewriteConfig setInClass(Class inClass) {
this.inClass = inClass;
return this;
}
public Class getOutClass() {
return outClass;
}
public RewriteConfig setOutClass(Class outClass) {
this.outClass = outClass;
return this;
}
public Map<String, Object> getInHints() {
return inHints;
}
public RewriteConfig setInHints(Map<String, Object> inHints) {
this.inHints = inHints;
return this;
}
public Map<String, Object> getOutHints() {
return outHints;
}
public RewriteConfig setOutHints(Map<String, Object> outHints) {
this.outHints = outHints;
return this;
}
public String getNewContentType() {
return newContentType;
}
public RewriteConfig setNewContentType(String newContentType) {
this.newContentType = newContentType;
return this;
}
public RewriteFunction getRewriteFunction() {
return rewriteFunction;
}
public RewriteConfig setRewriteFunction(RewriteFunction rewriteFunction) {
this.rewriteFunction = rewriteFunction;
return this;
}
public <T, R> RewriteConfig setRewriteFunction(Class<T> inClass, Class<R> outClass,
RewriteFunction<T, R> rewriteFunction) {
setInClass(inClass);
setOutClass(outClass);
setRewriteFunction(rewriteFunction);
return this;
}
public String getContentType() {
return "application/json;charset=utf-8";
}
public RewriteConfig setContentType(String contentType) {
this.contentType = contentType;
return this;
}
}
ModifiedRequestDecorator.java
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.function.Function;
public class ModifiedRequestDecorator {
private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
private final RewriteConfig config;
public ModifiedRequestDecorator(ServerWebExchange exchange, RewriteConfig config) {
this.config = config;
}
@SuppressWarnings("unchecked")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Class inClass = config.getInClass();
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
// TODO: flux or mono
Mono<?> modifiedBody = serverRequest.bodyToMono(inClass)
.flatMap(originalBody -> config.getRewriteFunction().apply(exchange, originalBody))
.switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction().apply(exchange, null)));
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, config.getOutClass());
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);
// if the body is changing content types, set it here, to the bodyInserter
// will know about it
if (config.getContentType() != null) {
headers.set(HttpHeaders.CONTENT_TYPE, config.getContentType());
}
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext())
// .log("modify_request", Level.INFO)
.then(Mono.defer(() -> {
ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
return chain.filter(exchange.mutate().request(decorator).build());
})).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(exchange,
outputMessage, throwable));
}
protected Mono<Void> release(ServerWebExchange exchange, CachedBodyOutputMessage outputMessage,
Throwable throwable) {
return outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable));
}
ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
CachedBodyOutputMessage outputMessage) {
return new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(headers);
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
}
else {
// TODO: this causes a 'HTTP/1.1 411 Length Required' // on
// httpbin.org
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}
@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
}
}
ModifiedResponseDecorator.java
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.filter.factory.rewrite.GzipMessageBodyResolver;
import org.springframework.cloud.gateway.filter.factory.rewrite.MessageBodyDecoder;
import org.springframework.cloud.gateway.filter.factory.rewrite.MessageBodyEncoder;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.function.Function.identity;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;
public class ModifiedResponseDecorator extends ServerHttpResponseDecorator {
private final ServerWebExchange exchange;
private final RewriteConfig config;
private final Map<String, MessageBodyDecoder> messageBodyDecoders ;
private final Map<String, MessageBodyEncoder> messageBodyEncoders;
private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
public ModifiedResponseDecorator(ServerWebExchange exchange, RewriteConfig config) {
super(exchange.getResponse());
this.exchange = exchange;
this.config = config;
Set<MessageBodyDecoder> messageBodyDecodersSet = new HashSet<>();
Set<MessageBodyEncoder> messageBodyEncodersSet = new HashSet<>();
MessageBodyDecoder messageBodyDecoder = new GzipMessageBodyResolver();
MessageBodyEncoder messageBodyEncoder = new GzipMessageBodyResolver();
messageBodyDecodersSet.add(messageBodyDecoder);
messageBodyEncodersSet.add(messageBodyEncoder);
this.messageBodyDecoders = messageBodyDecodersSet.stream()
.collect(Collectors.toMap(MessageBodyDecoder::encodingType, identity()));
this.messageBodyEncoders = messageBodyEncodersSet.stream()
.collect(Collectors.toMap(MessageBodyEncoder::encodingType, identity()));
}
@SuppressWarnings("unchecked")
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Class inClass = config.getInClass();
Class outClass = config.getOutClass();
String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
HttpHeaders httpHeaders = new HttpHeaders();
// explicitly add it in this way instead of
// 'httpHeaders.setContentType(originalResponseContentType)'
// this will prevent exception in case of using non-standard media
// types like "Content-Type: image"
httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);
ClientResponse clientResponse = prepareClientResponse(body, httpHeaders);
// TODO: flux or mono
Mono modifiedBody = extractBody(exchange, clientResponse, inClass)
.flatMap(originalBody -> config.getRewriteFunction().apply(exchange, originalBody))
.switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction().apply(exchange, null)));
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, outClass);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange,
exchange.getResponse().getHeaders());
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
Mono<DataBuffer> messageBody = writeBody(getDelegate(), outputMessage, outClass);
HttpHeaders headers = getDelegate().getHeaders();
if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
|| headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
}
// TODO: fail if isStreamingMediaType?
return getDelegate().writeWith(messageBody);
}));
}
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMapSequential(p -> p));
}
private ClientResponse prepareClientResponse(Publisher<? extends DataBuffer> body, HttpHeaders httpHeaders) {
ClientResponse.Builder builder;
builder = ClientResponse.create(exchange.getResponse().getStatusCode(), messageReaders);
return builder.headers(headers -> headers.putAll(httpHeaders)).body(Flux.from(body)).build();
}
private <T> Mono<T> extractBody(ServerWebExchange exchange, ClientResponse clientResponse, Class<T> inClass) {
// if inClass is byte[] then just return body, otherwise check if
// decoding required
if (byte[].class.isAssignableFrom(inClass)) {
return clientResponse.bodyToMono(inClass);
}
List<String> encodingHeaders = exchange.getResponse().getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
for (String encoding : encodingHeaders) {
MessageBodyDecoder decoder = messageBodyDecoders.get(encoding);
if (decoder != null) {
return clientResponse.bodyToMono(byte[].class).publishOn(Schedulers.parallel()).map(decoder::decode)
.map(bytes -> exchange.getResponse().bufferFactory().wrap(bytes))
.map(buffer -> prepareClientResponse(Mono.just(buffer),
exchange.getResponse().getHeaders()))
.flatMap(response -> response.bodyToMono(inClass));
}
}
return clientResponse.bodyToMono(inClass);
}
private Mono<DataBuffer> writeBody(ServerHttpResponse httpResponse, CachedBodyOutputMessage message,
Class<?> outClass) {
Mono<DataBuffer> response = DataBufferUtils.join(message.getBody());
if (byte[].class.isAssignableFrom(outClass)) {
return response;
}
List<String> encodingHeaders = httpResponse.getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
for (String encoding : encodingHeaders) {
MessageBodyEncoder encoder = messageBodyEncoders.get(encoding);
if (encoder != null) {
DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();
response = response.publishOn(Schedulers.parallel()).map(buffer -> {
byte[] encodedResponse = encoder.encode(buffer);
DataBufferUtils.release(buffer);
return encodedResponse;
}).map(dataBufferFactory::wrap);
break;
}
}
return response;
}
}
修改请求
filter()
方法返回参考代码
// 修改请求内容
return new ModifiedRequestDecorator(exchange, new RewriteConfig()
.setRewriteFunction(String.class, String.class, (ex, requestData)
-> Mono.just(要修改请求内容的方法(requestData))
)).filter(exchange, chain);
修改响应
filter()
方法返回参考代码
// 修改响应内容
return chain.filter(exchange.mutate().response(
new ModifiedResponseDecorator(exchange, new RewriteConfig().
setRewriteFunction(String.class, String.class, (ex, responseData)
-> Mono.just(要修改响应内容的方法(responseData))
))).build());
修改请求、响应
filter()
方法返回参考代码
// 修改请求内容
return new ModifiedRequestDecorator(exchange, new RewriteConfig()
.setRewriteFunction(String.class, String.class, (ex, requestData)
-> Mono.just(要修改请求内容的方法(requestData))
)).filter(exchange.mutate().response(
// 修改响应内容
new ModifiedResponseDecorator(exchange, new RewriteConfig().
setRewriteFunction(String.class, String.class, (ex, responseData)
-> Mono.just(要修改响应内容的方法(responseData))
))).build(),chain);