go并发

总结源自《go in practice》

go并发包含两个概念:

goroutine:使用一个函数,但是与调用该函数是独立的。
channel:goroutine间通信用的管道

package main

import (
    "io"
    "os"
    "time"
)

func echo(in io.Reader, out io.Writer) {
    io.Copy(out, in)
}

func main() {
    go echo(os.Stdin, os.Stdout)
    time.Sleep(30 * time.Second)
    os.Exit(0)
}

使用一个goroutine在一直在后台将标准输入中的内容拷贝到标准输出当中,直到main函数退出。
使用匿名函数来新建goroutine:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    fmt.Println("Outside a goroutine")
    go func() {
        fmt.Println("Inside a goroutine")
    }()
    fmt.Println("Outside again")
    runtime.Gosched()
}

waitGroup使用

在main函数中调用runtime.Gosched是为了挂起main本身协程,让匿名函数执行结果能打印出来。使用go关键字创建goroutine并不意味着该goroutine会
马上得到执行。这个需要根据go的调度器来安排的。Gosched只能提供其他goroutine执行的机会,如果其他goroutine正在等待数据库查询或者读io的话
调度器会继续执行当前的goroutine,而无法保证其他的goroutine能执行完成。要等待所有的goroutine执行完成需要调用wait group。

package main

import (
    "compress/gzip"
    "io"
    "os"
)

func compress(filename string) error {
    in, err := os.Open(filename)
    if err != nil {
        return err
    }
    defer in.Close()
    out, err := os.Create(filename + ".zip")
    if err != nil {
        return err
    }
    defer out.Close()
    
    gzout := gzip.NewWriter(out)
    _, err = io.Copy(gzout, in)
    gzout.Close()
    return err
}

func main() {
    for _, file := range os.Args {
        compress(file)
    }
}

该程序实现,将命令行后面提供的所有文件,压缩成对应的.zip文件。因为gzip函数是是耗io的,所以该程序在性能上没有充分利用cpu多核来实现多个
文件的并行压缩:

func main() {
    var wg sync.WaitGroup
    var i int = -1
    var file string
    for i, file = range os.Args[1:] {
        wg.Add(1)
        go func(file string) {
            compress(file)
            wg.Done()
        }(file)
    }
    wg.Wait()
    fmt.Printf("Compressed %d files\n", i+1)
}

修改的main函数可以并发的执行多个文件压缩。main函数通过wait函数等待所有的goroutine结束才返回。注意:这里为了保证for循环每次迭代后对应的
file都传入对应的goroutine当中,需要在匿名函数中添加参数file,如果直接调用compress传入file,随着for循环的迭代,前面的goroutine不一定立即
得到执行导致前面的goroutine中filename和后面的一样,最后导致结果不正确。为了防止for循环当中传入到每个goroutine参数都不会被改变,必须
在匿名函数当中传入参数的副本。

mutex互斥锁使用

package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"
    "sync"
)

type words struct {
    sync.Mutex
    found map[string]int
}

func newWords() *words {
    return &words{found: map[string]int{}}
}

func (w *words) add(word string, n int) {
    w.Lock()
    defer w.Unlock()
    if _, ok := w.found[word]; !ok {
        w.found[word] = n
        return
    }
    w.found[word] += n
}

func tallyWords(filename string, dict *words) error {
    file, err := os.Open(filename)
    if err != nil {
        return err
    }
    defer file.Close()
    scanner := bufio.NewScanner(file)
    scanner.Split(bufio.ScanWords)
    for scanner.Scan() {
        word := strings.ToLower(scanner.Text())
        dict.add(word, 1)
    }
    return scanner.Err()
}

func main() {
    var wg sync.WaitGroup
    w := newWords()
    for _, f := range os.Args[1:] {
        wg.Add(1)
        go func(file string) {
            if err := tallyWords(file, w); err != nil {
                fmt.Println(err.Error())
            }
            wg.Done()
        }(f)
    }
    wg.Wait()
    fmt.Println("Words that appear more than onece:")
    for word, count := range w.found {
        if count > 1 {
            fmt.Printf("%s: %d\n", word, count)
        }
    }
}

统计命令行参数传入文件,所有单词出现次数大于2的。这里使用mutex互斥锁,实现多个goroutine访问map时不会冲突。需要注意的是只有所有的goroutine
都在等待同一个互斥锁,才能实现这种对同一个资源的竞争。该互斥锁必须是唯一。

channel使用

channle可以类比成网络socket,可以单项或者双向的发送数据到接收方。channle发送的数据是有类型的,不同于socket发送的字节流。

package main

import (
    "fmt"
    "os"
    "time"
)

func readStdin(out chan<- []byte) {
    for {
        data := make([]byte, 1024)
        length, _ := os.Stdin.Read(data)
        if length > 0 {
        }
        out <- data
    }
}

func main() {
    done := time.After(30 * time.Second)
    echo := make(chan []byte)
    go readStdin(echo)
    for {
        select {
        case buf := <-echo:
            os.Stdout.Write(buf)
        case <-done:
            fmt.Println("Time out")
            os.Exit(0)
        }
    }

}

time.After会在等待的时候后返回一个channel time.Time类型。select会阻塞等到case能执行,多个case都有数据会随机选择。

如何安全的关闭channel

package main

import (
    "fmt"
    "time"
)

func send(ch chan string) {
    for {
        ch <- "Hello"
        time.Sleep(500 * time.Millisecond)
    }
}

func main() {
    msg := make(chan string)
    util := time.After(1 * time.Second)
    go send(msg)
    for {
        select {
        case m := <-msg:
            fmt.Println(m)
        case <-util:
            close(msg)
            time.Sleep(500 * time.Microsecond)
            return
        }
    }
}

output:

E:\go\awesomeProject\concurrent>go run echo.go
Hello
Hello
panic: send on closed channel

goroutine 7 [running]:

出现panic错误,因为main函数在时间到之后已经关闭了channel,而send后台还要goroutine往msg通道发送数据。注意:关闭通道必须由发送方来关闭
否则会出现上面错误。

package main

import (
    "fmt"
    "time"
)

func send(ch chan<- string, done <-chan bool) {
    for {
        select {
        case <-done:
            println("Done")
            close(ch)
            return
        default:
            ch <- "hello"
            time.Sleep(500 * time.Microsecond)
        }
    }
}

func main() {
    msg := make(chan string)
    done := make(chan bool)
    util := time.After(5 * time.Second)
    go send(msg, done)
    for {
        select {
        case m := <-msg:
            fmt.Println(m)
        case <-util:
            done <- true
            time.Sleep(500 * time.Millisecond)
            return
        }
    }
}

通过添加一个done通道,来通知send关闭msg通道,实现通道安全关闭。在go当中,经常需要设置一个done通道来进行goroutine之间状态的同步。
上面的用例send函数中done只能用于发送,ch用于接收。接收端触发通道关闭条件时,就需要通知发送端,通过done来通知。

使用buffer channel实现锁的功能

package main

import (
    "fmt"
    "time"
)

func worker(id int, lock chan bool) {
    fmt.Printf("%d wants the lock\n", id)
    lock <- true
    fmt.Printf("%d has the lock\n", id)
    time.Sleep(500 * time.Millisecond)
    fmt.Printf("%d is releasing the lock\n", id)
    <-lock
}
func main() {
    lock := make(chan bool, 1)
    for i := 1; i < 7; i++ {
        go worker(i, lock)
    }
    time.Sleep(10 * time.Second)
}

output

E:\go\awesomeProject\concurrent>go run echo.go
1 wants the lock
1 has the lock
2 wants the lock
4 wants the lock
6 wants the lock
3 wants the lock
5 wants the lock
1 is releasing the lock
2 has the lock
2 is releasing the lock
4 has the lock
4 is releasing the lock
6 has the lock
6 is releasing the lock
3 has the lock
3 is releasing the lock
5 has the lock
5 is releasing the lock

在output结果中看到首先是goroutine 1获得写入channel的机会,执行打印操作sleep之后读出channel里的内容,释放channel空间供其他goroutine写入。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352