goroutine并发控制

通信

共享内存

func  Test() {
    ordersInfoApp  :=  make([]orderInfoApp, 0, totalCount)
    var  mux sync.Mutex
    wg  := sync.WaitGroup{}

    for  i  :=  0; i <=  10; i++ {
        wg.Add(1)
        go  func(pageIndex int) {
            // do somethine
            var  ordersInfo orderInfoApp
            mux.Lock()
            ordersInfoApp  =  append(ordersInfoApp, ordersInfo)
            mux.Unlock()

            wg.Done()
        }(i)
    }

    wg.Wait()
}

一般在简单的数据传递下使用

channel

func  Test() {
    ordersInfoApp  :=  make([]orderInfoApp, 0, totalCount)
    choi  :=  make(chan orderInfoApp, 10)
    wg  := sync.WaitGroup{}

    for  i  :=  0; i <=  10; i++ {
        wg.Add(1)
        go  func(pageIndex int) {
            // do somethine
            var  ordersInfo orderInfoApp
            choi <- ordersInfo

            wg.Done()
        }(i)
    }

    go  func() {
        wg.Wait()
        close(choi)
    }()

    for  v  :=  range choi {
        ordersInfoApp  =  append(ordersInfoApp, v)
    }
}

相对复杂的数据流动情况

同步和控制

goroutine退出只能由本身控制,不能从外部强制结束该goroutine
两种例外情况:main函数结束或者程序崩溃结束运行

共享变量控制结束

func  main() {
    running  :=  true
    f  :=  func() {
        for running {
            fmt.Println("i am running")
            time.Sleep(1  * time.Second)
        }
        fmt.Println("exit")
    }
    go  f()
    go  f()
    go  f()
    time.Sleep(2  * time.Second)
    running  =  false
    time.Sleep(3  * time.Second)
    fmt.Println("main exit")
}

优点
实现简单,不抽象,方便,一个变量即可简单控制子goroutine的进行。
缺点:
结构只能是多读一写,不能适应结构复杂的设计,如果有多写,会出现数据同步问题,需要加锁或者使用sync.atomic
不适合用于同级的子go程间的通信,全局变量传递的信息太少
因为是单向通知,主进程无法等待所有子goroutine退出
这种方法只适用于非常简单的逻辑且并发量不太大的场景

sync.Waitgroup 等待结束

func  main() {
    var  wg sync.WaitGroup
    for  i  :=  0; i <  3; i++ {
        wg.Add(1)
        go  func() {
            defer wg.Done()
            // do something
        }()
    }

    wg.Wait()
}

channel控制结束

// 可扩展到多个work
func  main() {
    chClose  :=  make(chan  struct{})
    go  func() {
        for {
            select {
                case  <-chClose:
                return
            }

        // do something
        }
    }()

    //chClose<-struct{}
    close(chClose)
}

注意channel的阻塞情况,避免出现死锁。
通常channel只能由发送者关闭

  • 向无缓冲的channel写入数据会导致该goroutine阻塞,直到其他goroutine从这个channel中读取数据
  • 从无缓冲的channel读出数据,如果channel中无数据,会导致该goroutine阻塞,直到其他goroutine向这个channel中写入数据
  • 向带缓冲的且缓冲已满的channel写入数据会导致该goroutine阻塞,直到其他goroutine从这个channel中读取数据
  • 向带缓冲的且缓冲未满的channel写入数据不会导致该goroutine阻塞
  • 从带缓冲的channel读出数据,如果channel中无数据,会导致该goroutine阻塞,直到其他goroutine向这个channel中写入数据
  • 从带缓冲的channel读出数据,如果channel中有数据,该goroutine不会阻塞
// 读完结束
for {
    select {
    case  <-ch:
        default:
        goto finish
    }
}
finish:
  • 如果多个case同时就绪时,select会随机地选择一个执行
  • case标签里向channel发送或接收数据,case后面的语句在发送接收成功后才会执行
  • nil channel(读、写、读写)的case 标签会被跳过

limitwaitgroup

type  limitwaitgroup  struct {
    sem chan  struct{}
    wg sync.WaitGroup
}

func  New(n int) *limitwaitgroup {
    return  &limitwaitgroup{
        sem: make(chan  struct{}, n),
    }
}

func  (l *limitwaitgroup) Add() {
    l.sem <-  struct{}{}
    l.wg.Add(1)
}

func  (l *limitwaitgroup) Done() {
    <-l.sem
    l.wg.Done()
}

func  (l *limitwaitgroup) Wait() {
    l.wg.Wait()
}

// 例子
wg  := limitwaitgroup.New(6)
for  i  :=  0; i <=  10; i++ {
    wg.Add()
    go  func(index int){
        defer wg.Done()
        // do something
    }(i)
}
wg.Wait()

context

上下文 go 1.7 作为第一个参数在goroutine里传递

Context的接口定义

type  Context  interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan  struct{}
    Err() error
    Value(key interface{}) interface{}
}

Deadline获取设置的截止时间(WithDeadline、WithTimeout), 第一个返回值代表截止时间,第二个返回值代表是否设置了截止时间,false时表示没有设置截止时间

Done方法返回一个关闭的只读的chan,类型为struct{},在goroutine中,如果该方法返回的chan可以读取,则意味着parent context已经发起了取消请求,我们通过Done方法收到这个信号后,就应该做清理操作,然后退出goroutine,释放资源。

Errcontext没有被结束,返回nil;已被结束,返回结束的原因(被取消、超时)。

Value方法通过一个Key获取该Context上绑定的值,访问这个值是线程安全的。key一般定义当前包的一个新的未导出类型的变量(最好不要使用内置类型),避免和其他goroutine的key冲突。

  • Context衍生
func  WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func  WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func  WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func  WithValue(parent Context, key, val interface{}) Context

这四个With函数,接收的都有一个partent参数,就是父Context,我们要基于这个父Context创建出子Context的意思,这种方式可以理解为子Context对父Context的继承,也可以理解为基于父Context的衍生。

通过这些函数,就创建了一颗Context树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个。

WithCancel函数,传递一个父Context作为参数,返回子Context,以及一个取消函数用来取消Context。 WithDeadline函数,和WithCancel差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消Context,也可以不等到这个时候,可以提前通过取消函数进行取消。

WithTimeoutWithDeadline基本上一样,这个表示是超时自动取消,设置多少时间后自动取消Context。

WithValue函数和取消Context无关,生成一个绑定了一个键值对数据的Context,这个绑定的数据可以通过Context.Value方法访问到

  • 例子
    WithCancel
func  main() {
    ctx, cancel  := context.WithCancel(context.Background())
    go  watch(ctx, "goroutine 1")
    go  watch(ctx, "goroutine 2")
    go  watch(ctx, "goroutine 3")
    time.Sleep(10  * time.Second)
    fmt.Println("开始结束goroutine")
    cancel()
    time.Sleep(5  * time.Second)
    fmt.Println(ctx.Err())
}
func  watch(ctx context.Context, name string) {
  for {
      select {
      case  <-ctx.Done():
          fmt.Println(name, "over")
          return
      default:
          fmt.Println(name, "running")
          time.Sleep(2  * time.Second)
      }
  }
}

// output:
goroutine 1 running
goroutine 2 running
goroutine 3 running
goroutine 1 running
goroutine 2 running
goroutine 3 running
goroutine 1 running
goroutine 2 running
goroutine 3 running
goroutine 2 running
goroutine 3 running
goroutine 1 running
goroutine 3 running
goroutine 2 running
goroutine 1 running
开始结束goroutine
goroutine 1 over
goroutine 2 over
goroutine 3 over
context canceled

WithDeadline

func  main() {
    ctx, cancel  := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
    go  watch(ctx, "goroutine 1")
    go  watch(ctx, "goroutine 2")
    go  watch(ctx, "goroutine 3")
    _  = cancel
    time.Sleep(8  * time.Second)
    fmt.Println(ctx.Err())
}

func  watch(ctx context.Context, name string) {
  for {
      select {
      case  <-ctx.Done():
          fmt.Println(name, "over")
          return
      default:
          fmt.Println(name, "running")
          time.Sleep(2  * time.Second)
      }
  }
}

// output:
goroutine 3 running
goroutine 2 running
goroutine 1 running
goroutine 3 running
goroutine 1 running
goroutine 2 running
goroutine 1 running
goroutine 3 running
goroutine 2 running
goroutine 3 over
goroutine 1 over
goroutine 2 over
context deadline exceeded

WithTimeout

func  main() {
    ctx, cancel  := context.WithTimeout(context.Background(), 5*time.Second)
    go  watch(ctx, "goroutine 1")
    go  watch(ctx, "goroutine 2")
    go  watch(ctx, "goroutine 3")
    _  = cancel
    time.Sleep(8  * time.Second)
    fmt.Println(ctx.Err())
}

func  watch(ctx context.Context, name string) {
  for {
      select {
      case  <-ctx.Done():
          fmt.Println(name, "over")
          return
      default:
          fmt.Println(name, "running")
          time.Sleep(2  * time.Second)
      }
  }
}

// output:
goroutine 3 running
goroutine 1 running
goroutine 2 running
goroutine 3 running
goroutine 2 running
goroutine 1 running
goroutine 2 running
goroutine 1 running
goroutine 3 running
goroutine 2 over
goroutine 3 over
goroutine 1 over
context deadline exceeded

WithValue

type  key  int  // 未导出的包私有类型
var  kkk key =  0

func  main() {
    ctx, cancel  := context.WithCancel(context.Background())
    // WithValue是没有取消函数的
    ctx  = context.WithValue(ctx, kkk, "100W")
    
    go  watch(ctx, "goroutine 1")
    go  watch(ctx, "goroutine 2")
    go  watch(ctx, "goroutine 3")
    
    time.Sleep(8  * time.Second)
    
    fmt.Println("开始结束goroutine")
    cancel()
    
    time.Sleep(3  * time.Second)
    fmt.Println(ctx.Err())
}

func  watch(ctx context.Context, name string) {
  for {
      select {
      case  <-ctx.Done():
          fmt.Println(name, "over")
          return
      default:
          fmt.Println(name, "running", "爸爸给我了", ctx.Value(kkk).(string))
          time.Sleep(2  * time.Second)
      }
  }
}
// output:
goroutine 1 running 爸爸给我了 100W
goroutine 2 running 爸爸给我了 100W
goroutine 3 running 爸爸给我了 100W
goroutine 2 running 爸爸给我了 100W
goroutine 1 running 爸爸给我了 100W
goroutine 3 running 爸爸给我了 100W
goroutine 1 running 爸爸给我了 100W
goroutine 2 running 爸爸给我了 100W
goroutine 3 running 爸爸给我了 100W
goroutine 1 running 爸爸给我了 100W
goroutine 3 running 爸爸给我了 100W
goroutine 2 running 爸爸给我了 100W
开始结束goroutine
goroutine 2 over
goroutine 3 over
goroutine 1 running 爸爸给我了 100W
goroutine 1 over
context canceled

控制多个goroutine

func  main() {
    http.HandleFunc("/", func(W http.ResponseWriter, r *http.Request) {
    fmt.Println("收到请求")
        
    ctx, cancel  := context.WithCancel(context.Background())
    go  worker(ctx, 2)
    go  worker(ctx, 3)
        
    time.Sleep(time.Second *  10)
    cancel()
    fmt.Println(ctx.Err())
    })
    http.ListenAndServe(":9290", nil)
}
func  worker(ctx context.Context, speed int) {
    reader  :=  func(n int) {
        for {
            select {
                case  <-ctx.Done():
                return
                default:
                break
            }
            fmt.Println("reader:", n)
            time.Sleep(time.Duration(n) * time.Second)
        }
    }
    
    go  reader(2)
    go  reader(1)
    
    for {
        select {
            case  <-ctx.Done():
            return
            default:
            break
        }
        fmt.Println("worker:", speed)
        time.Sleep(time.Duration(speed) * time.Second)
    }
}

// output:
收到请求
worker: 2
reader: 1
worker: 3
reader: 1
reader: 2
reader: 2
reader: 1
reader: 1
reader: 1
reader: 2
worker: 2
reader: 2
reader: 1
reader: 1
reader: 1
worker: 3
reader: 1
worker: 2
reader: 2
reader: 1
reader: 2
reader: 1
context canceled
  • 使用规则
    使用Context的程序应遵循以下这些规则来保持跨包接口的一致和方便静态分析工具(go vet)来检查上下文传播是否有潜在问题。

    • 不要将Context存储在结构类型中,而是显式的传递给每个需要的函数; Context应该作为函数的第一个参数传递,通常命名为 ctx:
    func  DoSomething(ctx context.Context, arg Arg) error {
         // ... use ctx ...
    }   
    
    • 即使函数可以接受nil值,也不要传递nil Context。如果不确定要使用哪个Context,请传递 context.TODO。

    • 使用context的Value相关方法只应该用于在程序和接口中传递和请求相关的元数据,不要用它来传递一些可选的参数

    • 相同的Context可以传递给在不同 goroutine 中运行的函数; Context对于多个 goroutine 同时使用是安全的。

本文由博客一文多发平台 OpenWrite 发布!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355

推荐阅读更多精彩内容