go-zero高可用-自适应熔断器

为什么需要熔断器

微服务集群中,每个应用基本都会依赖一定数量的外部服务。有可能随时都会遇到网络连接缓慢,超时,依赖服务过载,服务不可用的情况,在高并发场景下如果此时调用方不做任何处理,继续持续请求故障服务的话很容易引起整个微服务集群雪崩。
比如高并发场景的用户订单服务,一般需要依赖一下服务:

  1. 商品服务
  2. 账户服务
  3. 库存服务
image

假如此时 账户服务 过载,订单服务持续请求账户服务只能被动的等待账户服务报错或者请求超时,进而导致订单请求被大量堆积,这些无效请求依然会占用系统资源:cpu,内存,数据连接...导致订单服务整体不可用。即使账户服务恢复了订单服务也无法自我恢复。

image

这时如果有一个主动保护机制应对这种场景的话订单服务至少可以保证自身的运行状态,等待账户服务恢复时订单服务也同步自我恢复,这种自我保护机制在服务治理中叫熔断机制。

熔断

熔断是调用方自我保护的机制(客观上也能保护被调用方),熔断对象是外部服务。

降级

降级是被调用方(服务提供者)的防止因自身资源不足导致过载的自我保护机制,降级对象是自身。

image

熔断这一词来源时我们日常生活电路里面的熔断器,当负载过高时(电流过大)保险丝会自行熔断防止电路被烧坏,很多技术都是来自生活场景的提炼。

工作原理

image

熔断器一般具有三个状态:

  1. 关闭:默认状态,请求能被到达目标服务,同时统计在窗口时间成功和失败次数,如果达到错误率阈值将会进入断开状态。
  2. 断开: 此状态下将会直接返回错误,如果有 fallback 配置则直接调用 fallback 方法。
  3. 半断开:进行断开状态会维护一个超市时间,到达超时时间开始进入 半断开 状态,尝试允许一部门请求正常通过并统计成功数量,如果请求正常则认为此时目标服务已恢复进入 关闭 状态,否则进入 断开 状态。半断开 状态存在的目的在于实现了自我修复,同时防止正在恢复的服务再次被大量打垮。

使用较多的熔断组件:

  1. hystrix circuit breaker(不再维护)
  2. hystrix-go
  3. resilience4j(推荐)
  4. sentinel(推荐)

什么是自适应熔断器

基于上面提到的熔断器原理,项目中我们要使用好熔断器通常需要准备以下参数:

  1. 错误比例阈值:达到该阈值进入 断开 状态。
  2. 断开状态超时时间:超时后进入 半断开 状态。
  3. 半断开状态允许请求数量。
  4. 窗口时间大小。

实际上可选的配置参数还有非常非常多,参考 https://resilience4j.readme.io/docs/circuitbreaker

对于经验不够丰富的开发人员而言,这些参数设置多少合适心里其实并没有底。

那么有没有一种自适应的熔断算法能让我们不关注参数,只要简单配置就能满足大部分场景?

其实是有的,google sre提供了一种自适应熔断算法来计算丢弃请求的概率:

image

算法参数:

  1. requests: 窗口时间内的请求总数
  2. accepts:正常请求数量
  3. K:敏感度,K 越小越容易丢请求,一般推荐 1.5-2 之间

算法解释:

  1. 正常情况下 requests=accepts,所以概率是 0。
  2. 随着正常请求数量减少,当达到 requests == K* accepts 继续请求时,概率 P 会逐渐比 0 大开始按照概率逐渐丢弃一些请求,如果故障严重则丢包会越来越多,假如窗口时间内 accepts==0 则完全熔断。
  3. 当应用逐渐恢复正常时,accepts、requests 同时都在增加,但是 K*accepts 会比 requests 增加的更快,所以概率很快就会归 0,关闭熔断。

代码实现

接下来思考一个熔断器如何实现。

初步思路是:

  1. 无论什么熔断器都得依靠指标统计来转换状态,而统计指标一般要求是最近的一段时间内的数据(太久的数据没有参考意义也浪费空间),所以通常采用一个 滑动时间窗口 数据结构 来存储统计数据。同时熔断器的状态也需要依靠指标统计来实现可观测性,我们实现任何系统第一步需要考虑就是可观测性,不然系统就是一个黑盒。
  2. 外部服务请求结果各式各样,所以需要提供一个自定义的判断方法,判断请求是否成功。可能是 http.code 、rpc.code、body.code,熔断器需要实时收集此数据。
  3. 当外部服务被熔断时使用者往往需要自定义快速失败的逻辑,考虑提供自定义的 fallback() 功能。

下面来逐步分析 go-zero 的源码实现:

core/breaker/breaker.go

熔断器接口定义

兵马未动,粮草先行,明确了需求后就可以开始规划定义接口了,接口是我们编码思维抽象的第一步也是最重要的一步。

核心定义包含两种类型的方法:

Allow():需要手动回调请求结果至熔断器,相当于手动挡。

DoXXX():自动回调请求结果至熔断器,相当于自动挡,实际上 DoXXX() 类型方法最后都是调用

DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error

    //自定义判定执行结果
    Acceptable func(err error) bool
    
    //手动回调
    Promise interface {
        // Accept tells the Breaker that the call is successful.
        //请求成功
        Accept()
        // Reject tells the Breaker that the call is failed.
        //请求失败
        Reject(reason string)
    }   

    Breaker interface {
        //熔断器名称
        Name() string

        //熔断方法,执行请求时必须手动上报执行结果
        //适用于简单无需自定义快速失败,无需自定义判定请求结果的场景
        //相当于手动挡。。。
        Allow() (Promise, error)

        //熔断方法,自动上报执行结果
        //自动挡。。。
        Do(req func() error) error

        //熔断方法
        //acceptable - 支持自定义判定执行结果
        DoWithAcceptable(req func() error, acceptable Acceptable) error

        //熔断方法
        //fallback - 支持自定义快速失败
        DoWithFallback(req func() error, fallback func(err error) error) error

        //熔断方法
        //fallback - 支持自定义快速失败
        //acceptable - 支持自定义判定执行结果
        DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
    }

熔断器实现

circuitBreaker 继承 throttle,实际上这里相当于静态代理,代理模式可以在不改变原有对象的基础上增强功能,后面我们会看到 go-zero 这样做的原因是为了收集熔断器错误数据,也就是为了实现可观测性。

熔断器实现采用静态代理模式,看起来稍微有点绕脑。

image
    //熔断器结构体
    circuitBreaker struct {
        name string
        //实际上 circuitBreaker熔断功能都代理给 throttle来实现
        throttle
    }
    //熔断器接口
    throttle interface {
        //熔断方法
        allow() (Promise, error)
        //熔断方法
        //DoXXX()方法最终都会该方法
        doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
    }
    
    func (cb *circuitBreaker) Allow() (Promise, error) {
        return cb.throttle.allow()
    }
    
    func (cb *circuitBreaker) Do(req func() error) error {
        return cb.throttle.doReq(req, nil, defaultAcceptable)
    }
    
    func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error {
        return cb.throttle.doReq(req, nil, acceptable)
    }
    
    func (cb *circuitBreaker) DoWithFallback(req func() error, fallback func(err error) error) error {
        return cb.throttle.doReq(req, fallback, defaultAcceptable)
    }
    
    func (cb *circuitBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error,
        acceptable Acceptable) error {
        return cb.throttle.doReq(req, fallback, acceptable)
    }   
    

throttle 接口实现类:

loggedThrottle 增加了为了收集错误日志的滚动窗口,目的是为了收集当请求失败时的错误日志。


//带日志功能的熔断器
type loggedThrottle struct {
    //名称
    name string
    //代理对象
    internalThrottle
    //滚动窗口,滚动收集数据,相当于环形数组
    errWin *errorWindow
}

//熔断方法
func (lt loggedThrottle) allow() (Promise, error) {
    promise, err := lt.internalThrottle.allow()
    return promiseWithReason{
        promise: promise,
        errWin:  lt.errWin,
    }, lt.logError(err)
}

//熔断方法
func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
    return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
        accept := acceptable(err)
        if !accept {
            lt.errWin.add(err.Error())
        }
        return accept
    }))
}

func (lt loggedThrottle) logError(err error) error {
    if err == ErrServiceUnavailable {
        // if circuit open, not possible to have empty error window
        stat.Report(fmt.Sprintf(
            "proc(%s/%d), callee: %s, breaker is open and requests dropped\nlast errors:\n%s",
            proc.ProcessName(), proc.Pid(), lt.name, lt.errWin))
    }

    return err
}

错误日志收集 errorWindow

errorWindow 是一个环形数组,新数据不断滚动覆盖最旧的数据,通过取余实现。

//滚动窗口
type errorWindow struct {
    reasons [numHistoryReasons]string
    index   int
    count   int
    lock    sync.Mutex
}

//添加数据
func (ew *errorWindow) add(reason string) {
    ew.lock.Lock()
    //添加错误日志
    ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason)
    //更新index,为下一次写入数据做准备
    //这里用的取模实现了滚动功能
    ew.index = (ew.index + 1) % numHistoryReasons
    //统计数量
    ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
    ew.lock.Unlock()
}

//格式化错误日志
func (ew *errorWindow) String() string {
    var reasons []string

    ew.lock.Lock()
    // reverse order
    for i := ew.index - 1; i >= ew.index-ew.count; i-- {
        reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
    }
    ew.lock.Unlock()

    return strings.Join(reasons, "\n")
}

看到这里我们还没看到实际的熔断器实现,实际上真正的熔断操作被代理给了 internalThrottle 对象。

    internalThrottle interface {
        allow() (internalPromise, error)
        doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
    }

internalThrottle 接口实现 googleBreaker 结构体定义

type googleBreaker struct {
    //敏感度,go-zero中默认值为1.5
    k float64
    //滑动窗口,用于记录最近一段时间内的请求总数,成功总数
    stat *collection.RollingWindow
    //概率生成器
    //随机产生0.0-1.0之间的双精度浮点数
    proba *mathx.Proba
}

可以看到熔断器属性其实非常简单,数据统计采用的是滑动时间窗口来实现。

RollingWindow 滑动窗口

滑动窗口属于比较通用的数据结构,常用于最近一段时间内的行为数据统计。

它的实现非常有意思,尤其是如何模拟窗口滑动过程。

先来看滑动窗口的结构体定义:

    RollingWindow struct {
        //互斥锁
        lock sync.RWMutex
        //滑动窗口数量
        size int
        //窗口,数据容器
        win *window
        //滑动窗口单元时间间隔
        interval time.Duration
        //游标,用于定位当前应该写入哪个bucket
        offset int
        //汇总数据时,是否忽略当前正在写入桶的数据
        //某些场景下因为当前正在写入的桶数据并没有经过完整的窗口时间间隔
        //可能导致当前桶的统计并不准确
        ignoreCurrent bool
        //最后写入桶的时间
        //用于计算下一次写入数据间隔最后一次写入数据的之间
        //经过了多少个时间间隔
        lastTime      time.Duration 
    }
image

window 是数据的实际存储位置,其实就是一个数组,提供向指定 offset 添加数据与清除操作。
数组里面按照 internal 时间间隔分隔成多个 bucket。

//时间窗口
type window struct {
    //桶
    //一个桶标识一个时间间隔
    buckets []*Bucket
    //窗口大小
    size int
}

//添加数据
//offset - 游标,定位写入bucket位置
//v - 行为数据
func (w *window) add(offset int, v float64) {
    w.buckets[offset%w.size].add(v)
}

//汇总数据
//fn - 自定义的bucket统计函数
func (w *window) reduce(start, count int, fn func(b *Bucket)) {
    for i := 0; i < count; i++ {
        fn(w.buckets[(start+i)%w.size])
    }
}

//清理特定bucket
func (w *window) resetBucket(offset int) {
    w.buckets[offset%w.size].reset()
}

//桶
type Bucket struct {
    //当前桶内值之和
    Sum float64
    //当前桶的add总次数
    Count int64
}

//向桶添加数据
func (b *Bucket) add(v float64) {
    //求和
    b.Sum += v
    //次数+1
    b.Count++
}

//桶数据清零
func (b *Bucket) reset() {
    b.Sum = 0
    b.Count = 0
}

window 添加数据:

  1. 计算当前时间距离上次添加时间经过了多少个 时间间隔,实际上就是过期了几个 bucket。
  2. 清理过期桶的数据
  3. 更新 offset,更新 offset 的过程实际上就是在模拟窗口滑动
  4. 添加数据
image
// 添加数据
func (rw *RollingWindow) Add(v float64) {
    rw.lock.Lock()
    defer rw.lock.Unlock()
    //获取当前写入的下标
    rw.updateOffset()
    //添加数据
    rw.win.add(rw.offset, v)
}

//计算当前距离最后写入数据经过多少个单元时间间隔
//实际上指的就是经过多少个桶
func (rw *RollingWindow) span() int {
    offset := int(timex.Since(rw.lastTime) / rw.interval)
    if 0 <= offset && offset < rw.size {
        return offset
    }
    //大于时间窗口时 返回窗口大小即可
    return rw.size
}

//更新当前时间的offset
//实现窗口滑动
func (rw *RollingWindow) updateOffset() {
    //经过span个桶的时间
    span := rw.span()
    //还在同一单元时间内不需要更新
    if span <= 0 {
        return
    }
    offset := rw.offset
    //既然经过了span个桶的时间没有写入数据
    //那么这些桶内的数据就不应该继续保留了,属于过期数据清空即可
    //可以看到这里全部用的 % 取余操作,可以实现按照下标周期性写入
    //如果超出下标了那就从头开始写,确保新数据一定能够正常写入
    //类似循环数组的效果
    for i := 0; i < span; i++ {
        rw.win.resetBucket((offset + i + 1) % rw.size)
    }
    //更新offset
    rw.offset = (offset + span) % rw.size
    now := timex.Now()
    //更新操作时间
    //这里很有意思
    rw.lastTime = now - (now-rw.lastTime)%rw.interval
}

window 统计数据:

//归纳汇总数据
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
    rw.lock.RLock()
    defer rw.lock.RUnlock()

    var diff int
    span := rw.span()
    //当前时间截止前,未过期桶的数量
    if span == 0 && rw.ignoreCurrent {
        diff = rw.size - 1
    } else {
        diff = rw.size - span
    }
    if diff > 0 {
        //rw.offset - rw.offset+span之间的桶数据是过期的不应该计入统计
        offset := (rw.offset + span + 1) % rw.size
        //汇总数据
        rw.win.reduce(offset, diff, fn)
    }
}
googleBreaker 判断是否应该熔断
  1. 收集滑动窗口内的统计数据
  2. 计算熔断概率
//按照最近一段时间的请求数据计算是否熔断
func (b *googleBreaker) accept() error {
    //获取最近一段时间的统计数据
    accepts, total := b.history()
    //计算动态熔断概率
    weightedAccepts := b.k * float64(accepts)
    // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
    dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
    //概率为0,通过
    if dropRatio <= 0 {
        return nil
    }
    //随机产生0.0-1.0之间的随机数与上面计算出来的熔断概率相比较
    //如果随机数比熔断概率小则进行熔断
    if b.proba.TrueOnProba(dropRatio) {
        return ErrServiceUnavailable
    }

    return nil
}
googleBreaker 熔断逻辑实现

熔断器对外暴露两种类型的方法

  1. 简单场景直接判断对象是否被熔断,执行请求后必须需手动上报执行结果至熔断器。

func (b *googleBreaker) allow() (internalPromise, error)

  1. 复杂场景下支持自定义快速失败,自定义判定请求是否成功的熔断方法,自动上报执行结果至熔断器。

func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error

Acceptable 参数目的是自定义判断请求是否成功。

Acceptable func(err error) bool
//熔断方法
//返回一个promise异步回调对象,可由开发者自行决定是否上报结果到熔断器
func (b *googleBreaker) allow() (internalPromise, error) {
    if err := b.accept(); err != nil {
        return nil, err
    }

    return googlePromise{
        b: b,
    }, nil
}

//熔断方法
//req - 熔断对象方法
//fallback - 自定义快速失败函数,可对熔断产生的err进行包装后返回
//acceptable - 对本次未熔断时执行请求的结果进行自定义的判定,比如可以针对http.code,rpc.code,body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
    //判定是否熔断
    if err := b.accept(); err != nil {
        //熔断中,如果有自定义的fallback则执行
        if fallback != nil {
            return fallback(err)
        }

        return err
    }
    //如果执行req()过程发生了panic,依然判定本次执行失败上报至熔断器
    defer func() {
        if e := recover(); e != nil {
            b.markFailure()
            panic(e)
        }
    }()
    //执行请求
    err := req()
    //判定请求成功
    if acceptable(err) {
        b.markSuccess()
    } else {
        b.markFailure()
    }

    return err
}

//上报成功
func (b *googleBreaker) markSuccess() {
    b.stat.Add(1)
}

//上报失败
func (b *googleBreaker) markFailure() {
    b.stat.Add(0)
}

//统计数据
func (b *googleBreaker) history() (accepts, total int64) {
    b.stat.Reduce(func(b *collection.Bucket) {
        accepts += int64(b.Sum)
        total += b.Count
    })

    return
}

资料

微软 azure 关于熔断器设计模式

索尼参考微软的文档开源的熔断器实现

go-zero 自适应熔断器文档

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • 这篇文章选自images.jpgJohn Camell的《Spring Micoservices IN ACTIO...
    神易风阅读 591评论 0 0
  • 产生原因 在分布式应用中,容易出现由某个基础服务故障引发整个集群了崩溃,称之为雪崩。 熔断器模式 熔断器模式可以有...
    邝健强阅读 1,721评论 0 0
  • 在生活中,如果电路的负载过高,保险箱会自动跳闸,以保护家里的各种电器,这就是熔断器的一个活生生例子。在Hystri...
    美团Java阅读 6,945评论 1 32
  • ——Martin Fowler原文[https://martinfowler.com/bliki/CircuitB...
    Anor9阅读 297评论 0 0
  • 在微服务中服务间依赖非常常见,比如评论服务依赖审核服务而审核服务又依赖反垃圾服务,当评论服务调用审核服务时,审核服...
    kevwan阅读 1,335评论 0 3