RateLimiter、Bulkhead、Retry详解

这几个组件的配置、注册器、事件等都和CircuitBreaker类似,不再赘述,主要说一下他们的工作原理

RateLimiter

RateLimiter提供了两套实现,一个是基于信号量的,一个是基于令牌桶的:

rateLimiter

Semaphore

使用一个计数信号量,当请求超过计数值,则在超时时间内等待,基于java concurrent并发包中的Semaphore实现。主要实现逻辑如下:

public boolean acquirePermission() {
    try {
        // 通过semaphore的tryAcquire()方法获取许可,同时设定等待时间
        boolean success = semaphore.tryAcquire(
            rateLimiterConfig.get().getTimeoutDuration().toNanos(), TimeUnit.NANOSECONDS);
        publishRateLimiterEvent(success);
        return success;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        publishRateLimiterEvent(false);
        return false;
    }
}

void refreshLimit() {
    // 通过配置的刷新时间释放semaphore
    int permissionsToRelease = this.rateLimiterConfig.get()
        .getLimitForPeriod() - semaphore.availablePermits();
    semaphore.release(permissionsToRelease);
}

令牌桶

令牌桶的运行如下图所示:

rate_limiter

在一个cycle内会放入新令牌,线程进入时会从中拿取令牌,如果令牌没有了,就会等待下一个cycle,如果等待时间比较长还可能等待多个cycle,如果拿到令牌的时间超过等待时间就会拒绝。具体实现如下:

核心的方法是acquirePermission(),每次请求进来都会调用该方法。

@Override
public boolean acquirePermission() {
    // 获取线程可以等待的最大时间
    long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();
    // 更新令牌桶的状态
    State modifiedState = updateStateWithBackOff(timeoutInNanos);
    // 判断是否能等待拿到令牌
    boolean result = waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait);
    // 发布事件
    publishRateLimiterEvent(result);
    return result;
}

看看updateStateWithBackOff(timeoutInNanos)方法是如何更新桶的状态的:

// 这里使用compareAndSet来更新下一个状态,state是一个原子引用
private State updateStateWithBackOff(final long timeoutInNanos) {
    AtomicRateLimiter.State prev;
    AtomicRateLimiter.State next;
    do {
        prev = state.get();
        next = calculateNextState(timeoutInNanos, prev);
    } while (!compareAndSet(prev, next));
    return next;
}

获取状态的核心在calculateNextState(timeoutInNanos, prev)方法:

private State calculateNextState(final long timeoutInNanos, final State activeState) {
    // 分别拿取令牌刷新时间和每一个周期的时间
    long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriod().toNanos();
    int permissionsPerCycle = activeState.config.getLimitForPeriod();

    // 拿到当前的时间,并计算当前时间经过了多少个cycle
    long currentNanos = currentNanoTime();
    long currentCycle = currentNanos / cyclePeriodInNanos;

    // 目前的cycle其实是前一个state的cycle
    long nextCycle = activeState.activeCycle;
    // 拿取前一个state剩余的令牌
    int nextPermissions = activeState.activePermissions;
    // 如果两个cycle不一样
    if (nextCycle != currentCycle) {
        // 计算中间相差几个cycle
        long elapsedCycles = currentCycle - nextCycle;
        // cycle数*每个cycle更新的令牌得到增加的令牌总数
        long accumulatedPermissions = elapsedCycles * permissionsPerCycle;
        // cycle更新
        nextCycle = currentCycle;
        // 令牌数不能超过限流数,所以取令牌总数和限流数的较小者
        nextPermissions = (int) min(nextPermissions + accumulatedPermissions, permissionsPerCycle);
    }
    // 计算nextPermissions变为正数所需等待的时间
    long nextNanosToWait = nanosToWaitForPermission(
        cyclePeriodInNanos, permissionsPerCycle, nextPermissions, currentNanos, currentCycle
    );
    // 更新下一个状态的信息,即从桶中拿令牌(如果拿到令牌等待的时间小于最大等待时间,令牌数就会-1)
    State nextState = reservePermissions(activeState.config, timeoutInNanos, nextCycle, nextPermissions, nextNanosToWait);
    return nextState;
}

拿到下一个状态就可以判断线程能否进入了,waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait)中会分三种情况返回结果:

private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final long nanosToWait) {
    boolean canAcquireImmediately = nanosToWait <= 0;
    boolean canAcquireInTime = timeoutInNanos >= nanosToWait;

    // 如果不需要等待,就直接返回true
    if (canAcquireImmediately) {
        return true;
    }
    // 如果需要等待但等待时间小于最大等待时间,就等待,如果线程没有被中断,则会返回true,否则返回false
    if (canAcquireInTime) {
        return waitForPermission(nanosToWait);
    }
    // 如果最终拿不到令牌,就在最大等待时间结束后返回false
    waitForPermission(timeoutInNanos);
    return false;
}

令牌桶是RateLimeter默认的实现,它的好处是比信号量的RateLimeter更高效,因为桶中的令牌可以被减为负数,它会提前计算在限定时间内能否拿到令牌,而不是信号量为0之后就一直阻塞。

Bulkhead

Bulkhead分为两种,固定线程池和信号量BulkheadThreadPoolBulkhead,由于线程池会有额外开销,不便管理,所以一般使用信号量的Bulkhead。信号量的Bulkhead非常简单,主要方法就是两个:

//在线程进入时调用semaphore的tryAcquire()方法
boolean tryEnterBulkhead() {
    boolean callPermitted;
    long timeout = config.getMaxWaitDuration().toMillis();

    if (timeout == 0) {
        callPermitted = semaphore.tryAcquire();
    } else {
        try {
            callPermitted = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            callPermitted = false;
        }
    }
    return callPermitted;
}

//结束时调用semaphore的release()方法
public void onComplete() {
    semaphore.release();
    publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name));
}

Retry

Retry的实现只有一种,也很简单,先从包装函数看:

static <T> CheckedFunction0<T> decorateCheckedSupplier(Retry retry, CheckedFunction0<T> supplier) {
    return () -> {
        Retry.Context<T> context = retry.context();
        do try {
            // 拿到执行结果放到result中判断结果
            T result = supplier.apply();
            final boolean validationOfResult = context.onResult(result);
            if (!validationOfResult) {
                // 如果有效,交给onSuccess()
                context.onSuccess();
                return result;
            }
        } catch (Exception exception) {
            // 如果抛出错误,交给onError()
            context.onError(exception);
            // 否则进行重试
        } while (true);
    };
}

OnResult(result)做了几件事:

public boolean onResult(T result) {
    // 1.先由谓词判断结果是否有效,若没有谓词函数表明结果有效,返回false
    if (null != resultPredicate && resultPredicate.test(result)) {
        // 2.如果谓词判断结果要重试则计算重试次数
        int currentNumOfAttempts = numOfAttempts.incrementAndGet();
        if (currentNumOfAttempts >= maxAttempts) {
            //超出重试次数就返回false表明结果有效
            return false;
        } else {
            //否则等待一段时间,返回true,表明结果无效
            waitIntervalAfterFailure(currentNumOfAttempts, null);
            return true;
        }
    }
    return false;
}

onSuccess()主要是记录一些结果到事件中。

onError()则对异常进行判断:

public void onError(Exception exception) throws Exception {
    // 由谓词函数判断是否需要重试
    if (exceptionPredicate.test(exception)) {
        //如果需要重试,记录一下
        lastException.set(exception);
        //由throwOrSleepAfterException()判断是重试还是抛出异常,主要根据是否超出重试次数来判断
        throwOrSleepAfterException();
    } else {
        //如果不需要重试,记录一下,然后抛出异常
        failedWithoutRetryCounter.increment();
        publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), exception));
        throw exception;
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容