一。基础知识点说明
Kraots 的限流算法
kratos 借鉴了 Sentinel 项目的自适应限流系统,通过综合分析服务的 cpu 使用率、请求成功的 load(负载), qps 和请求成功的 rt(请求成功的响应耗时) 来做自适应限流保护。从官方文档上看,限流算法要实现的核心目标有如下两点:
- ** 自动 ** 嗅探负载和 qps,减少人工配置 && 干预
- 削顶, 保证超载时系统不被拖垮,并能以高水位 qps 继续运行
自适应限流怎么做
前面我们遇到的主要问题就是每个服务实例的限流阈值实际应该是动态变化的,我们应该根据系统能够承载的最大吞吐量,来进行限流,当当前的流量大于最大吞吐的时候就限制流量进入,反之则允许通过。那现在的问题就是
- 系统的吞吐量该如何计算?
- 什么时候系统的吞吐量就是最大的吞吐量了?
计算吞吐量:利特尔法则 L = λ * W
利特尔法则由麻省理工大学斯隆商学院(MIT Sloan School of Management)的教授 John Little﹐于 1961 年所提出与证明。它是一个有关提前期与在制品关系的简单数学公式,这一法则为精益生产的改善方向指明了道路。 —- MBA 智库百科 (mbalib.com)
如上图所示,如果我们开一个小店,平均每分钟进店 2 个客人(λ),每位客人从等待到完成交易需要 4 分钟(W),那我们店里能承载的客人数量就是 2 * 4 = 8 个人
同理,我们可以将 λ
当做 QPS, W
呢是每个请求需要花费的时间,那我们的系统的吞吐就是 L = λ * W
,所以我们可以使用利特尔法则来计算系统的吞吐量。
什么时候系统的吞吐量就是最大的吞吐量?
首先我们可以通过统计过去一段时间的数据,获取到平均每秒的请求量,也就是 QPS,以及请求的耗时时间,为了避免出现前面 900ms 一个请求都没有最后 100ms 请求特别多的情况,我们可以使用滑动窗口算法来进行统计。
最容易想到的就是我们从系统启动开始,就把这些值给保存下来,然后计算一个吞吐的最大值,用这个来表示我们的最大吞吐量就可以了。但是这样存在一个问题是,我们很多系统其实都不是独占一台机器的,一个物理机上面往往有很多服务,并且一般还存在一些超卖,所以可能第一个小时最大处理能力是 100,但是这台节点上其他服务实例同时都在抢占资源的时候,这个处理能力最多就只能到 80 了
所以我们需要一个数据来做启发阈值,只要这个指标达到了阈值那我们就进入流控当中。常见的选择一般是 CPU、Memory、System Load,这里我们以 CPU 为例
只要我们的 CPU 负载超过 80% 的时候,获取过去 5s 的最大吞吐数据,然后再统计当前系统中的请求数量,只要当前系统中的请求数大于最大吞吐那么我们就丢弃这个请求。
kratos 自适应限流分析
二。实际代码及使用解释
限流公式
-
cpu > 800
表示 CPU 负载大于 80% 进入限流,这里是800,而不是0.8,因为在计算的时候,源码中乘了个1e3,地址在 cgroupCPU 的 Usage方法中aegis/pkg/cpu/cgroup_cpu.go
文件中 -
(Now - PrevDrop) < 1s
这个表示只要触发过 1 次限流,那么 1s 内都会去做限流的判定,这是为了避免反复出现限流恢复导致请求时间和系统负载产生大量毛刺 -
(MaxPass * MinRt * windows / 1000) < InFlight
判断当前负载是否大于最大负载-
InFlight
表示当前系统中有多少请求 -
(MaxPass * MinRt * windows / 1000)
表示过去一段时间的最大负载 -
MaxPass
表示最近 5s 内,单个采样窗口中最大的请求数 -
MinRt
表示最近 5s 内,单个采样窗口中最小的响应时间 -
windows
表示一秒内采样窗口的数量,默认配置中是 5s 50 个采样,那么 windows 的值为 10。
-
源码分析
type BBR struct {
cpu cpuGetter // 请求数,和响应时间的采样数据,使用滑动窗口进行统计
passStat window.RollingCounter
rtStat window.RollingCounter
inFlight int64
bucketPerSecond int64
bucketDuration time.Duration
// prevDropTime defines previous start drop since initTime
prevDropTime atomic.Value
maxPASSCache atomic.Value
minRtCache atomic.Value
opts options
}
Allow: 判断请求是否允许通过
// Allow checks all inbound traffic.
// Once overload is detected, it raises limit.ErrLimitExceed error.
func (l *BBR) Allow() (ratelimit.DoneFunc, error) {
if l.shouldDrop() {
return nil, ratelimit.ErrLimitExceed
}
atomic.AddInt64(&l.inFlight, 1)
start := time.Now().UnixNano()
return func(ratelimit.DoneInfo) {
rt := (time.Now().UnixNano() - start) / int64(time.Millisecond)
l.rtStat.Add(rt)
atomic.AddInt64(&l.inFlight, -1)
l.passStat.Add(1)
}, nil
}
这个方法主要是给中间件使用的
- 首先使用
shouldDrop
方法判断这个请求是否应该丢弃 - 如果成功放行,那么当前系统中的请求数就 +1
- 然后返回一个
function
用于请求结束之后- 统计请求的响应时间
- 如果请求成功了,给成功的请求数 +1
- 并且当前系统中的请求数量
Inflight
-1
判断请求是否应该被丢弃
func (l *BBR) shouldDrop() bool {
now := time.Duration(time.Now().UnixNano())
if l.cpu() < l.opts.CPUThreshold { //判断是否达到 cpu 的最高压力,cpu()获取当前cpu利用率,获取方式在下面讲解
// current cpu payload below the threshold
prevDropTime, _ := l.prevDropTime.Load().(time.Duration)//prevDropTime保存了上一次cpu达到峰值的时间,如果是0,说明还没有到峰值的记录
if prevDropTime == 0 {
// haven't start drop,
// accept current request
return false
}
if time.Duration(now-prevDropTime) <= time.Second {//一秒内
// just start drop one second ago,
// check current inflight count
inFlight := atomic.LoadInt64(&l.inFlight)
return inFlight > 1 && inFlight > l.maxInFlight()//判断实际的正在处理逻辑数量是否达到最高值
}
l.prevDropTime.Store(time.Duration(0))//记录此次达到峰值的时间
return false
}
// current cpu payload exceeds the threshold
inFlight := atomic.LoadInt64(&l.inFlight)
drop := inFlight > 1 && inFlight > l.maxInFlight()
if drop {//如果cpu未达到峰值,但是协程达到了高峰,判断是否有达到过cpu的峰值
prevDrop, _ := l.prevDropTime.Load().(time.Duration)
if prevDrop != 0 {//如果达到过cpu最高峰值,拒绝请求
// already started drop, return directly
return drop
}
// store start drop time
l.prevDropTime.Store(now)//没有达到过就记录此次时间,当做一次峰值
}
return drop
}
cpu模块,aegis/pkg/cpu
其中,cpu 利用率部分使用了linux
的cgroup
的工具使用以及gopsutil
包,主要考虑到了宿主机和 docker 容器的不同。
第一部分,cpu内容的来源
每500ms获取一次,使用了指数加权平均算法的公式 ,地址代码中有。将每次获取的cpu信息保存在全局变量中。
func init() {
go cpuproc()
}
// cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay)
func cpuproc() {
ticker := time.NewTicker(time.Millisecond * 500) // same to cpu sample rate
defer func() {
ticker.Stop()
if err := recover(); err != nil {
go cpuproc()
}
}()
// EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863
for range ticker.C {
stat := &cpu.Stat{}
cpu.ReadStat(stat)
prevCPU := atomic.LoadInt64(&gCPU)
curCPU := int64(float64(prevCPU)*decay + float64(stat.Usage)*(1.0-decay))
atomic.StoreInt64(&gCPU, curCPU)
}
}
上述只是获取,真正的cpu利用率的计算,来源于 pkg/cpu 包中的逻辑。以下代码
const (
interval time.Duration = time.Millisecond * 500 //每隔500ms收集一次
)
var (
stats CPU
usage uint64
)
// CPU is cpu stat usage.
type CPU interface {
Usage() (u uint64, e error)
Info() Info
}
func init() {
var (
err error
)
stats, err = newCgroupCPU()//这里内部使用了linux的cgroup的方法,考虑到docker 容器的资源限制
if err != nil {
// fmt.Printf("cgroup cpu init failed(%v),switch to psutil cpu\n", err)
stats, err = newPsutilCPU(interval)//这里直接使用 `gopsutil` 包获取了cpu的利用率
if err != nil {
panic(fmt.Sprintf("cgroup cpu init failed!err:=%v", err))
}
}
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
<-ticker.C
u, err := stats.Usage()
if err == nil && u != 0 {
atomic.StoreUint64(&usage, u)
}
}
}()
}
注意:cpu 这部分,需要有一定的linux cgroup 的基础知识才能看懂
上述代码中 newCgroupCPU 和 newPsutilCPU 就是对应了两种不同的情况的处理。其中,大量使用了原子包,将获取 cpu 利用率保存在一个全局变变量中(代码跟下去就能看到,太多这里不作截取了)。
回到 shouldDrop 这个方法其实就是开头讲到的限流公式了,逻辑如下图所示
- 首先看 CPU 的使用率是否达到了阈值
- 如果没到,则回去判断一下上次触发限流到现在是否在一秒以内
- 如果在一秒内,就判断当前负载是否超过限制,如果超过了就需要丢弃
- 如果不在 1s 内或者是请求数量已经降下来了,那么就吧
prevDrop
清零然后返回 false
- 如果到了,则判断一下当前负载是否超过限制
- 如果超过了,则设置丢弃时间
prevDrop
,返回 true 需要丢弃请求 - 如果没超过直接返回 false
- 如果超过了,则设置丢弃时间
系统的最大负载
func (l *BBR) maxInFlight() int64 {
return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5)
}
这个就是计算过去一段时间系统的最大负载是多少