Reslience4j限流组件RateLimiter源码分析

限流简介

在我们开发的软件系统中,做压测的时候需要估算我们的系统能抗住多少并发,在系统出现并发过高的时候也不至于服务器不可用状态。常见的做法是限流、熔断和降级。在Resilienc4j中给我们提供了限流和熔断组件,我们可以轻松的实现本机的熔断和限流功能。为了保护我们的系统的稳定性和可用性,一般我们想到的就是在服务器压力大的时候拒绝请求或使用缓存队列的方式暂存请求。不过限流有可以基于Redis+Lua实现限流,也可以使用阿里开源的Sentinel限流。常用的限流算法不外乎(固定)计数法、滑动窗口计数法、令牌桶法和漏桶法。今天我们要介绍的就是Resilience4j提供的RateLimiter组件实现的就是令牌桶法。

令牌桶限流原理
image.png

令牌桶算法的原理是以一个恒定的速度往桶里放入令牌,如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。令牌桶算法能够满足具有突发特性的流量。通常突发流量最大值对于我们自己维护的服务是清晰可控的,为保证系统的最大可用性(处理更多的请求),同时防止自己的服务被打垮。

Resilience4j限流原理

在Resilience4j中的限流模块是ratelimiter,基于函数式编程。可以理解为通过AOP的方式拦截请求实现限流。在ratelimiter模块中,RateLimiter定义了限流的接口,基于计数AtomicRateLimiter和基于信号量SemaphoreBasedRateLimiter的两个实现类。由于SemaphoreBasedRateLimiter是基于java的Semaphore实现,所以就不分析了。

1.RateLimiterConfig配置类
    private final Duration timeoutDuration;
    private final Duration limitRefreshPeriod;
    private final int limitForPeriod;
    private final Function<Either<? extends Throwable, ?>, Boolean> drainPermissionsOnResult;
    private final boolean writableStackTraceEnabled;
  • timeoutDuration: 当请求被限流时,线程等待的时间
  • limitRefreshPeriod: 令牌刷新的周期(period),以纳秒为单位
  • limitForPeriod: 在每个(period)周期内的令牌数
  • drainPermissionsOnResult:
  • writableStackTraceEnabled:
2.RateLimiter接口
    boolean acquirePermission(int permits);
    long reservePermission(int permits);
    void drainPermissions();
    ......
       static <T> Callable<T> decorateCallable(RateLimiter rateLimiter, int permits,
        Callable<T> callable) {
        return () -> {
            waitForPermission(rateLimiter, permits);
            try {
                T result = callable.call();
                rateLimiter.onResult(result);
                return result;
            } catch (Exception exception) {
                rateLimiter.onError(exception);
                throw exception;
            }
        };
    }

  default void onError(Throwable throwable) {
        drainIfNeeded(Either.left(throwable));
    }

 default void onSuccess() {
        drainIfNeeded(Either.right(null));
    }

   default void onResult(Object result) {
        drainIfNeeded(Either.right(result));
    }

    default void drainIfNeeded(Either<? extends Throwable, ?> callsResult) {
        Function<Either<? extends Throwable, ?>, Boolean> checker = getRateLimiterConfig()
            .getDrainPermissionsOnResult();
        if (checker == null) {
            return;
        }
        boolean drainNeeded = checker.apply(callsResult);
        if (drainNeeded) {
            drainPermissions();
        }
    }

    static void waitForPermission(final RateLimiter rateLimiter, int permits) {
        boolean permission = rateLimiter.acquirePermission(permits);
        if (Thread.currentThread().isInterrupted()) {
            throw new AcquirePermissionCancelledException();
        }
        if (!permission) {
            throw RequestNotPermitted.createRequestNotPermitted(rateLimiter);
        }
    }
   ......

在RateLimiter接口中除了静态的获取限流器的方法外,还有静态方法装饰请求,目前可以装饰的有Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer or CompletionStage,剩下的就是限流相关的接口了。

  • acquirePermission阻塞的获取给定数量的令牌,直到获取到令牌或者线程发生中断
  • reservePermission预定给定数量的令牌,返回线程需要等待的时间,如果返回负数,则表示预定失败,因为timeoutDuration配置的等待时间小于需要获取令牌的时间。
  • drainPermissions 扔掉当前周期内所有的令牌
  • decorateCallable 我们以这个方法为例,主要调用waitForPermission方法来获取令牌,之后调用方法,成功了调用onSuccess,失败了回调onError
  • onSuccess、onError、onResult最终都调用了drainIfNeeded
  • drainIfNeeded 主要是根据配置来判断是否丢弃当前周期的令牌,如果丢弃还是调用drainPermissions
  • waitForPermission 在timeoutDuration时间内等待给定数量的令牌,可以看出最终还是调用acquirePermission方法,如果线程发生中断,则抛出异常,未获取到令牌,也抛出异常

接下来我们就来重点分析下RateLimiter的实现类AtomicRateLimiter

3. 限流算法核心实现类AtomicRateLimiter

AtomicRateLimiter是线程安全的原子计数的限流算法

    // 记录限流器启动时的系统时间(纳秒)
    private long nanoTimeStart;
    // 限流器名称
    private final String name;
    // 等待线程数
    private final AtomicInteger waitingThreads;
    // 限流器状态
    private final AtomicReference<State> state;
    private final Map<String,String> tags;
    // 时间处理器
    private final RateLimiterEventProcessor eventProcessor;
  • nanoTimeStart: 记录系统启动的时间
  • name: 限流器名称
  • waitingThreads 等待获取令牌的线程数
  • state 线程安全的状态

State类是限流器状态类,主要有4个重要字段:

        // 限流器配置
        private final RateLimiterConfig config;
        //当前的周期号
        private final long activeCycle;
        // 当前周期下的token
        private final int activePermissions;
        // 没有可用的token时,获取下一个周期的token,需等待的时间(纳秒)
        private final long nanosToWait;
  • config: 限流器相关的配置
  • activeCycle: 当前的周期号,初始为0,在AtomicRateLimiter创建的时候设置。
  • activePermissions:每个周期的令牌数
  • nanosToWait:获取下一个令牌需要等待的时间,初始为0,刚开始不需要线程等待

接下来我们重点分析下获取令牌的过程 acquirePermission(int permits)

    public boolean acquirePermission(int permits) {
        // 获取线程等待的超时时间(纳秒)
        long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();
        // 进入下一个状态
        State modifiedState = updateStateWithBackOff(permits,timeoutInNanos);
        boolean result = waitForPermissionIfNecessary(timeoutInNanos,modifiedState.nanosToWait);
        publishRateLimiterAcquisitionEvent(result,permits);
        return result;
    }

首先获取配置中线程等待的时间,之后调用updateStateWithBackOff在给定的等待时间内获取令牌,线程获取到了当前的状态则需要判断是否需要等待及返回。

updateStateWithBackOff方法循环原子的更新state状态,直到更新成功

    private State updateStateWithBackOff(int permits, long timeoutInNanos) {
        AtomicRateLimiter.State prev;
        AtomicRateLimiter.State next;
        do{
            prev = state.get();
            // 计算下一个状态
            next = calculateNextState(permits,timeoutInNanos,prev);
        }while (!compareAndSet(prev,next));
        return next;
    }

    private boolean compareAndSet(final State current, final State next) {
        if (state.compareAndSet(current, next)) {
            return true;
        }
        parkNanos(1); // back-off
        return false;
    }

核心实现在calculateNextState方法中

// 从当前状态计算下一个状态
1  private State calculateNextState(final int permits,final long timeoutInNanos, final State activeState) {
        // 时间周期,默认500纳秒
2       long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriod().toNanos();
        // 每个周期允许的token数,默认50个
3       int permissionsPerCycle = activeState.config.getLimitForPeriod();

        // 从限流器启动到当前,经过的纳秒数
4       long currentNanos = currentNanoTime();
        // 周期号,计算当前应该属于第几个周期
5       long currentCycle = currentNanos / cyclePeriodInNanos;

        // 将当前状态的属性暂时赋值给下一个状态
6       long nextCycle = activeState.activeCycle;
        // 有可能为负值
7       int nextPermissions = activeState.activePermissions;
        // 判断下一个状态的周期是否与计算的实际周期相同,不同则重新计算周期号及允许的token数
8       if(nextCycle != currentCycle) {
9            long elapsedCycles = currentCycle - nextCycle;
10          long accumulatedPermissions = elapsedCycles * permissionsPerCycle;
11          nextCycle = currentCycle;
12          nextPermissions = (int)min(nextPermissions + accumulatedPermissions,permissionsPerCycle);
13      }
        // 计算获取下一个token所需等待的时间
14      long nextNanosToWait = nanosToWaitForPermission(permits,cyclePeriodInNanos,permissionsPerCycle,nextPermissions,currentNanos,currentCycle);
        // 生成新的状态
15      State nextState = reservePermission(activeState.config,permits,timeoutInNanos,nextCycle,nextPermissions,nextNanosToWait);
16      return nextState;
17   }

calculateNextState方法从输入的参数: 需要获取的令牌数permits、线程可以等待的时间timeoutInNanos、当前状态activeState来生成新的状态,新的状态中包含了当前线程能获取令牌的时间,来决定当前线程是否需要等待。

1~5 行用来计算系统启动以来在第几个时间周期currentNanoTime方法用来计算经过的纳秒数。
经过的时间 / 每个周期的时间 = 当前所属于那个周期

  private long currentNanoTime() {
        return nanoTime() - nanoTimeStart;
    }

6~7行 第6行是获取现在状态中保存的周期,第7行获取状态中剩余的令牌数,可能为负值是因为在线程等待时间内,需要给线程保留令牌。

  • 如下图所示,如果当前令牌桶中没有令牌,下一个令牌的发放时间是第3个周期,而在第二个周期的时候有一个线程T1请求令牌该如何处理呢?


    image.png
  • 对于这个请求令牌的线程而言,很显然需要等待1个周期,因为1个周期之后(第3个周期)它就能拿到令牌了。此时需要注意的点是下一个令牌发放的令牌也需要增加1个周期,因为第3个发放的令牌已经被线程T1预占了。如下图所示。


    image.png
  • 假设T1在预占了第3个周期的令牌之后,马上又有一个新的线程T2请求令牌,如下图所示。


    image.png
  • 很明显,由于下一个令牌的时间是第4个周期,所以T2线程需要等待2个周期,才能获取到令牌,同时由于线程T2预占了第4个周期的令牌,所以下一个令牌的产生时间还是1个周期,完全处理之后,如下图所示。
image.png

8~13行 是判断当前的周期和状态State中保存的周期是否是同一个,如果不是,则需要计算新的周期号及令牌数。

  • 第9行计算当前状态中的周期到现在的周期还差多少个周期
  • 第10行计算当前状态中的周期到现在的周期总共能预占多少个令牌
  • 第11行为需要设置的新的周期号
  • 第12行计算需要设置新的令牌数,计算方式为每个周期的令牌数和(上一个状态周期到现在总共能预占的令牌数 + 原来状态中的令牌数)取最小值
    14~17行用来获取token需要等待的最少时间并生成新的状态返回。
  • 14行调用nanosToWaitForPermission方法来计算需要等待的最少时间
    private long nanosToWaitForPermission(final int permits, final long cyclePeriodInNanos,
                                          final int permissionsPerCycle,
                                          final int availablePermissions, final long currentNanos, final long currentCycle) {
        if (availablePermissions >= permits) {
            return 0L;
        }
        long nextCycleTimeInNanos = (currentCycle + 1) * cyclePeriodInNanos;
        long nanosToNextCycle = nextCycleTimeInNanos - currentNanos;
        int permissionsAtTheStartOfNextCycle = availablePermissions + permissionsPerCycle;
        int fullCyclesToWait = divCeil(-(permissionsAtTheStartOfNextCycle - permits), permissionsPerCycle);
        return (fullCyclesToWait * cyclePeriodInNanos) + nanosToNextCycle;
    }
  • 15行用来给需要等待的线程保留令牌
   private State reservePermissions(final RateLimiterConfig config, final int permits,
                                     final long timeoutInNanos,
                                     final long cycle, final int permissions, final long nanosToWait) {
        boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
        int permissionsWithReservation = permissions;
        if (canAcquireInTime) {
            permissionsWithReservation -= permits;
        }
        return new State(config, cycle, permissionsWithReservation, nanosToWait);
    }

在updateStateWithBackOff方法结束返回新的状态后,需要判断线程通过等待是否能拿到令牌,并发布一个事件,在waitForPermissionIfNecessary中,如果能拿到令牌返回ture,否则返回false。

    private boolean waitForPermissionIfNecessary(long timeoutInNanos, long nanosToWait) {
        boolean canAcquireImmediately = nanosToWait <= 0;
        boolean canAcquireInTime = timeoutInNanos >= nanosToWait;

        // 如果无需等待,则返回true,表示拿到token
        if(canAcquireImmediately) {
            return true;
        }

        if(canAcquireInTime) {
            return waitForPermission(nanosToWait);
        }
        waitForPermission(timeoutInNanos);
        return false;
    }

waitForPermissionIfNecessary方法主要是判断线程是否需要等待,如果最新返回的State中需要等待时间小于0,表示立即可以获得令牌,返回true。否则判断线程可以等待的时间是否大于State中需要等待的时间,如果大于则只需要等待State中的时间即可,否则等待Config中配置的时间。在waitForPermission实现具体的等待逻辑。

private boolean waitForPermission(final long nanosToWait) {
        // 等待线程数加一
        waitingThreads.incrementAndGet();
        long deadline = currentNanoTime() + nanosToWait;
        boolean wasInterrupted = false;
        // 阻塞到下一个周期,或者当前线程被中断
        while(currentNanoTime() < deadline && !wasInterrupted) {
            long sleepBlockDuration = deadline - currentNanoTime();
            parkNanos(sleepBlockDuration);
            wasInterrupted = Thread.interrupted();
        }
        waitingThreads.decrementAndGet();
        if(wasInterrupted) {
            currentThread().interrupt();
        }
        return !wasInterrupted;
    }

waitForPermission方法中增加等待线程数,计算线程等待的截止时间,然后线程阻塞到截止时间,到了截止时间后,表示当前线程可以获取令牌,则把等待线程数减1,当前线程如果发生了中断,则直接报错返回线程,未发生中断则返回true。

4.总结

AtomicRateLimiter限流算法中,通过计算当前周期线程能否获取到令牌,如果能获取到,则立即返回,否则比较线程等待时间是否大于能获取到令牌的时间,如果大于,则获取不到。并且在计算获取令牌的方法中会计算线程等待获取令牌的最小时间,即获取线程需要等待的时间和可以发放给当前线程的最小时间,让线程取最小值进行等待。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容