context.Context取消其他协程的操作

  • 现在有一个需求,两个子协程分别执行两个一次性长耗时操作,其中一个协程因为错误退出的时候,另外一个协程也需要退出,当我阅读相关文章的时候都告诉我,用如下代码实现:

    package main
    
    import (
        "context"
        "errors"
        "sync"
        "time"
    )
    
    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        wg := sync.WaitGroup{}
        errChan := make(chan error)
        wg.Add(2)
        // 子协程1
        go func(ctx context.Context) {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    // 模拟一个阻塞30秒的长耗时任务
                    time.Sleep(30 * time.Second)
                }
            }
        }(ctx)
    
        // 子协程2
        go func() {
            defer wg.Done()
            // 模拟执行3秒以后出现了错误退出协程
            time.Sleep(3 * time.Second)
            errChan <- errors.New("something is wrong")
        }()
    
        // cancel本身应该在子协程出现错误退出的时候调用
        // 因为子协程1和子协程2都可能会出现错误而退出
        // 为了避免忘记调用cancel的情况,专门另起一个协程来控制cancel操作
        go func() {
            if err := <-errChan; err != nil {
                cancel()
            }
        }()
        wg.Wait()
        close(errChan)
    }
    

但是仔细分析后,发现这样的代码并不能满足我们的需求。

先我们先明确一下我们需求:

  1. 子协程1和子协程2都是只需要执行一次的长耗时任务
  2. 子协程2因为发生了错误退出,此时子协程1也需要退出

我们再来分析上面的代码,是否能满足我们的需求:

  1. 当子协程2发生错误退出了,将错误放入errChan中,errChan拿出值发现err != nil,调用cancel
  2. 此时子协程1正在被阻塞中,等待30秒阻塞完成以后,进入下一次循环,发现当前当前协程应该cancel了,于是当前子协程1退出协程。

显然执行的结果并不能满足我们的预期需求:

假如子协程1中的任务执行了一次以后,进入下一次循环,发现ctx还没有接收到cancel的信号,就会第二次执行任务,现在与我们的需求是违背的。

此时的解决方案可以有两种:

  1. 在子协程1中加入一个bool类型的变量来判断任务是否已经执行过,代码如下:

    // 子协程1
    go func(ctx context.Context) {
        defer wg.Done()
        var isExec bool
        for {
            select {
                case <-ctx.Done():
                 return
                default:
                    if !isExec {
                        // 模拟一个阻塞30秒的长耗时任务
                        time.Sleep(30 * time.Second) 
                    }
            }
        }
    }(ctx)
    

    这样做其实也没有意义,这个任务本身就应该只执行一次,执行结束后,难道一直循环着等其他地方cancel以后才退出当前协程吗?

  2. 任务执行完成以后return直接退出,代码如下:

    // 子协程1
    go func(ctx context.Context) {
        defer wg.Done()
        for {
            select {
                case <-ctx.Done():
                 return
                default:
                 // 模拟一个阻塞30秒的长耗时任务
                 time.Sleep(30 * time.Second) 
                 return
            }
        }
    }(ctx)
    

    这样做以后就会导致ctx的cancel没有任何意义,不管怎样,子协程1中的任务都是会执行完成以后才会退出的

仔细分析下来,这样的写法其实并不能满足我们的需求。

那么到底应该如何书写才能满足我们的需求呢。

需要分为三种情况来看:

  1. 任务本身是可以通过context.Context控制的,比如http请求

    package main
    
    import (
        "context"
        "errors"
        "fmt"
        "io"
        "net/http"
        "sync"
        "time"
    )
    
    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        // 当有两个协程往同一个通道中写入数据的时候,但是又只有一处读的情况下,至少需要一个缓冲区
        // 否则会造成死锁
        errChan := make(chan error, 1)
        wg := sync.WaitGroup{}
        wg.Add(2)
        // 子协程1
        go func(ctx context.Context) {
            defer wg.Done()
            request, err := http.NewRequestWithContext(ctx, "GET", "http://127.0.0.1:8081", nil)
            if err != nil {
                errChan <- err
                return
            }
            resp, err := http.DefaultClient.Do(request)
            if err != nil {
                errChan <- err
                return
            }
            defer resp.Body.Close()
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                errChan <- err
                return
            }
            fmt.Println(string(body))
        }(ctx)
        // 子协程2
        go func() {
            defer wg.Done()
            time.Sleep(3 * time.Second)
            errChan <- errors.New("something is wrong")
        }()
        
        // cancel本身应该在子协程出现错误退出的时候调用
        // 因为子协程1和子协程2都可能会出现错误而退出
        // 为了避免忘记调用cancel的情况,专门另起一个协程来控制cancel操作
        go func() {
            if err := <-errChan; err != nil {
                fmt.Println(err)
                cancel()
            }
        }()
        wg.Wait()
    }
    

    上面的代码中,子协程1中访问的是一个耗时较长的http接口(我在此接口中sleep了30秒来模拟因为网络原因或者其他原因导致接口访问时间较长的情况),假如子协程2运行了3秒以后出现了错误,调用了cancel,那么子协程1也会因为context的控制产生错误直接退出,不需要等待30秒请求结束以后才会退出。

  2. 如果任务本身不能通过ctx控制,但是任务本身是可以拆分为多次完成的任务。比如,子协程1中的任务是读取一个100M文件。

    package main
    
    import (
        "context"
        "errors"
        "fmt"
        "sync"
        "time"
    )
    
    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        // 当有两个协程往同一个通道中写入数据的时候,但是又只有一处读的情况下,至少需要一个缓冲区
        // 否则会造成死锁
        errChan := make(chan error, 1)
        wg := sync.WaitGroup{}
        wg.Add(2)
        // 子协程1
        go func(ctx context.Context) {
            for i := 0; i < 100; i++ {
                select {
                case <-ctx.Done():
                    return
                default:
                    time.Sleep(1 * time.Second)
                    fmt.Println("读取1M的数据")
                }
            }
        }(ctx)
        // 子协程2
        go func() {
            defer wg.Done()
            time.Sleep(3 * time.Second)
            errChan <- errors.New("something is wrong")
        }()
    
        // cancel本身应该在子协程出现错误退出的时候调用
        // 因为子协程1和子协程2都可能会出现错误而退出
        // 为了避免忘记调用cancel的情况,专门另起一个协程来控制cancel操作
        go func() {
            if err := <-errChan; err != nil {
                fmt.Println(err)
                cancel()
            }
        }()
        wg.Wait()
    }
    

    上面的代码中,读取100M的文件,分为100次读取,每次读取1M数据,假如子协程2运行了3秒出现错误退出以后,子协程1在读取了最近的1M数据以后进入下一次循环也会发现被cancel了,就会退出协程, 不继续执行任务

  3. 如果任务本身是一次性任务,并且不能拆分为多次任务,又不能被context.Context控制的任务,只能等待任务执行结束,不需要传入context.Context来进行取消控制

除了自己控制context.Context来控制协程取消操作以外,还可以利用ErrGroup的方式来更简单控制协程的取消

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    eg, ctx := errgroup.WithContext(context.Background())

    eg.Go(func() error {
        request, err := http.NewRequestWithContext(ctx, "GET", "http://127.0.0.1:8081", nil)
        if err != nil {
            return err
        }
        resp, err := http.DefaultClient.Do(request)
        if err != nil {
            fmt.Println(err)
            return err
        }
        defer resp.Body.Close()
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            return err
        }
        fmt.Println(string(body))
        return nil
    })

    eg.Go(func() error {
        for i := 0; i < 10; i++ {
            fmt.Printf("wait %d second\n", i)
            time.Sleep(time.Second)
        }
        return fmt.Errorf("something is wrong")
    })
    if err := eg.Wait(); err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("task is success")
}

上面的代码,可以用非常简单的方式来处理子协程 2出现错误的情况下,子协程1也同时需要退出的需求。不需要自己控制sync.Group和errChan导致代码复杂化。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • [TOC] Golang Context分析 Context背景 和 适用场景 golang在1.6.2的时候还没...
    AllenWu阅读 11,625评论 0 30
  • 在工程化的Go语言开发项目中,Go语言的源码复用是建立在包(package)基础之上的。本文介绍了Go语言中如何定...
    雪上霜阅读 286评论 0 0
  • golang go和php的区别类型:go为编译性语言;php解释性语言错误:go的错误处理机制;php本身或者框...
    Impossible安徒生阅读 465评论 0 0
  • go语言协程使用[vscode-webview://45b6830c-5e27-4be5-8359-1ea2a28...
    xcrossed阅读 1,472评论 0 0
  • 输入与输出-fmt包 时间与日期-time包 命令行参数解析-flag包 日志-log包 IO操作-os包 IO操...
    思考的山羊阅读 6,697评论 0 5

友情链接更多精彩内容