这几个组件的配置、注册器、事件等都和CircuitBreaker类似,不再赘述,主要说一下他们的工作原理
RateLimiter
RateLimiter提供了两套实现,一个是基于信号量的,一个是基于令牌桶的:
Semaphore
使用一个计数信号量,当请求超过计数值,则在超时时间内等待,基于java concurrent并发包中的Semaphore实现。主要实现逻辑如下:
public boolean acquirePermission() {
try {
// 通过semaphore的tryAcquire()方法获取许可,同时设定等待时间
boolean success = semaphore.tryAcquire(
rateLimiterConfig.get().getTimeoutDuration().toNanos(), TimeUnit.NANOSECONDS);
publishRateLimiterEvent(success);
return success;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
publishRateLimiterEvent(false);
return false;
}
}
void refreshLimit() {
// 通过配置的刷新时间释放semaphore
int permissionsToRelease = this.rateLimiterConfig.get()
.getLimitForPeriod() - semaphore.availablePermits();
semaphore.release(permissionsToRelease);
}
令牌桶
令牌桶的运行如下图所示:
在一个cycle内会放入新令牌,线程进入时会从中拿取令牌,如果令牌没有了,就会等待下一个cycle,如果等待时间比较长还可能等待多个cycle,如果拿到令牌的时间超过等待时间就会拒绝。具体实现如下:
核心的方法是acquirePermission(),每次请求进来都会调用该方法。
@Override
public boolean acquirePermission() {
// 获取线程可以等待的最大时间
long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();
// 更新令牌桶的状态
State modifiedState = updateStateWithBackOff(timeoutInNanos);
// 判断是否能等待拿到令牌
boolean result = waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait);
// 发布事件
publishRateLimiterEvent(result);
return result;
}
看看updateStateWithBackOff(timeoutInNanos)方法是如何更新桶的状态的:
// 这里使用compareAndSet来更新下一个状态,state是一个原子引用
private State updateStateWithBackOff(final long timeoutInNanos) {
AtomicRateLimiter.State prev;
AtomicRateLimiter.State next;
do {
prev = state.get();
next = calculateNextState(timeoutInNanos, prev);
} while (!compareAndSet(prev, next));
return next;
}
获取状态的核心在calculateNextState(timeoutInNanos, prev)方法:
private State calculateNextState(final long timeoutInNanos, final State activeState) {
// 分别拿取令牌刷新时间和每一个周期的时间
long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriod().toNanos();
int permissionsPerCycle = activeState.config.getLimitForPeriod();
// 拿到当前的时间,并计算当前时间经过了多少个cycle
long currentNanos = currentNanoTime();
long currentCycle = currentNanos / cyclePeriodInNanos;
// 目前的cycle其实是前一个state的cycle
long nextCycle = activeState.activeCycle;
// 拿取前一个state剩余的令牌
int nextPermissions = activeState.activePermissions;
// 如果两个cycle不一样
if (nextCycle != currentCycle) {
// 计算中间相差几个cycle
long elapsedCycles = currentCycle - nextCycle;
// cycle数*每个cycle更新的令牌得到增加的令牌总数
long accumulatedPermissions = elapsedCycles * permissionsPerCycle;
// cycle更新
nextCycle = currentCycle;
// 令牌数不能超过限流数,所以取令牌总数和限流数的较小者
nextPermissions = (int) min(nextPermissions + accumulatedPermissions, permissionsPerCycle);
}
// 计算nextPermissions变为正数所需等待的时间
long nextNanosToWait = nanosToWaitForPermission(
cyclePeriodInNanos, permissionsPerCycle, nextPermissions, currentNanos, currentCycle
);
// 更新下一个状态的信息,即从桶中拿令牌(如果拿到令牌等待的时间小于最大等待时间,令牌数就会-1)
State nextState = reservePermissions(activeState.config, timeoutInNanos, nextCycle, nextPermissions, nextNanosToWait);
return nextState;
}
拿到下一个状态就可以判断线程能否进入了,waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait)中会分三种情况返回结果:
private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final long nanosToWait) {
boolean canAcquireImmediately = nanosToWait <= 0;
boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
// 如果不需要等待,就直接返回true
if (canAcquireImmediately) {
return true;
}
// 如果需要等待但等待时间小于最大等待时间,就等待,如果线程没有被中断,则会返回true,否则返回false
if (canAcquireInTime) {
return waitForPermission(nanosToWait);
}
// 如果最终拿不到令牌,就在最大等待时间结束后返回false
waitForPermission(timeoutInNanos);
return false;
}
令牌桶是RateLimeter默认的实现,它的好处是比信号量的RateLimeter更高效,因为桶中的令牌可以被减为负数,它会提前计算在限定时间内能否拿到令牌,而不是信号量为0之后就一直阻塞。
Bulkhead
Bulkhead分为两种,固定线程池和信号量Bulkhead和ThreadPoolBulkhead,由于线程池会有额外开销,不便管理,所以一般使用信号量的Bulkhead。信号量的Bulkhead非常简单,主要方法就是两个:
//在线程进入时调用semaphore的tryAcquire()方法
boolean tryEnterBulkhead() {
boolean callPermitted;
long timeout = config.getMaxWaitDuration().toMillis();
if (timeout == 0) {
callPermitted = semaphore.tryAcquire();
} else {
try {
callPermitted = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
callPermitted = false;
}
}
return callPermitted;
}
//结束时调用semaphore的release()方法
public void onComplete() {
semaphore.release();
publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name));
}
Retry
Retry的实现只有一种,也很简单,先从包装函数看:
static <T> CheckedFunction0<T> decorateCheckedSupplier(Retry retry, CheckedFunction0<T> supplier) {
return () -> {
Retry.Context<T> context = retry.context();
do try {
// 拿到执行结果放到result中判断结果
T result = supplier.apply();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
// 如果有效,交给onSuccess()
context.onSuccess();
return result;
}
} catch (Exception exception) {
// 如果抛出错误,交给onError()
context.onError(exception);
// 否则进行重试
} while (true);
};
}
OnResult(result)做了几件事:
public boolean onResult(T result) {
// 1.先由谓词判断结果是否有效,若没有谓词函数表明结果有效,返回false
if (null != resultPredicate && resultPredicate.test(result)) {
// 2.如果谓词判断结果要重试则计算重试次数
int currentNumOfAttempts = numOfAttempts.incrementAndGet();
if (currentNumOfAttempts >= maxAttempts) {
//超出重试次数就返回false表明结果有效
return false;
} else {
//否则等待一段时间,返回true,表明结果无效
waitIntervalAfterFailure(currentNumOfAttempts, null);
return true;
}
}
return false;
}
onSuccess()主要是记录一些结果到事件中。
onError()则对异常进行判断:
public void onError(Exception exception) throws Exception {
// 由谓词函数判断是否需要重试
if (exceptionPredicate.test(exception)) {
//如果需要重试,记录一下
lastException.set(exception);
//由throwOrSleepAfterException()判断是重试还是抛出异常,主要根据是否超出重试次数来判断
throwOrSleepAfterException();
} else {
//如果不需要重试,记录一下,然后抛出异常
failedWithoutRetryCounter.increment();
publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), exception));
throw exception;
}
}