golang并发三板斧系列之二:goroutine池用于并发

这是本系列文章的第二篇,第一篇在此golang并发三板斧系列之一:channel用于通信和同步

前文描述了手工作坊的时代,即老师傅带着小学徒并发地做一项工作,现在我们准备进入工业时代。

Pipeline模型

Pipeline即流水线模型,这在现代工业是很常见的。模型分为数个阶段,每个阶段干不同的事情,但可以并行地去做。以造拖拉机为例来解释流水线的工作方式,假设装配一辆汽车需要四个步骤:

  • 第一步冲压:制作车身外壳和底盘等部件。
  • 第二步焊接:将冲压成形后的各部件焊接成车身。
  • 第三步涂装:将车身等主要部件清洗、化学处理、打磨、喷漆和烘干。
  • 第四步总装:将各部件(包括发动机和向外采购的零部件)组装成车。
image

比如要造一百辆拖拉机,如果每个阶段都等前一阶段的一百辆完成才开工,是对生产线的极大浪费。现代工业的流水线做法是每个阶段同时开工,在时间上并行起来。流水线的概念在计算机世界中也很普遍,拥有流水线的CPU可以在一个时钟周期内完成一条指令,而不是等待取指令、译码、取操作数、执行四个阶段才能完成一条指令:

image

golang的设计者们吸收了这一经典概念,使用channel把前后数个阶段串联起来,形成一个流水线:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func TestPipeline(t *testing.T) {
    c := gen(2, 3, 4)
    out := sq(c)

    for n := range out {
        log.Print(n)
    }
}

由于sq函数的入参和出参一样,故可以增加无数个阶段写为:

func TestPipelines(t *testing.T) {
    for n := range sq(sq(sq(gen(1, 2, 3, 4)))) {
        log.Print(n)
    }
}

FAN模型

可以说FAN模型是流水线的一种改进。可以观察到上述的流水线模型,每个阶段只起了一个goroutine,但是现实造拖拉机的时候,在组装轮子的时候可以4个工人一起上,这又是提高了并发。golang吸取了这种模型,在任务分发阶段,多个goroutine从同一个channel读取数据,直到关闭,称为FAN-OUT模型;在结果收集阶段,单个goroutine从多个channel读取数据,直到关闭,称为FAN-IN模型:

[图片上传失败...(image-d2aaa-1558316671133)]

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func TestFan(t *testing.T) {
    c := gen(2, 3, 4)
    out1 := sq(c)
    out2 := sq(c)

    for n := range merge(out1, out2) {
        log.Print(n)
    }
}

并行起来之后,最终结果的顺序是不可控的:

2019/02/22 21:39:27 9
2019/02/22 21:39:27 16
2019/02/22 21:39:27 4

Process finished with exit code 0

以上模型能提升并行效率吗

CPU密集型

用浮点数的幂计算模拟CPU-BOUND,设计了如下模型用于比较:

//功能函数
func benchmarkList() []float64 {
    list := make([]float64, MAX)
    for n := 0; n < MAX; n++ {
        list[n] = float64(n)
    }
    return list
}

func cpubound(n float64) float64 {
    return math.Pow(3.1415926, n)
}

func genChan(nums []float64) <-chan float64 {
    out := make(chan float64)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func genChanBuffer(nums []float64) <-chan float64 {
    out := make(chan float64, BUFFERSIZE)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func cpuChan(in <-chan float64) <-chan float64 {
    out := make(chan float64)
    go func() {
        for n := range in {
            out <- cpubound(n)
        }
        close(out)
    }()
    return out
}

func cpuChanBuffer(in <-chan float64) <-chan float64 {
    out := make(chan float64, BUFFERSIZE)
    go func() {
        for n := range in {
            out <- cpubound(n)
        }
        close(out)
    }()
    return out
}

func mergeChan(cs ...<-chan float64) <-chan float64 {
    var wg sync.WaitGroup
    out := make(chan float64)

    output := func(c <-chan float64) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func mergeChanBuffer(cs ...<-chan float64) <-chan float64 {
    var wg sync.WaitGroup
    out := make(chan float64, BUFFERSIZE)

    output := func(c <-chan float64) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

//测试函数
func BenchmarkCPUSequential(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()

        var sum float64
        for _, n := range list {
            sum += cpubound(n)
        }
    }
}

func BenchmarkCPUPipeline(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()

        c := genChan(list)
        out := cpuChan(c)

        var sum float64
        for n := range out {
            sum += n
        }
    }
}

func BenchmarkCPUPipelineBuffer(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()

        c := genChanBuffer(list)
        out := cpuChanBuffer(c)

        var sum float64
        for n := range out {
            sum += n
        }
    }
}

func BenchmarkCPUFan(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()

        c := genChan(list)
        gonum := runtime.NumCPU() / 2
        outs := make([]<-chan float64, gonum)
        for i := 0; i < gonum; i++ {
            outs[i] = cpuChan(c)
        }

        var sum float64
        for n := range mergeChan(outs...) {
            sum += n
        }
    }
}

func BenchmarkCPUFanBuffer(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()

        c := genChanBuffer(list)
        gonum := runtime.NumCPU() / 2
        outs := make([]<-chan float64, gonum)
        for i := 0; i < gonum; i++ {
            outs[i] = cpuChanBuffer(c)
        }

        var sum float64
        for n := range mergeChanBuffer(outs...) {
            sum += n
        }
    }
}

func BenchmarkCPUParallelize(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()
        gonum := runtime.NumCPU()

        var sum float64
        num := len(list)
        stride := num / gonum

        var wg sync.WaitGroup
        wg.Add(gonum)
        var mux sync.Mutex

        for g := 0; g < gonum; g++ {
            go func(g int) {
                start := g * stride
                end := start + stride
                if g == gonum-1 {
                    end = num
                }

                var sumin float64
                for _, n := range list[start:end] {
                    sumin += cpubound(n)
                }

                mux.Lock()
                sum += sumin
                mux.Unlock()

                wg.Done()
            }(g)
        }

        wg.Wait()
    }
}

设MAX=1000000,BUFFERSIZE=1000。结果让人大跌眼镜,无论是Pipeline模型还是Fan模型,都比不上普通的串行,只有普通的并发模型能有效提升:

[baixiao@localhost go_concurrency]$ GOGC=off go test -cpu 1,8 -run none -bench CPU -benchtime 3s 
goos: linux
goarch: amd64
BenchmarkCPUSequential                50          95148037 ns/op
BenchmarkCPUSequential-8              30         101450384 ns/op
BenchmarkCPUPipeline                  10         512093124 ns/op
BenchmarkCPUPipeline-8                 5         864946495 ns/op
BenchmarkCPUPipelineBuffer            20         219850707 ns/op
BenchmarkCPUPipelineBuffer-8          10         370302165 ns/op
BenchmarkCPUFan                        5         715223945 ns/op
BenchmarkCPUFan-8                      5         913448396 ns/op
BenchmarkCPUFanBuffer                 10         320494600 ns/op
BenchmarkCPUFanBuffer-8               10         427863250 ns/op
BenchmarkCPUParallelize              100          95482003 ns/op
BenchmarkCPUParallelize-8            200          19398520 ns/op
PASS
ok      _/home/baixiao/go_concurrency   77.085s

可以得出以下结论:

  • Sequential完爆Pipeline和Fan
  • Pipeline和Fan在多核下均弱于单核,因为系统瓶颈根本不在并行上,而是channel造成的阻塞
  • 给channel加了buffer之后,PipelineBuffer优于Pipeline、FanBuffer优于Fan,因为channel的阻塞减弱了
  • 多核下的Parallelize:在座的各位都是垃圾

IO密集型

用随机sleep模拟IO-BOUND,设计了如下模型用于比较:

//功能函数
func iobound(n float64) float64 {
    time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
    return n
}

func ioChan(in <-chan float64) <-chan float64 {
    out := make(chan float64)
    go func() {
        for n := range in {
            out <- iobound(n)
        }
        close(out)
    }()
    return out
}

func ioChanBuffer(in <-chan float64) <-chan float64 {
    out := make(chan float64, BUFFERSIZE)
    go func() {
        for n := range in {
            out <- iobound(n)
        }
        close(out)
    }()
    return out
}

//测试函数
func BenchmarkIOSequential(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()

        var sum float64
        for _, n := range list {
            sum += iobound(n)
        }
    }
}

func BenchmarkIOPipeline(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()

        c := genChan(list)
        out := ioChan(c)

        var sum float64
        for n := range out {
            sum += n
        }
    }
}

func BenchmarkIOPipelineBuffer(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()

        c := genChanBuffer(list)
        out := ioChanBuffer(c)

        var sum float64
        for n := range out {
            sum += n
        }
    }
}

func BenchmarkIOFan(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()

        c := genChan(list)
        gonum := runtime.NumCPU() / 2
        outs := make([]<-chan float64, gonum)
        for i := 0; i < gonum; i++ {
            outs[i] = ioChan(c)
        }

        var sum float64
        for n := range mergeChan(outs...) {
            sum += n
        }
    }
}

func BenchmarkIOFanBuffer(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()

        c := genChanBuffer(list)
        gonum := runtime.NumCPU() / 2
        outs := make([]<-chan float64, gonum)
        for i := 0; i < gonum; i++ {
            outs[i] = ioChanBuffer(c)
        }

        var sum float64
        for n := range mergeChanBuffer(outs...) {
            sum += n
        }
    }
}

func BenchmarkIOParallelize(b *testing.B) {
    for i := 0; i < b.N; i++ {
        list := benchmarkList()
        gonum := runtime.NumCPU()

        var sum float64
        num := len(list)
        stride := num / gonum

        var wg sync.WaitGroup
        wg.Add(gonum)
        var mux sync.Mutex

        for g := 0; g < gonum; g++ {
            go func(g int) {
                start := g * stride
                end := start + stride
                if g == gonum-1 {
                    end = num
                }

                var sumin float64
                for _, n := range list[start:end] {
                    sumin += iobound(n)
                }

                mux.Lock()
                sum += sumin
                mux.Unlock()

                wg.Done()
            }(g)
        }

        wg.Wait()
    }
}

设MAX=100,BUFFERSIZE=1000。

[baixiao@localhost go_concurrency]$ GOGC=off go test -cpu 1,8 -run none -bench IO -benchtime 3s
goos: linux
goarch: amd64
BenchmarkIOSequential                 10         441609349 ns/op
BenchmarkIOSequential-8               10         457200927 ns/op
BenchmarkIOPipeline                   10         454147034 ns/op
BenchmarkIOPipeline-8                 10         467264740 ns/op
BenchmarkIOPipelineBuffer             10         456459492 ns/op
BenchmarkIOPipelineBuffer-8           10         452286832 ns/op
BenchmarkIOFan                       100          36995490 ns/op
BenchmarkIOFan-8                     100          36950275 ns/op
BenchmarkIOFanBuffer                 100          37555702 ns/op
BenchmarkIOFanBuffer-8               100          36851459 ns/op
BenchmarkIOParallelize                50          88653231 ns/op
BenchmarkIOParallelize-8              50          87061568 ns/op
PASS
ok      _/home/baixiao/go_concurrency   54.007s

可以得出以下结论:

  • 在IO密集的情况下,Fan模型吊打所有
  • channel带buffer没有效率提升
  • 所有的模型,在多核下都没有提升

结论是?

  1. 不带buffer的channel由于「强同步」特性,无法提高并行,甚至拖累效率
  2. CPU密集型的场景,多核并行能提升效率
  3. IO密集型的场景,多核并行不能提升效率
  4. Pipeling模型有何用途?我没看出来
  5. Waitgroup模型在CPU密集型场景有优势
  6. Fan模型在IO密集型场景有优势

坑:runtime.NumCPU()不会随着runtime.GOMAXPROCS()改变,前者代表的是系统全部的核数,后者代表的是可同时使用的核数

goroutine池

pool模型

设计一个pool,需要考虑几个方面:输入是什么,做什么事情,多少worker一起执行?

现抽象出一个goroutine pool的模型代码,可以自定义输入类型,执行函数,worker数量。

func genPoolChanBuffer(nums []float64) <-chan interface{} {
    out := make(chan interface{}, BUFFERSIZE)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

type Handler func(*Work)
type Work struct {
    input interface{}
}

type Pool struct {
    channum   int
    workernum int
    wg        *sync.WaitGroup
    ch        chan Work
    Func      Handler
}

func (p Pool) RunWorker() {
    for i := 0; i < p.workernum; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for work := range p.ch {
                p.Func(&work)
            }
        }()
    }
}

func (p Pool) FeedWorker(in <-chan interface{}) {
    go func() {
        for n := range in {
            work := Work{
                input: n,
            }
            p.ch <- work
        }
        close(p.ch)
    }()
}

func (p Pool) Wait() {
    p.wg.Wait()
}

func InitPool(channum, workernum int, f Handler) *Pool {
    return &Pool{
        channum:   channum,
        workernum: workernum,
        wg:        &sync.WaitGroup{},
        ch:        make(chan Work, channum),
        Func:      f,
    }
}

对比测试

设计了四个对比测试,观察在CPU密集型和IO密集型的情况下该pool的表现,另外还旨在探索什么情况下worker的数量会越多越好?带Min的测试中设置gonum为系统核数的一半,带Max的测试中设置gonum为系统核数的十倍,gonum即为worker数量。

//测试函数
func BenchmarkCPUPool(b *testing.B) {
    channum := 100
    gonum := runtime.NumCPU()
    for i := 0; i < b.N; i++ {
        f := func(w *Work) {
            if v, ok := w.input.(float64); ok {
                cpubound(v)
            }
        }
        list := benchmarkList()
        c := genPoolChanBuffer(list)

        p := InitPool(channum, gonum, f)
        p.RunWorker()
        p.FeedWorker(c)
        p.Wait()
    }
}

func BenchmarkCPUPoolMin(b *testing.B) {
    channum := 100
    gonum := runtime.NumCPU() / 2
    for i := 0; i < b.N; i++ {
        f := func(w *Work) {
            if v, ok := w.input.(float64); ok {
                cpubound(v)
            }
        }
        list := benchmarkList()
        c := genPoolChanBuffer(list)

        p := InitPool(channum, gonum, f)
        p.RunWorker()
        p.FeedWorker(c)
        p.Wait()
    }
}

func BenchmarkCPUPoolMax(b *testing.B) {
    channum := 100
    gonum := 10 * runtime.NumCPU()
    for i := 0; i < b.N; i++ {
        f := func(w *Work) {
            if v, ok := w.input.(float64); ok {
                cpubound(v)
            }
        }
        list := benchmarkList()
        c := genPoolChanBuffer(list)

        p := InitPool(channum, gonum, f)
        p.RunWorker()
        p.FeedWorker(c)
        p.Wait()
    }
}

func BenchmarkIOPool(b *testing.B) {
    channum := 100
    gonum := runtime.NumCPU()
    for i := 0; i < b.N; i++ {
        f := func(w *Work) {
            if v, ok := w.input.(float64); ok {
                iobound(v)
            }
        }
        list := benchmarkList()
        c := genPoolChanBuffer(list)

        p := InitPool(channum, gonum, f)
        p.RunWorker()
        p.FeedWorker(c)
        p.Wait()
    }
}

func BenchmarkIOPoolMin(b *testing.B) {
    channum := 100
    gonum := runtime.NumCPU() / 2
    for i := 0; i < b.N; i++ {
        f := func(w *Work) {
            if v, ok := w.input.(float64); ok {
                iobound(v)
            }
        }
        list := benchmarkList()
        c := genPoolChanBuffer(list)

        p := InitPool(channum, gonum, f)
        p.RunWorker()
        p.FeedWorker(c)
        p.Wait()
    }
}

func BenchmarkIOPoolMax(b *testing.B) {
    channum := 100
    gonum := 10 * runtime.NumCPU()
    for i := 0; i < b.N; i++ {
        f := func(w *Work) {
            if v, ok := w.input.(float64); ok {
                iobound(v)
            }
        }
        list := benchmarkList()
        c := genPoolChanBuffer(list)

        p := InitPool(channum, gonum, f)
        p.RunWorker()
        p.FeedWorker(c)
        p.Wait()
    }
}

设MAX=100,BUFFERSIZE=1000。

[baixiao@localhost go_concurrency]$ GOGC=off go test -cpu 1,8 -run none -bench Pool -benchtime 3s
goos: linux
goarch: amd64
BenchmarkCPUPool                  100000             38070 ns/op
BenchmarkCPUPool-8                 50000             77872 ns/op
BenchmarkCPUPoolMin               100000             33286 ns/op
BenchmarkCPUPoolMin-8              50000             71690 ns/op
BenchmarkCPUPoolMax                30000            143250 ns/op
BenchmarkCPUPoolMax-8              20000            281619 ns/op
BenchmarkIOPool                      200          21382667 ns/op
BenchmarkIOPool-8                    200          21467617 ns/op
BenchmarkIOPoolMin                   100          37068480 ns/op
BenchmarkIOPoolMin-8                 100          37350682 ns/op
BenchmarkIOPoolMax                   500           9438800 ns/op
BenchmarkIOPoolMax-8                 500           9519574 ns/op
PASS
ok      _/home/baixiao/go_concurrency   64.149s 

可以得出以下结论:

  • CPU密集型场景中多核下均弱于单核
  • CPU密集型场景中worker数量太多只能起反作用
  • IO密集型场景中多核并行不能提升效率
  • IO密集型场景中worker数量在一定范围内能有效提升效率
  • Pool模型由于用到了channel,多核都不能提升效率

channel是个好东西?

在第一篇里,我们讲到channel是goroutine之间通信和同步的重要工具,也是golang中重要的关键字之一,说明golang的设计者们很看重这个特性。

但是实际上channel的性能较一般,分析源码可知,channel中的数据无论读写都会加mutex锁,造成高并发时的较大瓶颈,这个从我们的对比测试中也都可以看出来。

image

上图来自文章

另外,channel目前都还有整个社群都无法调优的问题,比如runtime: select on a shared channel is slow with many Ps


本篇的模型可以类比为你现在是个企业主,开工厂进行工业生产了。


所有代码都在https://github.com/baixiaoustc/go_concurrency/blob/master/second_post_test.go中能找到。

原文载于golang并发三板斧系列之二:goroutine池用于并发

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,701评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,649评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,037评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,994评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,018评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,796评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,481评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,370评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,868评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,014评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,153评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,832评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,494评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,039评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,437评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,131评论 2 356

推荐阅读更多精彩内容