go流量控制

目录

链接地址

1.导读-为什么需要限流

因为资源是有限的(如服务器资源),当资源供不应求就会发现一系列问题(如请求延迟),因此我们通常会对资源增加访问的限制,以现实为例,如为防止十字路流量过大且保证安全性,引入红绿灯机制,到下班时间段,甚至特定路段会安排交警指挥,还有春运的交通限流、超市排队结算等等,可以说限流机制在日常生活中都是无处不在。那放到API概念上时,常见的限流应用场景有哪些?

  • 防止流量突发时,服务出现雪崩,如抢购或DDOS攻击
  • 用户SLA的分级,如针对免费用户和付费用户,提供不同的API QPPS额度
  • API市场的API商品,会通过限流来满足商品库存的调用限制

2.限流指标

限流有两个重要概念:阈值拒绝策略,下面是对常用阈值的介绍

2.1TPS

TPS(Transaction Per Second)即事务数/秒。与mysql的事务不同,这里指一个客端端向服务器发送请求做出响应的过程,客户端在发送请求开始计算到服务器响应后结束计算。
在单机系统上一个请求完成一笔事务,但是在分布式系统中,一笔请求通常需要多个系统配合(一笔事务多个请求),有的服务需要异步返回,因此完成一笔事务花费时间可能很长,因此如果按照TPS进行限流,时间粒度很大,很难准确评估系统的响应性能。

2.2HPS

每秒请求数,指每秒钟服务端收到客户端的请求数量。单机系统一笔请求完成一笔事务,TPS和QPS相同,分布式不同,目前主流的限流方法多采用HPS作为限流指标

2.3QPS

服务端每秒能够响应的客户端查询请求数量,同理单机HPS和QPS相同

2.4 性能压测工具wrk

wrk

3.限流方法

即拒绝策略,限流方法根据限流范围分为单机限流分布式限流,下面以单机限流进行分析(分布式之后补充)

3.1流量计数器(固定窗口限流)

特点:

  • 将时间划分固定大小窗口,如每秒一个
  • 每个窗口记录请求数量
  • 请求到达+1
  • 请求超过阈值拒绝
  • 窗口结束重置请求

最直观的解释,我直接限制每秒请求数量100,超过100就拒绝掉
我这里用gin中间件实现一个最简单的流量计数器

type ratelimitByCountBuilder struct {
    count int64
}

func newRateLimit(count int64) *ratelimitByCountBuilder {
    //todo 启动协程清空count
    builder := &ratelimitByCountBuilder{
        count: count,
    }

    go func() {
        ticker := time.NewTicker(time.Second)
        for {
            select {
            case <-ticker.C:
                atomic.StoreInt64(&builder.count, count)
            }
        }
    }()
    return builder
}

func (builder *ratelimitByCountBuilder) Build() gin.HandlerFunc {
    return func(ctx *gin.Context) {
        if atomic.LoadInt64(&builder.count) <= 0 {
            fmt.Printf("rejection req!\n")
            ctx.Abort()
            return
        }

        atomic.AddInt64(&builder.count, -1)
    }
}

或者直接使用自带的

gin.LimitRequests(100, time.Second)

原理就是请求过来时先进行判断
缺点是什么?

  • 单位时间很难把控,请求分布不均匀


    image.png

    从上面看hps超过100

  • 无法应对突发流量,窗口大小是固定的,不够灵活
  • 窗口结束时的请求重置可能导致请求的不公平性

3.2滑动窗口

实例代码
特点:

  • 确定一个窗口大小和请求数阈值,如1秒
  • 每次请求到达请求数+1,如果请求数超过阈值则 拒绝
  • 随时间推移窗口滑动,移出过期请求,即检查最早到达的请求,计算与当前时间差,判断是否超过窗口大小
  • 窗口大小可以根据流量变化
type limitBySlidingWindowBuilder struct {
    windowSize  time.Duration //窗口大小
    maxRequests int           //最大请求数量
    requests    []time.Time
    lock        sync.Mutex
}

func newBuilder(size time.Duration, max int) *limitBySlidingWindowBuilder {
    return &limitBySlidingWindowBuilder{
        windowSize:  size,
        maxRequests: max,
        requests:    make([]time.Time, 0, max),
    }
}

func (limit *limitBySlidingWindowBuilder) Build() gin.HandlerFunc {
    return func(ctx *gin.Context) {
        if !limit.allowRequest() {
            ctx.Abort()
            return
        }
    }
}

func (limit *limitBySlidingWindowBuilder) allowRequest() bool {
    currentTime := time.Now()
    limit.lock.Lock()
    defer limit.lock.Unlock()

    //清理过期请求
    if len(limit.requests) > 0 && currentTime.Sub(limit.requests[0]) > limit.windowSize {
        limit.requests = limit.requests[1:]
    }

    //检查请求数
    if len(limit.requests) >= limit.maxRequests {
        fmt.Println("reject")
        return false
    }

    limit.requests = append(limit.requests, currentTime)

    return true
}

// todo 自定义动态调整窗口大小
func (limit *limitBySlidingWindowBuilder) update(t time.Duration) {
    limit.lock.Lock()
    defer limit.lock.Unlock()
    limit.windowSize = t

}

缺点:

  • 内存消耗:当随时间推移,窗口大小较大或请求频率高的情况下,内存消耗增加
  • 更多CPU计算开销


    image.png

3.3漏桶

demo
是对滑动窗口的改进:

  • 确定一个固定的漏桶容量,表示存储最大请求数
  • 漏桶速率,表示每秒可以处理请求数
  • 当请求到达,讲请求放入漏桶
  • 漏桶以固定速率从漏桶中消费请求
  • 如果漏桶满了则丢弃或延迟处理
    image.png
type leakybucket struct {
    rate       float64 //漏桶速度 请求数/秒
    size       int
    water      int   //当前水量
    lastLeakMs int64 //上次漏水时间戳
    lock       sync.Locker
}

func newleak(rate float64, size int) *leakybucket {
    return &leakybucket{
        rate:       rate,
        size:       size,
        water:      0,
        lastLeakMs: time.Now().Unix(),
    }
}

func (limit *leakybucket) Build() gin.HandlerFunc {
    return func(ctx *gin.Context) {
        if !limit.allow() {
            ctx.Abort()
            return
        }
    }
}

func (limit *leakybucket) allow() bool {
    now := time.Now().Unix()

    limit.lock.Lock()
    defer limit.lock.Unlock()

    //之前漏出的水量
    leakAmount := int(float64(now-limit.lastLeakMs) / 1000 * limit.rate)

    if leakAmount > 0 {
        if leakAmount > limit.water {
            limit.water = 0
        } else {
            limit.water -= leakAmount
        }
    }

    //计算当前是否超过容量
    if limit.water > limit.size {
        limit.water--
        fmt.Println("reject")
        return false
    }

    limit.water++

    limit.lastLeakMs = now
    return true
}

3.4 令牌桶

令牌桶算法就跟病人去医院看病一样,找医生之前需要先挂号,而医院每天放的号是有限的。当天的号用完了,第二天又会放一批号


image.png

具体用法参考"golang.org/x/time/rate"

4.分布式限流

4.1基于中心化

即通过一个中心化的限流器控制所有服务器的请求

  • 选择一个中心化组件,如redis
  • 定义限流规则,如设置每秒最大请求数,每个IP单位时间最多访问次数
  • 对于每个请求要先向redis请求令牌
  • 没拿到就被限流
    lua脚本如下
local ratelimit_info = redis.pcall('HMGET',KEYS[1],'last_time','current_token')
local last_time = ratelimit_info[1]
local current_token = tonumber(ratelimit_info[2])
local max_token = tonumber(ARGV[1])
local token_rate = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
if current_token == nil then
  current_token = max_token
  last_time = current_time
else
  local past_time = current_time-last_time
  
  if past_time>1000 then
      current_token = current_token+token_rate
      last_time = current_time
  end

  ## 防止溢出
  if current_token>max_token then
    current_token = max_token
    last_time = current_time
  end
end

local result = 0
if(current_token>0) then
  result = 1
  current_token = current_token-1
  last_time = current_time
end
redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token',current_token)
return result

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容