【Go高效并发模式】

for select

for  select 循环模式非常常见,之前也介绍过,它一般和channel组合完成任务,代码格式如下:
for { // for 无限循环,或者for  range
    select {
        // 通过一个channel控制
    }
}


// for select 无限循环
for {
    select {
        case <- done:
            return 
        default :
            // 执行具体任务
    }
}


// for range select 有限循环
for _, s := range []int{} {
    select {
        case <-done:
            return
        case resultCh <- s:
    }
}
  • 第一种for + select 多路复用的并发模式,那个case满足要求执行哪个,直到满足一定条件退出for循环。这种模式会一直执行default语句中的任务,直到channel被关闭为止
  • 第二种模式是for range select 有限循环,一般用于把可以迭代的内容发送到channel上。这种模式也会有一个done channel,用于退出当前for循环,另一个resultCh channel用于接收for range 循环的值,这些值通过resultCh 可以传递给其他调用者。

select timeout模式

假如需要访问服务器获取数据,因为网络的响应时间不一样,为保证程序的质量,不可能一直等待,所以需要设置一个超时时间,这时候可以使用**select timeout**模式。
func main(){
    result := make(chan string)
    go func(){
        // 模拟网络访问
        time.Sleep(8 * time.Second)
        result <- "服务端结果"
    }()
    
    select {
        case v:= <- result:
            fmt.Println(v)
        case <- time.After(5 * time.Second):
            fmt.Println("网络访问超时了")
    }
}
select timeout 模式的核心在于通过 **time.After**函数设置一个超时时间,防止因为异常造成select语句的无限等待。**小提示**:如果可以使用Context的WithCancel函数超时取消,要优先使用。

Pipiline模式

**Pipeline模式也称为流水线模式,模拟的就是现实世界中的流水线生产**。从技术上看,每一道工序的输出,就是下一道工序的输入,在工序之间传递的东西就是数据,这种模式称为流水线模式,而传递的数据称为数据流。

以组装手机为例,讲解流水线模式的使用。假设一条组装手机的流水线有3道工序,分别是配件采购、配件组装、打包成品。相对工序2来说,工序1是生产者,工序3是消费这。相对工序1来说,工序2是消费者。相对工序3来说,工序2是生产者。
image-20211215225852832
// 工序1采购
func buy(n int) <- chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for i:=1; i <= n ; i++ {
            out <- fmt.Println("配件", 1)
        }
    }()
    
    return out
}

// 工序2组装
func build(in <- chan string) <- chan string {
    out := make(chan string)
    go func(){
        defer close(out)
        for c := range in {
            out <- "组装(" + c + ")"
        }
    }()
    
    return out
}

// 工序3打包
func page(in <- chan string) <- chan string {
    out := make(chan string)
    go func(){
        defer close(out)
        for c := range in {
            out <- "打包(" + c + ")"
        }
    }()
    
    return out
}


func main() {
    coms := buy(10)
    phones := build(coms)
    packs := pack(phones)
    
    for p:= range packs {
        fmt.Println(p)
    }
}
// 输出结果
打包(组装(配件1))
打包(组装(配件2))
打包(组装(配件3))
打包(组装(配件4))
打包(组装(配件5))
打包(组装(配件6))
打包(组装(配件7))
打包(组装(配件8))
打包(组装(配件9))
打包(组装(配件10))

上述例子中,我们可以总结出一个流水线模式的构成:

  1. 流水线由一道道工序构成,每到工序通过channel把数据传递到下一个工序
  2. 每道工序一般都会对应一个函数,函数里有协程和channel,协程一般用于处理数据并把它放入一个channel中,整个函数会返回这个channnel以供下一道工序使用
  3. 最终要有一个组织者把这些工序串起来,这样就形成了一个完整的流水线,对于数据来说就是数据流

扇入和扇出模式

手机流水线经过一段时间运转,组织者发现产能提不上去,经过调研分析,瓶颈在工序2配件组装。工序2过慢,导致工序1配件采购速度不得不下降,下游工序3没什么事情做,不得不闲着,这就是整条流水线产能低下的原因。为了提升产能,组织者决定对工序2增加两班人手。人手增加后,整条流水线示意图如下:
image-20211216000632613
改造后的流水线示意图可以看到,工序2有工序2-1、2-2、2-3三组人手,工序1采购的配件会被工序2的3班人手同时组装,这三班人手组装好的手机会同时传给merge组件汇聚,然后再传给工序3打包。这个流程中,会产生两种模式:**扇出和扇入**。
  • 红色的部分是扇出,对于工序1来说,它同时为工序2的三班人手传递数据,已工序1为中心,三条传递数据的线发散出去,就像一把打开的扇子一样,所以叫扇出
  • 蓝色的部分是扇入,对于merge组件来说,它同时接收工序2三班人手传递的数据进行汇聚,然后传给工序3.已merge组件为中心,三条传递数据的线汇聚到merge组件,也像一个打开的扇子一样,所以叫扇入

Tips:扇出和扇入都像一把打开的扇子,因为数据传递的方向不同,所以叫法也不一样,扇出的数据流是发散传递出去,是输出流;扇入的数据流是汇聚进来,是输入流。

// 扇入函数(组件),把多个channel中的数据发送到一个channel中
func merge(ins ...<-chan string) <- chan string {
    var wg sync.WaitGroup
    out := make(chan string)
    // 把一个channel中的数据发送到out中
    p := func(in <- chan string) {
        defer wg.Done()
        for c := range in {
            out <- c
        }
    }
    wg.Add(len(ins))
    // 扇入,需要启动多个goroutine用于处理多个channel中的数据
    for _, cs := range ins {
        go p(cs)
    }
    
    // 等待所有输入的数据ins处理完,再关闭输出out
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}
新增的merge函数的核心逻辑就是对输入的每个channel使用单独的协程处理,并将每个携程处理的结果都发送到变量out中,达到扇入的目的。总结起来就是通过多个协程并发,把多个channel 合成一个。

在整个手机组装流水线中,merge函数非常小,而且和业务无关,不能当做一道工序,所以管它叫**组件**。该merge组件是可以复用的,流水线中的任何工序需要扇入的时候,都可以使用merge组件。

Tips:改造新增了merge函数,其他函数保持不变,符合开闭原则。开闭原则规定“软件中的对象(类、模块、函数等等)应该对于扩展是开放的,但是对于修改是封闭的”。

// 流水线组织者main函数如何使用扇出和扇入并发模式

func main() {
    coms := buy(100)
    
    // 同时调用3次build函数,也就是为工序2增加人手,然后通过merge函数将三个channel汇聚成一个,然后传给pack函数打包
    phones1 := build(coms)
    phones2 := build(coms)
    phones3 := build(coms)
    
    // 汇聚三个channel成一个
    phones := merge(phones1, phones2, phones3)
    packs := pack(phones)
    
    // 输出
    for p := range packs {
        fmt.Println(p)
    }
}
通过扇出和扇入模式,整条流水线就被扩充好了,大大提高了生产效率。因为已经有了通用的扇入组件merge,所以整条流水线中任何需要扇出、扇入提高性能的工序,都可以复用merge组件做扇入,并且不用做任何隔离。

Futures模式

Pipeline流水线模式中的工序是相互依赖的,上一道工序做完,下一道工序才能开始。但是在我们实际需求中,也有大量的任务之间相互独立、没有依赖,所以为了提高性能,这些独立的任务就可以并发执行。

Futures模式可以理解为**未来模式**,主协程不用等待子协程返回的结果,可以先去做其他事情,等未来需要子协程结果的时候再来取,如果子协程没有返回结果,就一直等待。
// 洗菜和烧水是两个相互独立的任务可以一起做,所以可以通过开启协程的方式,实现同时做的功能。当任务完成后,结果会通过channel返回。
// 洗菜
func washVegetables() <- chan string {
    vegetables := make(chan string)
    go func(){
        time.Sleep(5 * time.Second)
        vegetables <- "洗好的菜"
    }()
    
    return vegetables
}

// 烧水
func boilWater() <- chan string {
    water := make(chan string)
    go func(){
        time.Sleep(5 * time.Second)
        water <- "水烧好了"
    }()
    
    return water
}


func main(){
    vagetablesCh := washVegetables() // 洗菜
    waterCh := boilWater() // 烧水
    fmt.Println("已经安排洗菜和烧水了,休息一下")
    time.Sleep(3 * time.Second)
    
    fmt.Println("要做反了,看看菜和水好了吗")
    vegetables := <- vegetables
    water := <-waterCh
    fmt.Println("准备好了,可以做饭了", vegetables, water)
}
Futures模式下的协程和普通协程最大区别是可以返回结果,而这个结果会在未来的某个时间点使用。所以在未来获取这个结果的时候的操作必须是阻塞的操作,要一直等到获取结果为止。

如果大的任务可以拆解为一个个独立并发执行的小任务,并且可以通过这些小任务的结果得出最终大任务的结果,就可以使用Future模式。

小结:并发模式和设计模式很相似,都是对现实场景的抽象封装,以便提供一个统一的解决方案。但是和设计模式不同的是,并发模式更专注于异步和并发。


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,427评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,551评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,747评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,939评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,955评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,737评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,448评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,352评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,834评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,992评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,133评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,815评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,477评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,022评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,147评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,398评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,077评论 2 355

推荐阅读更多精彩内容