Policy包
- RetryPolicy是这个包的基接口,也就是说这个包都是各种实际可能用到的重试策略。
- NeverRetryPolicy类:
有个内部静态类-NeverRetryContext,扩展自RetryContextSupport类,额外加上finished属性。该属性默认为false.(open()方法中构造),在registerThrowable()方法中就变成true了,之后再调用canRetry()判断都是返回false。
也就是说,这个策略只有一次机会,出现异常也再也不能重试了。
P.S. 内部静态类:首先是内部类,其他类都不可见。其次是静态类,就不依附于外部类,可以独立实例化。但如果是内部类,还藏着一个this指针,可以方便的使用外部类的属性和方法。两种方式各有利弊,需要看情况使用。 - AlwaysRetryPolicy类:
扩展自NeverRetryPolicy类,其canRetry()始终返回true,也就是说,可以无限次重试。 - CompositeRetryPolicy类:
是一个组合策略,有个内部静态类CompositeRetryContext,包含RetryContext[]数组和RetryPolicy[]数组.内部维护一个RetryPolicy[]数组,可通过set()方法设置,然后在open()方法中传递给CompositeRetryContext。
另外有个optimistic指示,用了做组合策略判断canRetry的标尺。
a. 当optimistic为true,积极型时,canRetry遍历各个RetryPolicy,只要有一个策略返回可以重试,即认为组合为可以重试。
b. 当optimistic为false,消极型时,canRetry遍历各个RetryPolicy,只要有一个策略返回不可以重试,即认为组合为不可以重试。 - SimpleRetryPolicy类:
这里就是应用到Classifier的地方了,内部定义了一个BinaryExceptionClassifier,在构造器中初始化.traverseCauses是传递给BinaryExceptionClassifier用的,默认值为false。
public SimpleRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions,
boolean traverseCauses, boolean defaultValue) {
super();
this.maxAttempts = maxAttempts;
//通过给定的映射Map和默认值构造BinaryExceptionClassifier
this.retryableClassifier = new BinaryExceptionClassifier(retryableExceptions, defaultValue);
this.retryableClassifier.setTraverseCauses(traverseCauses);
}
它的canRetry()就比较简单了,从上下文中获取canRetry,然后调用retryableClassifier.classify()的结果和最大次数匹配一下即可。
6.ExpressionRetryPolicy类:
扩展自SimpleRetryPolicy,而且是1.2新增的一个策略。在父类canRetry的基础上,加上对lastThrowable的的表达式判断,符合特地表达式的异常才能重试。
- CircuitBreakerRetryPolicy类:
有个内部类CircuitBreakerRetryContext,断路器重试上下文。提供过载保护的策略,如果在时间间隔openTimeout内,直接短路,不允许重试,只有超过间隔的才能重试,具体判断是否熔断的代码如下:
public boolean isOpen() {
long time = System.currentTimeMillis() - this.start;
boolean retryable = this.policy.canRetry(this.context);
//当前不允许重试
if (!retryable) {
//如果已经超过重置时间,重新闭合,关闭熔断器
if (time > this.timeout) {
logger.trace("Closing");
this.context = createDelegateContext(policy, getParent());
this.start = System.currentTimeMillis();
retryable = this.policy.canRetry(this.context);
}
// 如果小于熔断器打开时间,读取关闭状态,如果熔断器是关闭的,就打开熔断器,重置熔断计时器
else if (time < this.openWindow) {
if ((Boolean) getAttribute(CIRCUIT_OPEN) == false) {
logger.trace("Opening circuit");
setAttribute(CIRCUIT_OPEN, true);
}
this.start = System.currentTimeMillis();
return true;
}
}
//允许重试
else {
//判断是否在openWindow熔断器电路打开的超时时间之外,超过打开时间,就重置上下文,并且返回false
if (time > this.openWindow) {
logger.trace("Resetting context");
this.start = System.currentTimeMillis();
this.context = createDelegateContext(policy, getParent());
}
}
if (logger.isTraceEnabled()) {
logger.trace("Open: " + !retryable);
}
setAttribute(CIRCUIT_OPEN, !retryable);
return !retryable;
}
需要配合看
public boolean canRetry(RetryContext context) {
CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
//如果熔断器处于打开状态,就直接短路,返回失败
if (circuit.isOpen()) {
circuit.incrementShortCircuitCount();
return false;
}
else {
//重置熔断器
circuit.reset();
}
return this.delegate.canRetry(circuit.context);
}
- 当重试失败,且在熔断器打开时间窗口[0,openWindow) 内,立即熔断;
- 当重试失败,且在指定超时时间后(>timeout),熔断器电路重新闭合;
- 在熔断器半打开状态[openWindow, timeout]时,只要重试成功则重置上下文,断路器闭合。
- TimeoutRetryPolicy类:
有个TimeoutRetryContext,超时上下文,默认一秒内不允许重试,只有超时之后才能进行重试。 - RetryContextCache接口及其实现:
专门存放RetryContext的缓存,有两种实现MapRetryContextCache,使用HashMap保存。SoftReferenceMapRetryContextCache,使用SoftReference进行保存。
stats包
stats包含了重试的统计信息
- MutableRetryStatistics接口:
增加了incrementc××()方法,增加统计值 - DefaultRetryStatistics类:
统计的默认实现,定义了一堆的AtomicInterger存储统计值,同时扩展自AttributeAccessorSupport,还能存放其他信息。 - ExponentialAverageRetryStatistics类:
增加了指数平均指标的统计值, - RetryStatisticsFactory接口和DefaultRetryStatisticsFactory实现:
就一个create()方法,构造MutableRetryStatistics。默认是生产ExponentialAverageRetryStatistics统计类。 - StatisticsRepository接口:
统计仓库,可以存放多个统计信息的接口 - DefaultStatisticsRepository类:
统计仓库的默认实现,内部有一个DefaultRetryStatisticsFactory,如果找不到对应名字的统计,就由工厂生产一个。
listener包
就一个类RetryListenerSupport,其具体实现子类StatisticsListener位于stats包中。主要监听close()和onError()事件,并调用repository进行记录统计信息。
support包
- DefaultRetryState类:
代表了一个新的重试尝试的状态。包含3个属性:
//该状态的键值
final private Object key;
//是否需要强制刷新
final private boolean forceRefresh;
//回滚的转换器,当转换为true值是,这个异常将会引起回滚操作
final private Classifier<? super Throwable, Boolean> rollbackClassifier;
有个rollbackFor()用来判断某个异常是否需要引起回滚,如果没有rollbackClassifier,默认返回true,否则按照rollbackClassifier转换值进行判断。
- RetrySimulation类:
代表一个发生器,有个SleepSequence内部类,代表一组sleep值,有最长的,有总和的。而发生器根据序列维护内部的sleepHistogram直方图,在获得百分比是能返回对应值。 - RetrySimulator类:
用来执行补偿+重试的操作的工具类。在构造函数中传入SleepingBackOffPolicy和RetryPolicy作为内部属性。在executeSingleSimulation()方法中,设置好补偿机制和重置策略,然后直接通过template执行失败的FailingRetryException,模拟失败的动作,进行补偿和重试的组合操作。 - RetrySynchronizationManager类:
在ThreadLocal中存放RetryContext,用来保证一个线程只能维护一个重试上下文,进行一个重试操作。毕竟Sleep是用Tread.sleep,如果多个重试,这个补偿机制就无法生效了。 - RetryTemplate类:
这个是这个包最重要的一个类了,之前看到重试策略,回退策略,缓存、监听器都是应用在这里。
a.它实现了RetryOperations接口。
b.很有风格的是,execute是给外部调用的,真正内部起作用的是doExecute()
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state)
throws E, ExhaustedRetryException {
//重试策略
RetryPolicy retryPolicy = this.retryPolicy;
//补偿策略
BackOffPolicy backOffPolicy = this.backOffPolicy;
// Allow the retry policy to initialise itself...
// 根据当前策略,状态,重新生成重试上下文
RetryContext context = open(retryPolicy, state);
if (this.logger.isTraceEnabled()) {
this.logger.trace("RetryContext retrieved: " + context);
}
// Make sure the context is available globally for clients who need
// it...
// 在当前现场注册这个重试上下文,方便需要的客户端取出。
RetrySynchronizationManager.register(context);
Throwable lastException = null;
boolean exhausted = false;
try {
// 调用拦截器,执行RetryListener中open
boolean running = doOpenInterceptors(retryCallback, context);
// 判断拦截器是否允许重试
if (!running) {
throw new TerminatedRetryException(
"Retry terminated abnormally by interceptor before first attempt");
}
// 从conext中获取补偿上下文
BackOffContext backOffContext = null;
Object resource = context.getAttribute("backOffContext");
if (resource instanceof BackOffContext) {
backOffContext = (BackOffContext) resource;
}
// 如果当前没有补充上下文,就构建一个新的,并放入重试上下文中
if (backOffContext == null) {
backOffContext = backOffPolicy.start(context);
if (backOffContext != null) {
context.setAttribute("backOffContext", backOffContext);
}
}
// 在允许情况下,使用while循环控制重试流程
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
//根据日志等级,打印重试次数日志
if (this.logger.isDebugEnabled()) {
this.logger.debug("Retry: count=" + context.getRetryCount());
}
lastException = null;
// 回调传入的具体执行动作
return retryCallback.doWithRetry(context);
}
catch (Throwable e) {
// 调用失败处理
lastException = e;
try {
//先在上下文中注册异常
registerThrowable(retryPolicy, state, context, e);
}
catch (Exception ex) {
throw new TerminatedRetryException("Could not register throwable",
ex);
}
finally {
// 调用拦截器,执行RetryListener中onError
doOnErrorInterceptors(retryCallback, context, e);
}
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
// 支持补偿机制,比如延迟2秒
backOffPolicy.backOff(backOffContext);
}
catch (BackOffInterruptedException ex) {
lastException = e;
// back off was prevented by another thread - fail the retry
if (this.logger.isDebugEnabled()) {
this.logger
.debug("Abort retry because interrupted: count="
+ context.getRetryCount());
}
throw ex;
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(
"Checking for rethrow: count=" + context.getRetryCount());
}
if (shouldRethrow(retryPolicy, context, state)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Rethrow in retry for policy: count="
+ context.getRetryCount());
}
throw RetryTemplate.<E>wrapIfNecessary(e);
}
}
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
}
// 打印日志
if (state == null && this.logger.isDebugEnabled()) {
this.logger.debug(
"Retry failed last attempt: count=" + context.getRetryCount());
}
exhausted = true;
//重试失败后,如果有RecoveryCallback,则执行此回调
return handleRetryExhausted(recoveryCallback, context, state);
}
catch (Throwable e) {
//保底的catch,根据情况再包装一下抛出的异常
throw RetryTemplate.<E>wrapIfNecessary(e);
}
finally {
close(retryPolicy, context, state, lastException == null || exhausted);
// 调用拦截器,执行RetryListener中close
doCloseInterceptors(retryCallback, context, lastException);
// 清理线程中重试上下文
RetrySynchronizationManager.clear();
}
}
参考: