Go并发控制简明教程-WaitGroup和Context简明教程

控制并发的两种方式

  • 使用WaitGroup
  • 使用Context

WaitGroup简单例子

使用WaitGroup可以把一个作业分包,使用多个协程完成,节省作业处理时间。

func main(){
    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        time.Sleep(2 * time.Second)
        fmt.Println("job 1 done.")
        wg.Done()
    }()

    go func() {
        time.Sleep(1 * time.Second)
        fmt.Println("job 2 done.")
        wg.Done()
    }()

    fmt.Println("Wait for job finish")
    wg.Wait()
}

声明一个WaitGroup,大小设置为2,表示有需要等待两个子协程完成。创建两个子协程,分别以睡眠代替作业负载,子协程结束前调用wg.Done()表示此任务已经完成。主协程在wg.Wait()时阻塞,等待子协程结束,countDown数为0时就继续执行。countDown数如何减少呢?通过wg.Done()完成的。

以下是输出:

~ » go run main.go
Wait for job finish
job 2 done.
job 1 done.

但是,我们会发现,创建一个子协程后,主协程无法控制子协程,只能等待,不能因为时间过长而发送信号通知子协程停止执行。

Channel + Selete控制协程

为了解决上面提到的问题,可以简单使用Channel + Selete来实现子协程执行时间过长后,主协程通知子协程结束返回的功能。

func main(){
    stop := make(chan bool)

    go func() {
        for {
            select {
            case <- stop:
                fmt.Println("job timeout return")
                return
            default:
                fmt.Println("job still working")
                time.Sleep(1 * time.Second)
            }
        }
    }()

    time.Sleep(5 * time.Second)
    fmt.Println("Timeout stop the job")
    stop <- true
    time.Sleep(5 * time.Second)
    fmt.Println("Main goroutine finished!!")
}

创建一个布尔类型的Channel用于通知子协程是否停止。启动一个子协程模拟搞IO作业,里面有一个select,用于等待主协程的stop信号,如果没有此信号就执行default下面的语句。主协程5s后表示子协程执行超时,那么发送stop信号,子协程接收到信号后返回。主协程正常结束。

下面是执行日志:

~ » go run main.go
job still working
job still working
job still working
job still working
job still working
Timeout stop the job
job timeout return
Main goroutine finished!!

Context简单例子

上面的Channel + Selete简单例子只能解决一层主子协程的控制,如果子协程里也有子协程,那么此方法就无法奏效了。

不如下图,Worker2启动Job03协程工作,而Job03也要启动Job04协程配合工作。此时一旦Job04超时未返回,需要把此信号传递给Worker02,让它通知Job03和Job04停止工作,这种情况就很难解决了。

层级子协程

以此,Go引入了Context处理这种情况,Context是一种很好的设计模式。

下面使用Context改造Channel + Selete的例子:

func main(){
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        for {
            select {
            case <- ctx.Done():
                fmt.Println("job timeout return")
                return
            default:
                fmt.Println("job still working")
                time.Sleep(1 * time.Second)
            }
        }
    }()

    time.Sleep(5 * time.Second)
    fmt.Println("Timeout stop the job")
    cancel()
    time.Sleep(5 * time.Second)
    fmt.Println("Main goroutine finished!!")
}

改造点如下:

  • 把创建一个stop的Channel换成创建一个Context。
  • 把子协程里监听stop的Channel的信号换成ctx.Done()
  • 把在主协程给Channel写入true信号换成调用创建Context时返回的cancel方法。

context.WithCancel方法用于创建一个可以发送取消信号的Context,它的入参需要一个父Context,此处使用了context.Background(),它正是根Context。调用cancel方法,会通过ctx.Done()通知子协程,然后使用select处理此信号。

Context控制多个子协程
func main(){
    ctx, cancel := context.WithCancel(context.Background())

    go Work(ctx, "node1")
    go Work(ctx, "node2")
    go Work(ctx, "node3")

    time.Sleep(5 * time.Second)
    fmt.Println("Timeout stop the job")
    cancel()
    time.Sleep(5 * time.Second)
    fmt.Println("Main goroutine finished!!")
}

func Work(ctx context.Context, name string){
    for {
        select {
        case <- ctx.Done():
            fmt.Println(name, "job timeout return")
            return
        default:
            fmt.Println(name, "job still working")
            time.Sleep(1 * time.Second)
        }
    }
}

把子协程的方法抽取出来,我们尝试启动3个子协程,调用cancel()方法发送取消信号,所有子协程都停止了。

如下面打印日志所示:

~ » go run main.go
node2 job still working
node3 job still working
node1 job still working
node1 job still working
node3 job still working
node2 job still working
node3 job still working
node2 job still working
node1 job still working
node1 job still working
node2 job still working
node3 job still working
node3 job still working
node1 job still working
node2 job still working
Timeout stop the job
node2 job timeout return
node1 job timeout return
node3 job timeout return
Main goroutine finished!!

第二个例子,符合上面图的情况,主协程启动node1子协程,node1子协程启动node2子协程,那么只需要把Context传递下去,所有的子协程就能接受到主协程的取消信息,然后马上返回。

func main(){
    ctx, cancel := context.WithCancel(context.Background())

    go Work(ctx, "node1")

    time.Sleep(5 * time.Second)
    fmt.Println("Timeout stop the job")
    cancel()
    time.Sleep(5 * time.Second)
    fmt.Println("Main goroutine finished!!")
}

func Work(ctx context.Context, name string){
    go Work2(ctx, "node2")
    for {
        select {
        case <- ctx.Done():
            fmt.Println(name, "job timeout return")
            return
        default:
            fmt.Println(name, "job still working")
            time.Sleep(1 * time.Second)
        }
    }
}

func Work2(ctx context.Context, name string){
    for {
        select {
        case <- ctx.Done():
            fmt.Println(name, "job timeout return")
            return
        default:
            fmt.Println(name, "job still working")
            time.Sleep(1 * time.Second)
        }
    }
}

执行日志如下:

~ » go run main.go
node2 job still working
node1 job still working
node1 job still working
node2 job still working
node1 job still working
node2 job still working
node2 job still working
node1 job still working
node1 job still working
node2 job still working
Timeout stop the job
node2 job timeout return
node1 job timeout return
Main goroutine finished!!

晚安~

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 开发go程序的时候,时常需要使用goroutine并发处理任务,有时候这些goroutine是相互独立的,而有的时...
    驻马听雪阅读 2,505评论 0 21
  • go并发编程入门到放弃 并发和并行 并发:一个处理器同时处理多个任务。 并行:多个处理器或者是多核的处理器同时处理...
    yangyunfeng阅读 614评论 0 2
  • GoLang并发控制(上) 在go程序中,最被人所熟知的便是并发特性,一方面有goroutine这类二级线程,对这...
    不喜欢夜雨天阅读 3,646评论 0 8
  • 本文从上下文Context、同步原语与锁、Channel、调度器四个方面介绍Go语言是如何实现并发的。本文绝大部分...
    彦帧阅读 1,638评论 1 3
  • 前言:本专题用于记录自己(647)在Go语言方向的学习和积累。系列内容比较偏基础,推荐给想要入门Go语言开发者们阅...
    齐舞647阅读 911评论 0 4