Go 并发编程:Goroutine常见应用范式

一、多独立协程并发——worker分工模式

并发协程独立运行且互不通信,主协程等待处理独立子协程的结果

并发编程有一种常见方式就是许多工作子协程都是独立的,互不干扰,但他们又是“同一时间”处理。
例如http服务器的工作模式就是这种,在go标准包net/http中,http服务启动后,它所接收的每一个请求都是独立协程处理的,每个请求的运行协程之间都互不通信。
我们知道,go http服务的请求处理协程都是动态创建的,但很多情况下,我们是需要一些固定数量的协程去独立处理任务,所以这里我们先说固定数量协程独立运行的情况。

范式实现:
  • 创建三个通道,分别处理“任务单元”、“任务结果”、“任务完成状态”三种通信数据;
  • 启动新增任务协程,把任务单元数据发送给jobs通道;
  • 各任务单元分别由独立的worker协程处理,执行独立任务,并把任务处理结果发送给results通道;
  • 启动等待任务结果协程,从done管道接收任务处理完成的数据;
  • 主协程显示处理结果。

我们知道go有个select原语,专门用来处理阻塞或非阻塞通道数据的,我们可以使用for...select...范式最大效率地不断从通道中读出数据。代码如下:

func Demo() {
    run("读书", "看报", "撸代码")
}

type TaskJob struct {
    task    string
    results chan<- Result
}

func (j *TaskJob) do() {
    fmt.Println("Do Task:", j.task)
    // Do Something...
    j.results <- Result{j.task, 200, "Successful"}
}

type Result struct {
    task    string
    code    int
    message string
}

// 逻辑核心数作为并发worker数
var workers = runtime.NumCPU()

func run(tasks ...string) {
    jobs := make(chan TaskJob, workers)      // 任务通道
    results := make(chan Result, len(tasks)) // 执行结果输出通道
    done := make(chan struct{}, workers)     // 任务完成状态通道

    // 子协程添加任务
    go addJob(jobs, tasks, results)

    // 使用单独的worker协程执行并发任务
    for i := 0; i < workers; i++ {
        go doJobs(done, jobs)
    }
    // 主协程等待并处理结果
    waitAndProcessResults1(done, results)
}
// 添加任务
func addJob(jobs chan<- TaskJob, tasks []string, results chan<- Result) {
    for _, task := range tasks {
        jobs <- TaskJob{task: task, results: results}
    }
    // 因为此为发送端,新增完任务后在此关闭通道
    close(jobs)
}

// 执行任务
func doJobs(done chan<- struct{}, jobs <-chan TaskJob) {
    for job := range jobs {
        job.do()
    }
    done <- struct{}{}
}

// 等待并处理子协程记过:for...select...范式同时处理results和done通道,合并awaitCompletion()和showResults()
func waitAndProcessResults1(done <-chan struct{}, results <-chan Result) () {
    // 【阻塞】运行worker时:要么就在处理结果<-results,要么处理完成<-done
    for i := workers; i > 0; {
        select {
        case result := <-results:
            fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
        case <-done:
            i--
        }
    }

    // 【非阻塞】worker全部做完后:results全部清空后退出
DONE:
    for {
        select {
        case result := <-results:
            fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
        default:
            break DONE
        }
    }
}

执行结果:

=== RUN   TestDemo31
Do Task: 撸代码
Do Task: 读书
Task Job :撸代码, Result:200,Successful
Task Job :读书, Result:200,Successful
Do Task: 看报
Task Job :看报, Result:200,Successful
--- PASS: TestDemo31 (0.00s)
PASS

可以看到,当有好几个不同的通道需要处理时,使用select原语是非常方便的。在这里我们集中处理results和done通道。

添加子协程超时处理:

当对worker任务子协程有超时要求时,也可在select中添加超时操作,对waitAndProcessResults()修改如下:

// 合并awaitCompletion()和showResults(),并增加超时处理
func waitAndProcessResults(timeout time.Duration, done <-chan struct{}, results <-chan Result) () {
    // 超时通道
    finish := time.After(time.Duration(timeout))

    // 【阻塞】运行worker时:要么就在处理结果<-results,要么处理完成<-done,要么超时
    for i := workers; i > 0; {
        select {
        case result := <-results:
            fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
        case <-finish:
            fmt.Println("worker任务执行超时!")
            return
        case <-done:
            i--
        }
    }

    // 【非阻塞】worker全部做完后:results全部清空后退出
    for {
        select {
        case result := <-results:
            fmt.Printf("Task Job :%s, Result:%d,%s\n", result.task, result.code, result.message)
        case <-finish:
            fmt.Println("worker任务执行超时!")
            return
        default:
            return
        }
    }
}


二、地铁闸机——限流模式

开启多条子协程, 往有限的通道发送/接收数据,这种情形类似地铁站内的地铁闸机模式,闸机就那么几个,行人分组从闸机通过,通常哪个闸机人少,人们倾向于排哪个闸门的队,或者哪个闸机通行效率高就排哪个闸门的队。

在这种并发模式中,协程相当于行人,通道相当于地铁闸门,多条通道从有限闸门中通过。
具体代码如下:

func Demo() {
    var workers = 100000
    ch0 := make(chan int, 0)
    ch1 := make(chan int, 1)
    ch2 := make(chan int, 2)
    ch3 := make(chan int, 3)

    counter := make([]int, 4)

    // 开workers条协程,发送消息到四条通道
    for i := 0; i < workers; i++ {
        go func(n int) {
            select {
            case ch0 <- n:
                send(n, 0)
            case ch1 <- n:
                send(n, 1)
            case ch2 <- n:
                send(n, 2)
            case ch3 <- n:
                send(n, 3)
            }
        }(i)
    }

    // 主协程从四条通道接收workers条子协程发送的消息
    for i := 0; i < workers; i++ {
        select {
        case rec := <-ch0:
            receive(rec, 0)
            counter[0]++
        case rec := <-ch1:
            receive(rec, 1)
            counter[1]++
        case rec := <-ch2:
            receive(rec, 2)
            counter[2]++
        case rec := <-ch3:
            receive(rec, 3)
            counter[3]++
        }
    }
    fmt.Println("channel counter:", counter)
}

func send(i, j int) {
    fmt.Printf("goroutine#%d send %d to ch%d\n", i, i, j)
}

func receive(routine, ch int) {
    fmt.Printf("receive from goroutine#%d to ch%d channel\n", ch, routine)
}

运行结果:

...
receive from goroutine#92612 to ch2 channel
receive from goroutine#92426 to ch0 channel
receive from goroutine#92733 to ch0 channel
receive from goroutine#92531 to ch0 channel
goroutine#92531 send 92531 to ch0
goroutine#92616 send 92616 to ch3
goroutine#92425 send 92425 to ch2
goroutine#92612 send 92612 to ch2
goroutine#92733 send 92733 to ch0
goroutine#92424 send 92424 to ch2
receive from goroutine#92423 to ch3 channel
receive from goroutine#92614 to ch1 channel
receive from goroutine#92734 to ch1 channel
goroutine#92617 send 92617 to ch3
goroutine#92735 send 92735 to ch1
goroutine#92530 send 92530 to ch3
...
channel counter: [25030 25073 25011 24886]

以上结果表明,多条协程数据比较均衡地从四条通道中通过


三、扇入扇出——分流模式

我们通常会遇到许多耗时任务,如一个通道的数据流向一个执行函数时,当前函数执行时长较长,我们可以把该通道的数据流拆分流向多个通道,并给每个通道启动相应的goroutine处理;随后把所有通道汇总到一个通道输出,以加大该耗时任务的执行效率。
以上过程我们成为扇入扇出过程:

  • 扇出(Fan-out):将一个通道的数据分流给多个通道并启动多个goroutine处理;
  • 扇入(Fan-in):将多个goroutines返回的通道的结果组合到一个通道并输出;

我们看一个简单示例:模拟管道中某一阶段的耗时任务

// 模拟耗时任务
func takeUpTimeTask(done <-chan interface{}, inStream <-chan interface{}) <-chan interface{} {
    outStream := make(chan interface{})
    go func() {
        defer close(outStream)
        for {
            select {
            case <-done:
                return
            case val ,ok := <-inStream:
                if !ok {
                    return
                }
                select {
                case <-done:
                    return
                case outStream <- val:
                    // 模拟耗时任务
                    time.Sleep(time.Second)
                }

            }
        }
    }()
    return outStream
}

我们来模拟调用运行它

// 没有扇入扇出处理
func Demo1() {
    done := make(chan interface{})
    defer close(done)
    inStream := make(chan interface{})

    go func() {
        defer close(inStream)
        for i := 0; i < 20; i++ {
            inStream <- i
        }
    }()

    // 对数据流执行耗时任务
    resultChan := takeUpTimeTask(done, inStream)

    // 使用chRange安全遍历打印
    for val := range chRange(done,resultChan){
        fmt.Printf("%v ", val)
    }
}

结果输出:处理20个数据花费20s

=== RUN   TestDemo1
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 --- PASS: TestDemo81 (20.05s)
PASS

这种情况很显然是可以用并发对耗时任务进行改进的,使用扇入扇出的思路可以这样做:


// 扇出处理耗时任务
func fanOut(done <-chan interface{}, chanStream chan interface{}) []<-chan interface{} {
    numFinders := runtime.NumCPU()
    finders := make([]<-chan interface{}, numFinders)
    for i := 0; i < numFinders; i++ {
        // 耗时任务的分流管道
        finders[i] = takeUpTimeTask(done, chanStream)
    }
    return finders
}

// 扇入汇流结果通道
func fanIn(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
    var wg sync.WaitGroup
    multiplexedStream := make(chan interface{})
    // 管道汇流处理
    multiplex := func(c <-chan interface{}) {
        defer wg.Done()
        for i := range c {
            select {
            case <-done:
                return
            case multiplexedStream <- i:
            }
        }
    }
    // 从所有的通道中取数据
    wg.Add(len(channels))
    for _, c := range channels {
        go multiplex(c)
    }
    // 等待所有数据汇总完毕
    go func() {
        wg.Wait()
        close(multiplexedStream)
    }()
    return multiplexedStream
}

改进之后我们再来调用它们:

// 扇入扇出处理
func Demo2() {
    done := make(chan interface{})
    defer close(done)
    inStream := make(chan interface{})

    go func() {
        defer close(inStream)
        for i := 0; i < 20; i++ {
            inStream <- i
        }
    }()

    // 扇入扇出执行耗时任务
    resultChan := fanIn(done, fanOut(done, inStream)...)

    // 使用chRange安全遍历打印
    for val := range chRange(done,resultChan){
        fmt.Printf("%v ", val)
    }
}

结果输出:四个逻辑单元并发运行耗时5s,效率提升4倍

=== RUN   TestDemo82
0 1 3 2 4 6 5 7 8 10 9 11 12 13 14 15 16 18 17 19 --- PASS: TestDemo82 (5.02s)
PASS

四、动态创建协程

根据需要来动态创建goroutine,并限制可并发的协程数。

// 设置默认最多开启5子协程
const maxRoutineNum = 5

func autoRoutine(wg *sync.WaitGroup, inStream <-chan int) {
    for {
        in, ok := <-inStream
        if !ok {
            break
        }
        // 自动开启协程的条件:输入流为偶数
        if in%2 == 0 && runtime.NumGoroutine() < maxRoutineNum{
            wg.Add(1)
            go process(in, func() { wg.Done() })
        } else {
            process(in, nil)
        }
    }
}

func process(in int, callback func()) {
    if callback != nil {
        defer callback()
        fmt.Printf("sub routine process:%d \n", in)
        time.Sleep(1000000 * time.Microsecond)
    }else{
        fmt.Printf("parent routine process:%d \n", in)
        time.Sleep(100000 * time.Microsecond)
    }
}

调用示例:

// 动态创建协程
func Demo() {
    wg := sync.WaitGroup{}
    inStream := make(chan int)
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer close(inStream)
        for i := 1; i < 10; i++ {
            inStream <- i
        }
    }()
    autoRoutine(&wg, inStream)
    wg.Wait()
}

运行结果:可以看到,程序根据要求最多开不超过5个子协程

=== RUN   TestDemo121
parent routine process:1 
sub routine process:2 
parent routine process:3 
parent routine process:5 
sub routine process:4 
parent routine process:6 
parent routine process:7 
parent routine process:8 
parent routine process:9 
--- PASS: TestDemo121 (1.21s)
PASS

以上只是演示该模式的一个简单例子,但它可以扩展到许多应用,如对不定数量的资源进行计算,其中某些资源比较耗时,需要开启协程提高执行效率,但协程数不能无限扩张。一个具体的例子就是使用go标准包的filepath.Walk()对系统特点目录的文件进行哈希计算,显然对某个目录的文件数是事先不确定的,且大文件的哈希计算耗时比较大,这就需要开启子协程计算,而协程又不能无限制扩张,因为系统对打开文件数又有限制。这种场景就非常适合使用这种模式,感兴趣的可自己实现一下。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容

  • 道可道 非常道 译:道,终之果,道,可以达到的道;非常道,每一种道,每一个人的道,万物之道,都是不同的。因为道就是...
    心玄紫来阅读 420评论 0 1
  • 打算从明天开始,女儿睡了我就赶紧睡,不再熬夜看手机写文章了,该做的事情一定要在白天完成。 因为最近体力越来越不支了...
    今天的星星阅读 450评论 4 10