kratos aegis 限流器的使用和实现过程解析

一。基础知识点说明

Kraots 的限流算法

kratos 借鉴了 Sentinel 项目的自适应限流系统,通过综合分析服务的 cpu 使用率、请求成功的 load(负载), qps 和请求成功的 rt(请求成功的响应耗时) 来做自适应限流保护。从官方文档上看,限流算法要实现的核心目标有如下两点:

  1. ** 自动 ** 嗅探负载和 qps,减少人工配置 && 干预
  2. 削顶, 保证超载时系统不被拖垮,并能以高水位 qps 继续运行

自适应限流怎么做

前面我们遇到的主要问题就是每个服务实例的限流阈值实际应该是动态变化的,我们应该根据系统能够承载的最大吞吐量,来进行限流,当当前的流量大于最大吞吐的时候就限制流量进入,反之则允许通过。那现在的问题就是

  • 系统的吞吐量该如何计算?
  • 什么时候系统的吞吐量就是最大的吞吐量了?

计算吞吐量:利特尔法则 L = λ * W

利特尔法则由麻省理工大学斯隆商学院(MIT Sloan School of Management)的教授 John Little﹐于 1961 年所提出与证明。它是一个有关提前期与在制品关系的简单数学公式,这一法则为精益生产的改善方向指明了道路。 —- MBA 智库百科 (mbalib.com)

如上图所示,如果我们开一个小店,平均每分钟进店 2 个客人(λ),每位客人从等待到完成交易需要 4 分钟(W),那我们店里能承载的客人数量就是 2 * 4 = 8 个人

同理,我们可以将 λ 当做 QPS, W 呢是每个请求需要花费的时间,那我们的系统的吞吐就是 L = λ * W ,所以我们可以使用利特尔法则来计算系统的吞吐量。

什么时候系统的吞吐量就是最大的吞吐量?

首先我们可以通过统计过去一段时间的数据,获取到平均每秒的请求量,也就是 QPS,以及请求的耗时时间,为了避免出现前面 900ms 一个请求都没有最后 100ms 请求特别多的情况,我们可以使用滑动窗口算法来进行统计。

最容易想到的就是我们从系统启动开始,就把这些值给保存下来,然后计算一个吞吐的最大值,用这个来表示我们的最大吞吐量就可以了。但是这样存在一个问题是,我们很多系统其实都不是独占一台机器的,一个物理机上面往往有很多服务,并且一般还存在一些超卖,所以可能第一个小时最大处理能力是 100,但是这台节点上其他服务实例同时都在抢占资源的时候,这个处理能力最多就只能到 80 了

所以我们需要一个数据来做启发阈值,只要这个指标达到了阈值那我们就进入流控当中。常见的选择一般是 CPU、Memory、System Load,这里我们以 CPU 为例

只要我们的 CPU 负载超过 80% 的时候,获取过去 5s 的最大吞吐数据,然后再统计当前系统中的请求数量,只要当前系统中的请求数大于最大吞吐那么我们就丢弃这个请求。

kratos 自适应限流分析

二。实际代码及使用解释

限流公式

  • cpu > 800 表示 CPU 负载大于 80% 进入限流,这里是800,而不是0.8,因为在计算的时候,源码中乘了个1e3,地址在 cgroupCPU 的 Usage方法中 aegis/pkg/cpu/cgroup_cpu.go 文件中
  • (Now - PrevDrop) < 1s 这个表示只要触发过 1 次限流,那么 1s 内都会去做限流的判定,这是为了避免反复出现限流恢复导致请求时间和系统负载产生大量毛刺
  • (MaxPass * MinRt * windows / 1000) < InFlight 判断当前负载是否大于最大负载
    • InFlight 表示当前系统中有多少请求
    • (MaxPass * MinRt * windows / 1000) 表示过去一段时间的最大负载
    • MaxPass 表示最近 5s 内,单个采样窗口中最大的请求数
    • MinRt 表示最近 5s 内,单个采样窗口中最小的响应时间
    • windows 表示一秒内采样窗口的数量,默认配置中是 5s 50 个采样,那么 windows 的值为 10。

源码分析

type BBR struct {
    cpu             cpuGetter   // 请求数,和响应时间的采样数据,使用滑动窗口进行统计    
    passStat        window.RollingCounter
    rtStat          window.RollingCounter
    inFlight        int64
    bucketPerSecond int64
    bucketDuration  time.Duration

    // prevDropTime defines previous start drop since initTime
    prevDropTime atomic.Value
    maxPASSCache atomic.Value
    minRtCache   atomic.Value

    opts options
}

Allow: 判断请求是否允许通过

// Allow checks all inbound traffic.
// Once overload is detected, it raises limit.ErrLimitExceed error.
func (l *BBR) Allow() (ratelimit.DoneFunc, error) {
    if l.shouldDrop() {
        return nil, ratelimit.ErrLimitExceed
    }
    atomic.AddInt64(&l.inFlight, 1)
    start := time.Now().UnixNano()
    return func(ratelimit.DoneInfo) {
        rt := (time.Now().UnixNano() - start) / int64(time.Millisecond)
        l.rtStat.Add(rt)
        atomic.AddInt64(&l.inFlight, -1)
        l.passStat.Add(1)
    }, nil
}

这个方法主要是给中间件使用的

  • 首先使用 shouldDrop 方法判断这个请求是否应该丢弃
  • 如果成功放行,那么当前系统中的请求数就 +1
  • 然后返回一个 function 用于请求结束之后
    • 统计请求的响应时间
    • 如果请求成功了,给成功的请求数 +1
    • 并且当前系统中的请求数量 Inflight -1

判断请求是否应该被丢弃

func (l *BBR) shouldDrop() bool {
    now := time.Duration(time.Now().UnixNano())
    if l.cpu() < l.opts.CPUThreshold {    //判断是否达到 cpu 的最高压力,cpu()获取当前cpu利用率,获取方式在下面讲解
        // current cpu payload below the threshold
        prevDropTime, _ := l.prevDropTime.Load().(time.Duration)//prevDropTime保存了上一次cpu达到峰值的时间,如果是0,说明还没有到峰值的记录
        if prevDropTime == 0 {
            // haven't start drop,
            // accept current request
            return false
        }
        if time.Duration(now-prevDropTime) <= time.Second {//一秒内
            // just start drop one second ago,
            // check current inflight count
            inFlight := atomic.LoadInt64(&l.inFlight)
            return inFlight > 1 && inFlight > l.maxInFlight()//判断实际的正在处理逻辑数量是否达到最高值
        }
        l.prevDropTime.Store(time.Duration(0))//记录此次达到峰值的时间
        return false
    }
    // current cpu payload exceeds the threshold
    inFlight := atomic.LoadInt64(&l.inFlight)
    drop := inFlight > 1 && inFlight > l.maxInFlight()
    if drop {//如果cpu未达到峰值,但是协程达到了高峰,判断是否有达到过cpu的峰值
        prevDrop, _ := l.prevDropTime.Load().(time.Duration)
        if prevDrop != 0 {//如果达到过cpu最高峰值,拒绝请求
            // already started drop, return directly
            return drop
        }
        // store start drop time
        l.prevDropTime.Store(now)//没有达到过就记录此次时间,当做一次峰值
    }
    return drop
}

cpu模块,aegis/pkg/cpu

其中,cpu 利用率部分使用了linuxcgroup的工具使用以及gopsutil包,主要考虑到了宿主机和 docker 容器的不同。

第一部分,cpu内容的来源

每500ms获取一次,使用了指数加权平均算法的公式 ,地址代码中有。将每次获取的cpu信息保存在全局变量中。

func init() {
    go cpuproc()
}

// cpu = cpuᵗ⁻¹ * decay + cpuᵗ * (1 - decay)
func cpuproc() {
    ticker := time.NewTicker(time.Millisecond * 500) // same to cpu sample rate
    defer func() {
        ticker.Stop()
        if err := recover(); err != nil {
            go cpuproc()
        }
    }()

    // EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863
    for range ticker.C {
        stat := &cpu.Stat{}
        cpu.ReadStat(stat)
        prevCPU := atomic.LoadInt64(&gCPU)
        curCPU := int64(float64(prevCPU)*decay + float64(stat.Usage)*(1.0-decay))
        atomic.StoreInt64(&gCPU, curCPU)
    }
}

上述只是获取,真正的cpu利用率的计算,来源于 pkg/cpu 包中的逻辑。以下代码

const (
    interval time.Duration = time.Millisecond * 500 //每隔500ms收集一次
)
var (
    stats CPU
    usage uint64
)

// CPU is cpu stat usage.
type CPU interface {
    Usage() (u uint64, e error)
    Info() Info
}

func init() {
    var (
        err error
    )
    stats, err = newCgroupCPU()//这里内部使用了linux的cgroup的方法,考虑到docker 容器的资源限制
    if err != nil {
        // fmt.Printf("cgroup cpu init failed(%v),switch to psutil cpu\n", err)
        stats, err = newPsutilCPU(interval)//这里直接使用 `gopsutil` 包获取了cpu的利用率
        if err != nil {
            panic(fmt.Sprintf("cgroup cpu init failed!err:=%v", err))
        }
    }
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            <-ticker.C
            u, err := stats.Usage()
            if err == nil && u != 0 {
                atomic.StoreUint64(&usage, u)
            }
        }
    }()
}

注意:cpu 这部分,需要有一定的linux cgroup 的基础知识才能看懂

上述代码中 newCgroupCPU 和 newPsutilCPU 就是对应了两种不同的情况的处理。其中,大量使用了原子包,将获取 cpu 利用率保存在一个全局变变量中(代码跟下去就能看到,太多这里不作截取了)。

回到 shouldDrop 这个方法其实就是开头讲到的限流公式了,逻辑如下图所示

  • 首先看 CPU 的使用率是否达到了阈值
  • 如果没到,则回去判断一下上次触发限流到现在是否在一秒以内
    • 如果在一秒内,就判断当前负载是否超过限制,如果超过了就需要丢弃
    • 如果不在 1s 内或者是请求数量已经降下来了,那么就吧 prevDrop 清零然后返回 false
  • 如果到了,则判断一下当前负载是否超过限制
    • 如果超过了,则设置丢弃时间 prevDrop,返回 true 需要丢弃请求
    • 如果没超过直接返回 false

系统的最大负载

func (l *BBR) maxInFlight() int64 {
    return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5)
}

这个就是计算过去一段时间系统的最大负载是多少

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容