[翻译]GO并发模型一:Pipeline和Cancellation

image.png

简书不维护了,欢迎关注我的知乎:波罗学的个人主页

这篇文章较长将会分两篇来翻译。原文地址:https://blog.golang.org/pipelines

介绍

Go并发模型使构建能有效利用IO和多核CPU的实时流式数据的pipeline非常方便。这篇文章将对此进行介绍,同时会着重强调一些在实践中的易犯错误以及对应的解决方法。

什么是Pipeline

在GO中,pipeline无明确定义;它是语言提供的一种并发编程方式,由连接各个chanel而形成的一系列阶段组成。在其各个阶段,可能分别运行着很多的goroutine。这些goroutine

  • 从输入channel接收数据
  • 对数据作相应处理,例如在此基础上产生新数据
  • 再通过输出channel把数据发送出去

除了开始和结束,每个阶段都会包含任意多个输入和输出channel。开始阶段只有输出channel,结束阶段只有输入channel。相应地,开始阶段可被称为生产者,结束阶段可被称为消费者。

我们先通过一个简单的例子来说明。

并发计算平方数

首先来举一个三阶段pipeline的例子

第一阶段,创建输入参数为可变长int整数的gen函数,它通过goroutine发送所有输入参数,并在发送完成后关闭相应channel:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    
    reutrn out
}

第二阶段,sq函数,负责从输入channel中接收数据并作平方处理再发送到输出channel中。在输入channel关闭并把所有数据都成功发送至输出channel,关闭输出channel:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()

    return out
}

主函数main中创建了pipeline,并执行了最后阶段的任务,从管道中接收了第二阶段的数据并打印了出来:

func main() {
    // Set up the pipeline
    c := gen(2, 3)
    out := sql(c)

    // Consume the output
    fmt.Println(<-out)
    fmt.Println(<-out)
}

此处sq函数的输入和输出参数为相同类型的channel,因此我们可以对其进行组合。重写main函数,如下:

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n)
    }
}

此处相等于在pipeline中增加了一个阶段,即涉及到了三个阶段,其中2、3阶段的goroutine由同一函数产生。

Fan-out和Fan-in (扇出和扇入)

多个函数可同时从同一个channel中读取数据,直到channel关闭,称为fan-out。这为我们提供了一种将任务分发给多个worker的途径,从而实现CPU和I/O的高效利用。

通过多路复用技术将多个channel合并到单个channel实现从多个输入读取数据的能力,只有当所有的输入都关闭,才会停止数据的读取。这个称作 fan-in

重写之前的main,我们调用两次sq,且两次都从同一个channel中读取数据。我们将引入一个新的函数,通过fan-in方式获取数据:

func main() {
    in := gen(2, 3)
    
    // Distribute the sq work across two goroutines that both read from in
    c1 := sq(in)
    c2 := sq(in)
    
    // Comsume the merged output from c1 and c2
    for n := range merge(c1, c2) {
        fmt.Println(n)  // 4 then 9, or 9 then 4
    }
}

merge函数通过为每个输入channel启动一个goroutine实现将数据发送同一个channel中,从完成将channel列表转化为单个channel的功能。一旦所有的输出channel(生产者)启动,merge就会启动一或多个goroutine接收所有数据并在结束后关闭对应channel。

在已关闭的channel发送数据会导致panic,因此保证关闭channel前所有数据都发送完毕是非常重要的。sync.WaitGroup为我们提供了一种实现该同步的方式。示例如下:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        
        wg.Done()
    }
    wg.Add(len(cs)
    
    for _, c := range(cs) {
        go output(c)
    }
    
    // Start a goroutine to close out once all the output goroutines are
    // done. This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

goroutine突然停止

一些使用原则:

  • 在数据发送完毕后,关闭输出channel;
  • 持续不停地从输入中接收数据,直到channel关闭;

我们可使用for循环来接收数据,一旦数据所有数据接收完毕将会自动退出。

但在真实场景中,并非所有情况都需要接收完所有的数据。有时设计如此,我们只需一部分的数据并可运行。更常见地,如果输入早早就抛出了一个错误,这时该阶段便会早早地退出。还有一些情况,如接收者不再等待数据接收,此时也需停止数据的生产。

在我们的例子中,如果一个阶段没有成功接收完数据就退出,我们的goroutine仍会尝试发送数据,这将会导致channel进入无限期的阻塞。

func main() {
    // Consume the first value from the output
    out := merge(c1, c2)
    fmt.Println(<-out)
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutine is hung attempting to send it
}

这是资源泄露:goroutine会继续消耗内存、运行时资源,而且在栈中的堆引用也不能被回收。goroutine必须退出,才能启动垃圾回收机制。

当下游不能完全接收所有数据时,我们需要准备将上游goroutine退出。一种方式,我们可以把上游的channel设定为一个buffer。当buffer还有空间时,发送操作将会立刻完成。

c := make(chan int, 2)
c <- 1      // succeeds immediately
c <- 2      // succeeds immediately
c <- 3      // blocks until another goroutine does <-c and receives 1

当channel被创建时,如果我们已经知道数据的大小,可以如此来简化我们的代码。比如,我们重写gen函数,拷贝数据到buffer channel中,可以避免创建新的goroutine。

func gen(nums ...int) int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

我们可以考虑给merge函数输出channel指定固定大小空间。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan, int, 1)   // enough space for the unread inputs
    // ... the rest is unchanged ...
}

虽然这解决了程序中goroutine阻塞的问题,但并不是好代码。指定buffer的长度我们需知道merge将接收值的数量。如果下游读取少量数据便结束,依然会阻塞。

我们需要一种方式,使下游通知上游以表明它们不再接收信息。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350