uber在Github上开源了一套用于服务限流的go语言库ratelimit, 该组件基于Leaky Bucket(漏桶)实现。
我在之前写过《Golang限流器time/rate实现剖析》,讲了Golang标准库中提供的基于Token Bucket实现限流组件的time/rate
原理,同时也讲了限流的一些背景。
相比于TokenBucket,只要桶内还有剩余令牌,调用方就可以一直消费。而Leaky Bucket相对来说比较严格,调用方只能严格按照这个间隔顺序进行消费调用。(实际上,uber-go对这个限制也做了一些优化,具体可以看下文详解)
还是老规矩,在正式讲其实现之前,我们先看下ratelimit的使用方法。
ratelimit的使用
我们直接看下uber-go官方库给的例子:
rl := ratelimit.New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
在这个例子中,我们给定限流器每秒可以通过100个请求,也就是平均每个请求间隔10ms。
因此,最终会每10ms打印一行数据。输出结果如下:
// Output:
// 0 0
// 1 10ms
// 2 10ms
// 3 10ms
// 4 10ms
// 5 10ms
// 6 10ms
// 7 10ms
// 8 10ms
// 9 10ms
基本实现
要实现以上每秒固定速率的目的,其实还是比较简单的。
在ratelimit的New函数中,传入的参数是每秒允许请求量(RPS)。
我们可以很轻易的换算出每个请求之间的间隔:
limiter.perRequest = time.Second / time.Duration(rate)
以上limiter.perRequest
指的就是每个请求之间的间隔时间。
如下图,当请求1处理结束后, 我们记录下请求1的处理完成的时刻, 记为limiter.last
。
稍后请求2到来, 如果此刻的时间与limiter.last
相比并没有达到perRequest
的间隔大小,那么sleep一段时间即可。
[图片上传失败...(image-4b37d-1574514204743)]
对应ratelimit的实现代码如下:
sleepFor = t.perRequest - now.Sub(t.last)
if sleepFor > 0 {
t.clock.Sleep(sleepFor)
t.last = now.Add(sleepFor)
} else {
t.last = now
}
最大松弛量
我们讲到,传统的Leaky Bucket,每个请求的间隔是固定的,然而,在实际上的互联网应用中,流量经常是突发性的。对于这种情况,uber-go对Leaky Bucket做了一些改良,引入了最大松弛量(maxSlack)的概念。
我们先理解下整体背景: 假如我们要求每秒限定100个请求,平均每个请求间隔10ms。但是实际情况下,有些请求间隔比较长,有些请求间隔比较短。如下图所示:
[图片上传失败...(image-81ea99-1574514204743)]
请求1完成后,15ms后,请求2才到来,可以对请求2立即处理。请求2完成后,5ms后,请求3到来,这个时候距离上次请求还不足10ms,因此还需要等待5ms。
但是,对于这种情况,实际上三个请求一共消耗了25ms才完成,并不是预期的20ms。在uber-go实现的ratelimit中,可以把之前间隔比较长的请求的时间,匀给后面的使用,保证每秒请求数(RPS)即可。
对于以上case,因为请求2相当于多等了5ms,我们可以把这5ms移给请求3使用。加上请求3本身就是5ms之后过来的,一共刚好10ms,所以请求3无需等待,直接可以处理。此时三个请求也恰好一共是20ms。
如下图所示:
[图片上传失败...(image-994a4e-1574514204743)]
在ratelimit的对应实现中很简单,是把每个请求多余出来的等待时间累加起来,以给后面的抵消使用。
t.sleepFor += t.perRequest - now.Sub(t.last)
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
注意:这里跟上述代码不同的是,这里是+=
。而同时t.perRequest - now.Sub(t.last)
是可能为负值的,负值代表请求间隔时间比预期的长。
当t.sleepFor > 0
,代表此前的请求多余出来的时间,无法完全抵消此次的所需量,因此需要sleep相应时间, 同时将t.sleepFor
置为0。
当t.sleepFor < 0
,说明此次请求间隔大于预期间隔,将多出来的时间累加到t.sleepFor
即可。
但是,对于某种情况,请求1完成后,请求2过了很久到达(好几个小时都有可能),那么此时对于请求2的请求间隔now.Sub(t.last)
,会非常大。以至于即使后面大量请求瞬时到达,也无法抵消完这个时间。那这样就失去了限流的意义。
为了防止这种情况,ratelimit就引入了最大松弛量(maxSlack)的概念, 该值为负值,表示允许抵消的最长时间,防止以上情况的出现。
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
ratelimit中maxSlack的值为-10 * time.Second / time.Duration(rate)
, 是十个请求的间隔大小。我们也可以理解为ratelimit允许的最大瞬时请求为10。
高级用法
ratelimit的New函数,除了可以配置每秒请求数(QPS), 其实还提供了一套可选配置项Option。
func New(rate int, opts ...Option) Limiter
Option的类型为type Option func(l *limiter)
, 也就是说我们可以提供一些这样类型的函数,作为Option,传给ratelimit, 定制相关需求。
但实际上,自定义Option的用处比较小,因为limiter
结构体本身就是个私有类型,我们并不能拿它做任何事情。
我们只需要了解ratelimit目前提供的两个配置项即可:
WithoutSlack
我们上文讲到ratelimit中引入了最大松弛量的概念,而且默认的最大松弛量为10个请求的间隔时间。
但是确实会有这样需求场景,需要严格的限制请求的固定间隔。那么我们就可以利用WithoutSlack来取消松弛量的影响。
limiter := ratelimit.New(100, ratelimit.WithoutSlack)
WithClock(clock Clock)
我们上文讲到,ratelimit的实现时,会计算当前时间与上次请求时间的差值,并sleep相应时间。
在ratelimit基于go标准库的time实现时间相关计算。如果有精度更高或者特殊需求的计时场景,可以用WithClock来替换默认时钟。
通过该方法,只要实现了Clock的interface,就可以自定义时钟了。
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
clock &= MyClock{}
limiter := ratelimit.New(100, ratelimit.WithClock(clock))