Preface
This article takes a look at the Retry utility in reactor-extra.
Maven
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
<version>3.1.4.RELEASE</version>
</dependency>
Example
TcpClient client = TcpClient.create("localhost", 8888);
client.newHandler((inbound,outbound) -> {
return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
.asString().next().log().then());
}).doOnError(e -> e.printStackTrace())
.subscribe();
With the TcpClient above, if the server has not been started, the connection attempt fails immediately with this error:
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8888
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
... 10 more
Reconnecting after a connection failure
Simple retry
client.newHandler((inbound,outbound) -> {
return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
.asString().next().log().then());
}).doOnError(e -> e.printStackTrace())
.retry(3)
.subscribe();
retry takes the number of retries directly as its argument.
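To make the retry count concrete, here is a minimal plain-Java sketch of the same idea: run the task once, then re-run it up to N more times when it fails. It is only an illustration. Reactor's retry(3) re-subscribes to the source asynchronously rather than looping, and the names SimpleRetrySketch and callWithRetry are hypothetical, not part of Reactor.

```java
import java.net.ConnectException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java illustration only; not how Reactor implements retry(n).
class SimpleRetrySketch {

    // Runs the task once, then re-runs it up to maxRetries more times on failure.
    static <T> T callWithRetry(Callable<T> task, int maxRetries) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed: surface the final error
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        // Hypothetical task: fails three times, succeeds on the fourth attempt.
        String result = callWithRetry(() -> {
            if (attempts.incrementAndGet() <= 3) {
                throw new ConnectException("Connection refused");
            }
            return "connected";
        }, 3);
        System.out.println(result + " after " + attempts.get() + " attempts");
        // prints "connected after 4 attempts"
    }
}
```

With 3 retries there are at most 4 attempts in total: the initial one plus three retries.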
Advanced retry
client.newHandler((inbound,outbound) -> {
return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
.asString().next().log().then());
}).doOnError(e -> e.printStackTrace())
.retryWhen(Retry.allBut(RuntimeException.class)
.retryMax(5000)
.fixedBackoff(Duration.ofSeconds(5))
.doOnRetry(e -> {
e.exception().printStackTrace();
})
)
.subscribe();
With the Retry helper class from the reactor-extra project, it is easy to specify more advanced retry strategies, such as fixedBackoff or exponentialBackoff:
client.newHandler((inbound,outbound) -> {
return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
.asString().next().log().then());
}).doOnError(e -> e.printStackTrace())
.retryWhen(Retry.allBut(RuntimeException.class)
.retryMax(5000)
.exponentialBackoffWithJitter(Duration.ofMillis(100),Duration.ofMillis(500))
.doOnRetry(e -> {
e.exception().printStackTrace();
})
)
.subscribe();
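This example uses exponentialBackoffWithJitter. The first argument is firstBackoff, the initial delay, and the second is maxBackoff, the upper bound on the delay (maxBackoffInterval). If maxBackoff is null, it is equivalent to Duration.ofSeconds(Long.MAX_VALUE).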
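The interplay of firstBackoff and maxBackoff can be sketched in plain Java. This is a rough illustration, not the library's code: it assumes the delay doubles on every retry (the factor is an assumption here, so check the Backoff source for the exact formula), it leaves out the random jitter, and the names BackoffSketch and exponentialDelay are hypothetical.

```java
import java.time.Duration;

// Illustration of a capped exponential backoff; jitter is intentionally omitted.
class BackoffSketch {

    // Delay before the given retry (1-based): first * 2^(iteration - 1), capped at max.
    // The doubling factor is an assumption made for this sketch.
    static Duration exponentialDelay(Duration first, Duration max, int iteration) {
        // A null max means "practically unbounded", as described in the text.
        Duration cap = (max != null) ? max : Duration.ofSeconds(Long.MAX_VALUE);
        Duration delay = first;
        for (int i = 1; i < iteration && delay.compareTo(cap) < 0; i++) {
            // If doubling would pass the cap (or overflow), clamp to the cap instead.
            delay = delay.compareTo(cap.dividedBy(2)) > 0 ? cap : delay.multipliedBy(2);
        }
        return delay.compareTo(cap) > 0 ? cap : delay;
    }

    public static void main(String[] args) {
        Duration first = Duration.ofMillis(100);
        Duration max = Duration.ofMillis(500);
        for (int i = 1; i <= 5; i++) {
            System.out.println("retry " + i + ": " + exponentialDelay(first, max, i).toMillis() + " ms");
        }
        // prints 100, 200, 400, 500, 500 ms for retries 1 to 5
    }
}
```

With the values from the example above (100 ms and 500 ms), the delay reaches the cap on the fourth retry and stays there. A fixed backoff is the degenerate case where every retry waits the same amount of time.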
Retry
reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/Retry.java
/**
* Returns a retry function that retries errors resulting from all exceptions except
* the specified non-retriable exceptions. More constraints may be added using
* {@link #retryMax(int)} or {@link #timeout(Duration)}.
*
* @param nonRetriableExceptions exceptions that may not be retried
* @return retry function that retries all exceptions except the specified non-retriable exceptions.
*/
@SafeVarargs
static <T> Retry<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) {
Predicate<? super RetryContext<T>> predicate = context -> {
Throwable exception = context.exception();
if (exception == null)
return true;
for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
if (clazz.isInstance(exception))
return false;
}
return true;
};
return DefaultRetry.<T>create(predicate);
}
As you can see, allBut builds a predicate and passes it to DefaultRetry to create the Retry instance.
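The predicate logic in allBut can be tried out on its own. The sketch below copies only the exception check into a standalone method, with the hypothetical names AllButSketch and isRetriable, and drops the RetryContext wrapper. It shows why the Retry.allBut(RuntimeException.class) examples above still retry a refused connection: ConnectException is an IOException, not a RuntimeException.

```java
import java.net.ConnectException;

// Standalone copy of the exception check performed by the allBut predicate.
class AllButSketch {

    // Returns true unless the exception is an instance of one of the excluded classes.
    @SafeVarargs
    static boolean isRetriable(Throwable exception, Class<? extends Throwable>... nonRetriable) {
        if (exception == null) {
            return true;
        }
        for (Class<? extends Throwable> clazz : nonRetriable) {
            if (clazz.isInstance(exception)) {
                return false; // explicitly excluded, so do not retry
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A refused connection is a checked IOException subtype: retried.
        System.out.println(isRetriable(new ConnectException("Connection refused"), RuntimeException.class));
        // prints "true"

        // Any RuntimeException subtype is excluded: not retried.
        System.out.println(isRetriable(new IllegalStateException("bug"), RuntimeException.class));
        // prints "false"
    }
}
```

Because the check uses isInstance, subclasses of an excluded class are excluded as well.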
reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/DefaultRetry.java
public static <T> DefaultRetry<T> create(Predicate<? super RetryContext<T>> retryPredicate) {
return new DefaultRetry<T>(retryPredicate,
1,
null,
Backoff.zero(),
Jitter.noJitter(),
null,
NOOP_ON_RETRY,
(T) null);
}
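Note that maxIterations defaults to 1 here. If you do not call retryMax, the advanced retry strategy you configured is effectively wasted, so pay particular attention to this.
Summary
The Retry utility class provided by Reactor Extra is very handy and worth experimenting with.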