golang-熔断器

熔断器

go-zero在breaker里面基于google的sre算法实现了熔断器逻辑,并在redis等客户端操作的时候引入了熔断器

算法公式
image.png

go-zero 实现了这个公式,并且作为底层组件很好被嵌入使用,在redis的调用就有很好的表现.

  1. 定义接口
  2. 底层组件实现该接口,
  3. 初始化breaker各属性对象,依赖对象数据类型定义的都是interface
    https://github.com/tal-tech/go-zero/blob/master/core/breaker/breaker.go

使用demo

import (
    "errors"
    "fmt"
    "github.com/stretchr/testify/assert"
    "github.com/tal-tech/go-zero/core/stat"
    "strconv"
    "strings"
    "testing"
)
// 熔断服务会将闭包每次返回的结果统计起来
// 最终在sre公式中通过计算得到是否放行的标志
func TestGoogleBreaker(t *testing.T) {
    br := NewBreaker()
    for i := 0; i < 500; i++ {
        br.Do(
            func() error {

                fmt.Println("如果闭包一直执行那么i就会连续的被打印", i)

                if i > 4 {
                    err := errors.New("err info")
                    return err
                } else {
                    return nil
                }

            },
        )
    }
}
image.png

redis调用使用demo

package redis

import (
    "errors"
    "fmt"
    "strconv"
    "time"

    red "github.com/go-redis/redis"
    "github.com/tal-tech/go-zero/core/breaker"
    "github.com/tal-tech/go-zero/core/mapping"
)
//redis客户端初始化过程中,特意将breaker作为brk依赖注入,使得go-zero底层redis客户端组件自带熔断保护措施
// 参照这我们使用breaker的灵活性就很大了,  闭包。。
func NewRedis(redisAddr, redisType string, redisPass ...string) *Redis {
    var pass string
    for _, v := range redisPass {
        pass = v
    }

    return &Redis{
        Addr: redisAddr,
        Type: redisType,
        Pass: pass,
        brk:  breaker.NewBreaker(),//redis客户端初始化过程中,特意将breaker作为brk依赖注入
    }
}


// 这是找到go-zero的redis客户端del过程
// 看见没 s.brk.DoWithAcceptable, go-zero将所有操作redis的过程都以闭包的形式包入breker中
func (s *Redis) Del(keys ...string) (val int, err error) {
    err = s.brk.DoWithAcceptable(func() error {
        conn, err := getRedis(s)
        if err != nil {
            return err
        }

        if v, err := conn.Del(keys...).Result(); err != nil {
            return err
        } else {
            val = int(v)
            return nil
        }
    }, acceptable)

    return
}

代码分析:


// 闭包是以参数传入该方法的
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
   // 该方法每次在执行闭包之前会先通过 b.accept(); 验证当前是否还能执行闭包 
   if err := b.accept(); err != nil {
        if fallback != nil {
            return fallback(err)
        } else {
            return err
        }
    }

    defer func() {
        if e := recover(); e != nil {
            b.markFailure()
            panic(e)
        }
    }()

    err := req()
    if acceptable(err) {
                // 闭包执行返回的err为nil走这里,进行数据统计
        b.markSuccess()
    } else {
                // 闭包执行返回的err 不为 nil走这里
        b.markFailure()
    }

    return err
}


// sre 公式的真是写照了
// 闭包每次执行之前都会先执行这个方法,看一下还能不能执行闭包
func (b *googleBreaker) accept() error {
    accepts, total := b.history()  //这个方法返回当前时刻 闭包返回nil,与非nil的统计数值,用于sre公式计算
    weightedAccepts := b.k * float64(accepts)
    // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
    dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
    if dropRatio <= 0 {
        return nil
    }

    if b.proba.TrueOnProba(dropRatio) {
        return ErrServiceUnavailable
    }

    return nil
}
// 闭包执行成功,其实还是调用add
func (b *googleBreaker) markSuccess() {
    b.stat.Add(1)
}


func (rw *RollingWindow) Add(v float64) {
    rw.lock.Lock()
    defer rw.lock.Unlock()
    rw.updateOffset()  //核心。。
    rw.win.add(rw.offset, v)
}

// 这里就是计算 sre 算法用到的 参数数字的过程了
// 一开始在初始化breaker的时候,10S为单位分成40份
// 这里用到了一个巧妙方式自动初始化一部分区间的数值,
// 也就是算法一开始所描述的,当错误次数过高熔断被触发,当一段时间过期后便会重新再去调用闭包,
// 只所以会过一段时间去调用,是因为sre公式又成立了,之所以成立了就是因为这里随着时间的推移初始化掉
//了一部分区间的数值
func (rw *RollingWindow) updateOffset() {
        // breaker初始化过程中,|--|--|--|--|--|...   每个区间时间长度为: 10 * time.Second / 40,对应代码为数组的40个下标,不同时刻 闭包执行的统计值的累加结果存在不同下标的对象中进行累加  ,这里的span()就是根据当前时刻决定下标移动的跨度
        span := rw.span()      
        if span > 0 {  
        offset := rw.offset //当前区间的下标
        // reset expired buckets
        start := offset + 1  // 新的开始下标数值
        steps := start + span //新的结束下标数值
        var remainder int
                // 到底了,就从头开始清洗数据
        if steps > rw.size {
            remainder = steps - rw.size
            steps = rw.size
        }
                // 没到底,就将该范围的数值重置为0,这也就是在表达,随着时间的推移,会有一部分数据给归零处理
//使得sre公式又开始成立,也就是上游服务过载让其休息下,过一段时间再来调用看看~
        for i := start; i < steps; i++ {
            rw.win.resetBucket(i)
            offset = i
        }
        for i := 0; i < remainder; i++ {
            rw.win.resetBucket(i)
            offset = i
        }
        rw.offset = offset
        rw.lastTime = timex.Now()
    }
}

 

参考文献

https://blog.csdn.net/jfwan/article/details/109328874

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 在微服务中服务间依赖非常常见,比如评论服务依赖审核服务而审核服务又依赖反垃圾服务,当评论服务调用审核服务时,审核服...
    kevwan阅读 1,381评论 0 3
  • 最近对服务进行监控,而当前监控最流行的数据库就是 Prometheus,同时 go-zero 默认接入也是这款数据...
    kevwan阅读 1,556评论 0 4
  • 从此篇文章开始,我们来陆续介绍 go-zero 开发一个项目所需要的组件和开发实践。 首先我们从 model 层开...
    kevwan阅读 525评论 0 0
  • 限速方式 漏桶算法: 讲究的是服务器匀速的去处理并发请求,但... 为达到目的居然采用sleep了。简单来说服务器...
    Best博客阅读 4,127评论 0 0
  • 大家好!我是 go-zero 作者 Kevin。充满惊吓的 2020 快要过去了,看到掘金上的技术人年度征文,忍不...
    kevwan阅读 210评论 0 0