RateLimiter、Bulkhead、Retry详解

这几个组件的配置、注册器、事件等都和CircuitBreaker类似,不再赘述,主要说一下他们的工作原理

RateLimiter

RateLimiter提供了两套实现,一个是基于信号量的,一个是基于令牌桶的:

rateLimiter

Semaphore

使用一个计数信号量,当请求超过计数值,则在超时时间内等待,基于java concurrent并发包中的Semaphore实现。主要实现逻辑如下:

public boolean acquirePermission() {
    try {
        // 通过semaphore的tryAcquire()方法获取许可,同时设定等待时间
        boolean success = semaphore.tryAcquire(
            rateLimiterConfig.get().getTimeoutDuration().toNanos(), TimeUnit.NANOSECONDS);
        publishRateLimiterEvent(success);
        return success;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        publishRateLimiterEvent(false);
        return false;
    }
}

void refreshLimit() {
    // 通过配置的刷新时间释放semaphore
    int permissionsToRelease = this.rateLimiterConfig.get()
        .getLimitForPeriod() - semaphore.availablePermits();
    semaphore.release(permissionsToRelease);
}

令牌桶

令牌桶的运行如下图所示:

rate_limiter

在一个cycle内会放入新令牌,线程进入时会从中拿取令牌,如果令牌没有了,就会等待下一个cycle,如果等待时间比较长还可能等待多个cycle,如果拿到令牌的时间超过等待时间就会拒绝。具体实现如下:

核心的方法是acquirePermission(),每次请求进来都会调用该方法。

@Override
public boolean acquirePermission() {
    // 获取线程可以等待的最大时间
    long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();
    // 更新令牌桶的状态
    State modifiedState = updateStateWithBackOff(timeoutInNanos);
    // 判断是否能等待拿到令牌
    boolean result = waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait);
    // 发布事件
    publishRateLimiterEvent(result);
    return result;
}

看看updateStateWithBackOff(timeoutInNanos)方法是如何更新桶的状态的:

// 这里使用compareAndSet来更新下一个状态,state是一个原子引用
private State updateStateWithBackOff(final long timeoutInNanos) {
    AtomicRateLimiter.State prev;
    AtomicRateLimiter.State next;
    do {
        prev = state.get();
        next = calculateNextState(timeoutInNanos, prev);
    } while (!compareAndSet(prev, next));
    return next;
}

获取状态的核心在calculateNextState(timeoutInNanos, prev)方法:

private State calculateNextState(final long timeoutInNanos, final State activeState) {
    // 分别拿取令牌刷新时间和每一个周期的时间
    long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriod().toNanos();
    int permissionsPerCycle = activeState.config.getLimitForPeriod();

    // 拿到当前的时间,并计算当前时间经过了多少个cycle
    long currentNanos = currentNanoTime();
    long currentCycle = currentNanos / cyclePeriodInNanos;

    // 目前的cycle其实是前一个state的cycle
    long nextCycle = activeState.activeCycle;
    // 拿取前一个state剩余的令牌
    int nextPermissions = activeState.activePermissions;
    // 如果两个cycle不一样
    if (nextCycle != currentCycle) {
        // 计算中间相差几个cycle
        long elapsedCycles = currentCycle - nextCycle;
        // cycle数*每个cycle更新的令牌得到增加的令牌总数
        long accumulatedPermissions = elapsedCycles * permissionsPerCycle;
        // cycle更新
        nextCycle = currentCycle;
        // 令牌数不能超过限流数,所以取令牌总数和限流数的较小者
        nextPermissions = (int) min(nextPermissions + accumulatedPermissions, permissionsPerCycle);
    }
    // 计算nextPermissions变为正数所需等待的时间
    long nextNanosToWait = nanosToWaitForPermission(
        cyclePeriodInNanos, permissionsPerCycle, nextPermissions, currentNanos, currentCycle
    );
    // 更新下一个状态的信息,即从桶中拿令牌(如果拿到令牌等待的时间小于最大等待时间,令牌数就会-1)
    State nextState = reservePermissions(activeState.config, timeoutInNanos, nextCycle, nextPermissions, nextNanosToWait);
    return nextState;
}

拿到下一个状态就可以判断线程能否进入了,waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait)中会分三种情况返回结果:

private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final long nanosToWait) {
    boolean canAcquireImmediately = nanosToWait <= 0;
    boolean canAcquireInTime = timeoutInNanos >= nanosToWait;

    // 如果不需要等待,就直接返回true
    if (canAcquireImmediately) {
        return true;
    }
    // 如果需要等待但等待时间小于最大等待时间,就等待,如果线程没有被中断,则会返回true,否则返回false
    if (canAcquireInTime) {
        return waitForPermission(nanosToWait);
    }
    // 如果最终拿不到令牌,就在最大等待时间结束后返回false
    waitForPermission(timeoutInNanos);
    return false;
}

令牌桶是RateLimeter默认的实现,它的好处是比信号量的RateLimeter更高效,因为桶中的令牌可以被减为负数,它会提前计算在限定时间内能否拿到令牌,而不是信号量为0之后就一直阻塞。

Bulkhead

Bulkhead分为两种,固定线程池和信号量BulkheadThreadPoolBulkhead,由于线程池会有额外开销,不便管理,所以一般使用信号量的Bulkhead。信号量的Bulkhead非常简单,主要方法就是两个:

//在线程进入时调用semaphore的tryAcquire()方法
boolean tryEnterBulkhead() {
    boolean callPermitted;
    long timeout = config.getMaxWaitDuration().toMillis();

    if (timeout == 0) {
        callPermitted = semaphore.tryAcquire();
    } else {
        try {
            callPermitted = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            callPermitted = false;
        }
    }
    return callPermitted;
}

//结束时调用semaphore的release()方法
public void onComplete() {
    semaphore.release();
    publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name));
}

Retry

Retry的实现只有一种,也很简单,先从包装函数看:

static <T> CheckedFunction0<T> decorateCheckedSupplier(Retry retry, CheckedFunction0<T> supplier) {
    return () -> {
        Retry.Context<T> context = retry.context();
        do try {
            // 拿到执行结果放到result中判断结果
            T result = supplier.apply();
            final boolean validationOfResult = context.onResult(result);
            if (!validationOfResult) {
                // 如果有效,交给onSuccess()
                context.onSuccess();
                return result;
            }
        } catch (Exception exception) {
            // 如果抛出错误,交给onError()
            context.onError(exception);
            // 否则进行重试
        } while (true);
    };
}

OnResult(result)做了几件事:

public boolean onResult(T result) {
    // 1.先由谓词判断结果是否有效,若没有谓词函数表明结果有效,返回false
    if (null != resultPredicate && resultPredicate.test(result)) {
        // 2.如果谓词判断结果要重试则计算重试次数
        int currentNumOfAttempts = numOfAttempts.incrementAndGet();
        if (currentNumOfAttempts >= maxAttempts) {
            //超出重试次数就返回false表明结果有效
            return false;
        } else {
            //否则等待一段时间,返回true,表明结果无效
            waitIntervalAfterFailure(currentNumOfAttempts, null);
            return true;
        }
    }
    return false;
}

onSuccess()主要是记录一些结果到事件中。

onError()则对异常进行判断:

public void onError(Exception exception) throws Exception {
    // 由谓词函数判断是否需要重试
    if (exceptionPredicate.test(exception)) {
        //如果需要重试,记录一下
        lastException.set(exception);
        //由throwOrSleepAfterException()判断是重试还是抛出异常,主要根据是否超出重试次数来判断
        throwOrSleepAfterException();
    } else {
        //如果不需要重试,记录一下,然后抛出异常
        failedWithoutRetryCounter.increment();
        publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), exception));
        throw exception;
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355

推荐阅读更多精彩内容