目录
1.导读-为什么需要限流
因为资源是有限的(如服务器资源),当资源供不应求就会发现一系列问题(如请求延迟),因此我们通常会对资源增加访问的限制,以现实为例,如为防止十字路流量过大且保证安全性,引入红绿灯机制,到下班时间段,甚至特定路段会安排交警指挥,还有春运的交通限流、超市排队结算等等,可以说限流机制在日常生活中都是无处不在。那放到API概念上时,常见的限流应用场景有哪些?
- 防止流量突发时,服务出现雪崩,如抢购或DDOS攻击
- 用户SLA的分级,如针对免费用户和付费用户,提供不同的API QPPS额度
- API市场的API商品,会通过限流来满足商品库存的调用限制
2.限流指标
限流有两个重要概念:阈值和拒绝策略,下面是对常用阈值的介绍
2.1TPS
TPS(Transaction Per Second)即事务数/秒。与mysql的事务不同,这里指一个客端端向服务器发送请求做出响应的过程,客户端在发送请求开始计算到服务器响应后结束计算。
在单机系统上一个请求完成一笔事务,但是在分布式系统中,一笔请求通常需要多个系统配合(一笔事务多个请求),有的服务需要异步返回,因此完成一笔事务花费时间可能很长,因此如果按照TPS进行限流,时间粒度很大,很难准确评估系统的响应性能。
2.2HPS
每秒请求数,指每秒钟服务端收到客户端的请求数量。单机系统一笔请求完成一笔事务,TPS和QPS相同,分布式不同,目前主流的限流方法多采用HPS作为限流指标
2.3QPS
服务端每秒能够响应的客户端查询请求数量,同理单机HPS和QPS相同
2.4 性能压测工具wrk
3.限流方法
即拒绝策略,限流方法根据限流范围分为单机限流和分布式限流,下面以单机限流进行分析(分布式之后补充)
3.1流量计数器(固定窗口限流)
特点:
- 将时间划分固定大小窗口,如每秒一个
- 每个窗口记录请求数量
- 请求到达+1
- 请求超过阈值拒绝
- 窗口结束重置请求
最直观的解释,我直接限制每秒请求数量100,超过100就拒绝掉
我这里用gin中间件实现一个最简单的流量计数器
type ratelimitByCountBuilder struct {
count int64
}
func newRateLimit(count int64) *ratelimitByCountBuilder {
//todo 启动协程清空count
builder := &ratelimitByCountBuilder{
count: count,
}
go func() {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
atomic.StoreInt64(&builder.count, count)
}
}
}()
return builder
}
func (builder *ratelimitByCountBuilder) Build() gin.HandlerFunc {
return func(ctx *gin.Context) {
if atomic.LoadInt64(&builder.count) <= 0 {
fmt.Printf("rejection req!\n")
ctx.Abort()
return
}
atomic.AddInt64(&builder.count, -1)
}
}
或者直接使用自带的
gin.LimitRequests(100, time.Second)
原理就是请求过来时先进行判断
缺点是什么?
-
单位时间很难把控,请求分布不均匀
从上面看hps超过100
- 无法应对突发流量,窗口大小是固定的,不够灵活
- 窗口结束时的请求重置可能导致请求的不公平性
3.2滑动窗口
实例代码
特点:
- 确定一个窗口大小和请求数阈值,如1秒
- 每次请求到达请求数+1,如果请求数超过阈值则 拒绝
- 随时间推移窗口滑动,移出过期请求,即检查最早到达的请求,计算与当前时间差,判断是否超过窗口大小
- 窗口大小可以根据流量变化
type limitBySlidingWindowBuilder struct {
windowSize time.Duration //窗口大小
maxRequests int //最大请求数量
requests []time.Time
lock sync.Mutex
}
func newBuilder(size time.Duration, max int) *limitBySlidingWindowBuilder {
return &limitBySlidingWindowBuilder{
windowSize: size,
maxRequests: max,
requests: make([]time.Time, 0, max),
}
}
func (limit *limitBySlidingWindowBuilder) Build() gin.HandlerFunc {
return func(ctx *gin.Context) {
if !limit.allowRequest() {
ctx.Abort()
return
}
}
}
func (limit *limitBySlidingWindowBuilder) allowRequest() bool {
currentTime := time.Now()
limit.lock.Lock()
defer limit.lock.Unlock()
//清理过期请求
if len(limit.requests) > 0 && currentTime.Sub(limit.requests[0]) > limit.windowSize {
limit.requests = limit.requests[1:]
}
//检查请求数
if len(limit.requests) >= limit.maxRequests {
fmt.Println("reject")
return false
}
limit.requests = append(limit.requests, currentTime)
return true
}
// todo 自定义动态调整窗口大小
func (limit *limitBySlidingWindowBuilder) update(t time.Duration) {
limit.lock.Lock()
defer limit.lock.Unlock()
limit.windowSize = t
}
缺点:
- 内存消耗:当随时间推移,窗口大小较大或请求频率高的情况下,内存消耗增加
-
更多CPU计算开销
3.3漏桶
demo
是对滑动窗口的改进:
- 确定一个固定的漏桶容量,表示存储最大请求数
- 漏桶速率,表示每秒可以处理请求数
- 当请求到达,讲请求放入漏桶
- 漏桶以固定速率从漏桶中消费请求
-
如果漏桶满了则丢弃或延迟处理
type leakybucket struct {
rate float64 //漏桶速度 请求数/秒
size int
water int //当前水量
lastLeakMs int64 //上次漏水时间戳
lock sync.Locker
}
func newleak(rate float64, size int) *leakybucket {
return &leakybucket{
rate: rate,
size: size,
water: 0,
lastLeakMs: time.Now().Unix(),
}
}
func (limit *leakybucket) Build() gin.HandlerFunc {
return func(ctx *gin.Context) {
if !limit.allow() {
ctx.Abort()
return
}
}
}
func (limit *leakybucket) allow() bool {
now := time.Now().Unix()
limit.lock.Lock()
defer limit.lock.Unlock()
//之前漏出的水量
leakAmount := int(float64(now-limit.lastLeakMs) / 1000 * limit.rate)
if leakAmount > 0 {
if leakAmount > limit.water {
limit.water = 0
} else {
limit.water -= leakAmount
}
}
//计算当前是否超过容量
if limit.water > limit.size {
limit.water--
fmt.Println("reject")
return false
}
limit.water++
limit.lastLeakMs = now
return true
}
3.4 令牌桶
令牌桶算法就跟病人去医院看病一样,找医生之前需要先挂号,而医院每天放的号是有限的。当天的号用完了,第二天又会放一批号
具体用法参考"golang.org/x/time/rate"
4.分布式限流
4.1基于中心化
即通过一个中心化的限流器控制所有服务器的请求
- 选择一个中心化组件,如redis
- 定义限流规则,如设置每秒最大请求数,每个IP单位时间最多访问次数
- 对于每个请求要先向redis请求令牌
- 没拿到就被限流
lua脚本如下
local ratelimit_info = redis.pcall('HMGET',KEYS[1],'last_time','current_token')
local last_time = ratelimit_info[1]
local current_token = tonumber(ratelimit_info[2])
local max_token = tonumber(ARGV[1])
local token_rate = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
if current_token == nil then
current_token = max_token
last_time = current_time
else
local past_time = current_time-last_time
if past_time>1000 then
current_token = current_token+token_rate
last_time = current_time
end
## 防止溢出
if current_token>max_token then
current_token = max_token
last_time = current_time
end
end
local result = 0
if(current_token>0) then
result = 1
current_token = current_token-1
last_time = current_time
end
redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token',current_token)
return result