令牌桶算法及实现(二)

接上篇。Guava的令牌桶的实现中,包括一条设计哲学,需要大家注意:它允许瞬间的流量波峰超过QPS,但瞬间过后的请求将会等待较长的时间来缓解上次的波峰,以使得平均的QPS等于预定值。
RateLimiter类提供了令牌桶的接口,它是一个抽象类,其子类有SmoothRateLimiter(也是个抽象类)以及孙子类SmoothBursty,SmoothWarmingUp。SmoothRateLimiter类实现了算法的核心部分,因次我们暂且只讨论SmoothRateLimiter和其实现类SmoothBursty。RateLimiter都是通过静态的create函数实例化。以create(double permitsPerSecond)为例。参数permitsPerSecond为配置的QPS。该方法简洁明了,屏蔽了很多用户无需关心的细节。

public static RateLimiter create(double permitsPerSecond) {
      return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  }

接着该方法调用了create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer())方法(该方法由于是包的访问权限,在实际的项目中,基本不会直接调用),同时创建了一个StopWatch,自动启动。

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

该方法创建了SmoothBursty实例,up-casting为RateLimiter。maxBurstSeconds固定为1,说明令牌桶中所能存储的的最大令牌数是1*QPS。接着调用setRate方法,该方法初始化一些重要的参数:

public final void setRate(double permitsPerSecond) {
    checkArgument(
        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
    synchronized (mutex()) {
      doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }

主要实现在SmoothRateLimiter中:

@Override
  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

其中resync方法是一个关键的方法,在请求令牌时也会用到,后面还会说明:

 void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }

从中可以看出,如果nowMicros大于nextFreeTicketMicros,会重新计算nextFreeTicketMicros和storedPermit的值。设置stableIntervalMicros,该字段表示1/QPS,即生产令牌的速率。
接着调用doSetRate方法,该方法在SmoothBursty类中。

@Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = maxPermits;
      } else {
        storedPermits =
          (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
}

初始化maxPermits和storePermits,后者永远不会大于前者。
到此,rateLimiter初始化完成。除了resync方法,在不重新设置rate的情况,其他方法不在处理请求时用到,暂时忽略。
下面看关键的令牌申请的过程。
首先调用acquire()方法,申请令牌,无参数表示申请一个。

public double acquire() {
    return acquire(1);
  }

接着调用acquire(int permits)方法:

@CanIgnoreReturnValue
  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

reserve方法返回获取令牌所需要等待的时间,stopwatch阻塞当前线程,最后返回线程休眠的秒数。如果microsToWait为0,表示立即返回。

final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

reserve需要获取锁才可以操作,这也是令牌桶线程安全的原因,以下操作都在同步代码块中。
继续reserveAndGetWaitLength方法。

final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

首先调用reserveEarliestAvailable,方法名说明了返回值的意义:即返回满足当前请求的最早的时钟,该值大于等于nowMicros。如何保证这一点的呢?我们看该方法:

@Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

这十多行代码是整个算法实现的核心,重点说明:

  1. 首先调用resync(nowMicros),重置nextFreeTicketMicros。如果nowMicros在nextFreeTicketMicros之后,nextFreeTicketMicros=nowMicros,并往storedPermits中增加这段时间能产生的令牌。
    返回值设置为当前的nextFreeTicketMicros。为什么要这样设置呢?因为如果nowMicros大于nextFreeTicketMicros,说明令牌桶肯定能满足需求(无论请求的令牌数目是多少,参见最上面的设计哲学),而resync方法已经修改了nextFreeTicketMicros值为nowMicros值,逐层返回给调用者时,等待时间为0,线程无需等待;反之,如果nowMicros小于等于nextFreeTicketMicros,说明请求过快,线程需要等待,等待的时间就是nextFreeTicketMicros-nowMicros。
  2. 接下来,storedPermitsToSpend代表令牌桶中已有的令牌数,可以用于当前的请求。但未必满足需求。
  3. 其次,freshPermits代表需要新生成的令牌数。如果storedPermits已经满足需求,则freshPermits为0。
  4. 再次,计算新生成令牌需要花费的时间,这些需要后来者偿还。
  5. 然后修改nextFreeTicketMicros的值。
  6. 最后修改storedPermits值。
    至此整个处理过程结束。

经过上面的代码梳理,详细大家对RateLimiter的代码有个比较清晰的认识,但要加深理解,还需要多做debug和test。
Guava包里面包括了很多test case。我们可以把test类单拿出来,根据自己的情况添加相应的case即可。该类是com.google.common.util.concurrent. RateLimiterTest。由于很多类都使用了默认访问权限,我们需要定义一个 com.google.common.util.concurrent包,导入RateLimiterTest类。该类中,guava提供了一个FakeStopwatch的nested class。它能够让时钟按照我们的要求暂停,休眠随意的时长,并记录休眠和请求对应的事件,并已特定的格式输出。例如:R1.00代表请求给定的令牌延迟了1秒;U1.05表示stopwatch休眠1.05秒,即模拟时钟过了1.05秒。例如一个测试通过的case:

    public void testSimple() {
        RateLimiter limiter = RateLimiter.create(5.0, stopwatch);
        limiter.acquire(); // R0.00
        limiter.acquire(); // R0.20
        limiter.acquire(); // R0.20
        stopwatch.sleepMillis(1000); // U1.00
        assertEvents("R0.00", "R0.20", "R0.20", "U1.00");
    }

下面提供一个case,验证下大家的理解。

public void testOneSecondBurst3() {
        RateLimiter limiter = RateLimiter.create(1.0, stopwatch);
        limiter.acquire(1); // R值?
        stopwatch.sleepMillis(1050);//U值?
        limiter.acquire(1); // R值? nowMicros? nextFree?
        stopwatch.sleepMillis(950);
        limiter.acquire(1); // R值? nowMicros? nextFree?
        stopwatch.sleepMillis(1000);
        limiter.acquire(1); // R值? nowMicros? nextFree?
}

关注公众号“码农走向艺术”,回复消息可以获取答案幺:)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容