Golang笔记-- 并发模式

并发模式

[TOC]

并发程序指同时进行多个任务的程序, Go程序一种支持并发的方式是通过goroutine和channel, 支持“顺序通信进程”(communicating sequential processes)或被简称为CSP.

CSP是一种现代的并发编程模型,在这种编程模型中值会在不同的运行实例(goroutine)中传递,尽管大多数情况下仍然是被限制在单一实例中.

goroutines

  • Golang中并发的执行单元叫goroutine, 可类比为线程, 或协程.
  • 程序启动时main函数在main goroutine中运行
  • go语句创建新的goroutine, 多个goroutine并发执行:
go f()
go func() {
   }()
  • 一个简单的并发网络模型:
for {
    conn, err := listener.Accept()
    if err != nil {
        log.Print(err) // e.g., connection aborted
        continue
    }
    go handleConn(conn) // handle connections concurrently
}

func handleConn(c net.Conn) {
    defer c.Close()
    for {
        _, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
        if err != nil {
            return // e.g., client disconnected
        }
        time.Sleep(1 * time.Second)
    }
}
  • Echo 服务器
func echo(c net.Conn, shout string, delay time.Duration) {
    fmt.Fprintln(c, "\t", strings.ToUpper(shout))
    time.Sleep(delay)
    fmt.Fprintln(c, "\t", shout)
    time.Sleep(delay)
    fmt.Fprintln(c, "\t", strings.ToLower(shout))
}

func handleConn(c net.Conn) {
    input := bufio.NewScanner(c)
    for input.Scan() {
        echo(c, input.Text(), 1*time.Second)
    }
    // NOTE: ignoring potential errors from input.Err()
    c.Close()
}
// client
func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    go mustCopy(os.Stdout, conn)
    mustCopy(conn, os.Stdin)
}

Goroutines和线程

  • 量的区别, 动态栈: os线程固定大小(一般2M), goroutines小(一般2k)且不固定, 动态伸缩,最大1G
  • 调度, os没几毫秒由scheduler内核函数调度, goroutines由go自己大调度器调度,如m:n调度方法在n个线程调度m个goroutines, 且不需要进入内核
  • GOMAXPROCS大变量决定调度到多少个os线程

channels

  • channels是goroutines之间通信机制, 通过channel从一个goroutine向另一个发送消息
  • chan定义某种类型的channel, 类比C/C++的一个支持多线程的泛型queue
  • 和map类似,channel也对应一个make创建的底层数据结构的引用: make(chan int)
  • 和make,slice等引用类型一样, channel的零值也是nil
  • 发送和接收两个操作都使用<-运算符
  • 支持close操作, 向close后的channel发送会panic, 但仍然可接收chan里的数据, 如没有数据将产生一个零值
// int型chan, 无缓冲阻塞型, 会阻塞发送者, 直到另一个goroutine接收数据
ch:=make(chan int)
ch<-1 //发送
go func() {
    fmt.Println(<-ch) //接收
}

无缓冲channel

  • 基于无缓存Channels将导致两个goroutine做同步操作, 因此也叫同步channel
  • 当通过一个无缓存Channels发送数据时,接收者收到数据发生在唤醒发送者goroutine之前
func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    done := make(chan struct{})
    go func() {
        io.Copy(os.Stdout, conn) // NOTE: ignoring errors
        log.Println("done")
        done <- struct{}{} // signal the main goroutine
    }()
    mustCopy(conn, os.Stdin)
    conn.Close()
    <-done // wait for background goroutine to finish
}

串联的Channels(Pipeline)

Channels也可以用于将多个goroutine连接在一起,一个Channel的输出作为下一个Channel的输入。这种串联的Channels就是所谓的管道(pipeline)

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    // Counter
    go func() {
        for x := 0; x < 100; x++ {
            naturals <- x
        }
        close(naturals)
    }()

    // Squarer
    go func() {
        for x := range naturals {
            squares <- x * x
        }
        close(squares)
    }()

    // Printer (in main goroutine)
    for x := range squares {
        fmt.Println(x)
    }
}

单向channels

<-chan左边是一个只读channel, 在右边则只写,形参传参时可隐式转换:

//改进版本pipeline示例
func counter(out chan<- int) {
    for x := 0; x < 100; x++ {
        out <- x
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    naturals := make(chan int)
    squares := make(chan int)
    go counter(naturals)
    go squarer(squares, naturals)
    printer(squares)
}

带缓冲channels

  • 像一个可以容纳指定个数元素的队列, 在达到容量前写入不会阻塞, 写入读取操作解耦
ch = make(chan string, 3)

Multiplexing with select

select多路复用,可以监听多个chan

  • 当一个或多个case满足条件(如chan可发送或接收消息, 类似于读写事件发生), 会随机选择其中一个分支执行
  • default是所有case都阻塞时执行
  • 每一个case代表一个通信操作(在channel上发送或接收)
  • 无分支的select{}会永远等待
  • 因为对一个nil的channel发送和接收操作会永远阻塞,select不会选择到, 可以用nil来激活或者禁用case
select {
case <-ch1:
    // ...
case x := <-ch2:
    // ...use x...
case ch3 <- y:
    // ...
default:
    // ...
}

示例: 并发遍历目录

原始版本

// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du1: %v\n", err)
        return nil
    }
    return entries
}
  • ioutil.ReadDir()返回os.FileInfo(os.Stat()也返回)类型的slice
  • walkDir()递归调调用
  • walkDir()会向fileSizes这个channel发送一条消息告知文件大小

main包不限制并发的版本(可能产生大量goroutines)

  • 注意for ... select...惯用方式,未使用range循环,用channel接收的二值形式显式判断channel是否close
  • 不用标签break(break loop)的话只会退出内层select
package main

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
)

func main() {
    // ...determine roots...
    // Traverse each root of the file tree in parallel.
    fileSizes := make(chan int64)
    var n sync.WaitGroup
    for _, root := range roots {
        n.Add(1)
        go walkDir(root, &n, fileSizes)
    }
    go func() {
        n.Wait()
        close(fileSizes)
    }()
    // ...select loop...
    loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop // fileSizes was closed
            }
            nfiles++
            nbytes += size
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }
}

// walkDir修改为并发
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    defer n.Done()
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, n, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

通过token方式限制并发

// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
    sema <- struct{}{}        // acquire token
    defer func() { <-sema }() // release token
    // ...

并发的退出

Golang未提供一个goroutine终止另一个goroutine的方法, 一般可以通过close done channel的广播方式退出goroutines

var done = make(chan struct{})

for {
    select {
    case <-done:
        return 
    case v, ok:=<-workChan:
        if !ok {
            return 
        }
    }
}

// Cancel traversal when input is detected.
go func() {
    os.Stdin.Read(make([]byte, 1)) // read a single byte
    close(done) // done
}()

基于共享变量的并发

goroutine与channel方式的并发免于处理许多细节, 而基于共享数据的并发就不可避免地需要处理它们, 否则一些竞争条件导致程序产生非预期结果,如错误结果、 死锁(deadlock)、活锁(livelock)和饿死(resource starvation),这就不是并发安全的程序了.

竞争条件

竞争条件指程序在多个goroutine交叉执行时没法给出确定预期的结果.数据竞争是一种特殊竞争条件, 产生于多个goroutines并发访问共享数据,且至少有一个写操作, 避免办法:

  • 不写
  • 避免多个goroutines访问, 绑定到某个goroutine, 其他的都用channel, pipeline channel传递到下一个阶段线性访问这种叫串行绑定
  • 互斥访问, 做并发控制(加锁等)

竞争条件检测: 在go build,go run或者go test命令后加-race

sync.Mutex互斥锁

类似于二元信号量:

var (
    sema    = make(chan struct{}, 1) // a binary semaphore guarding balance
    balance int
)

func Deposit(amount int) {
    sema <- struct{}{} // acquire token
    balance = balance + amount
    <-sema // release token
}

func Balance() int {
    sema <- struct{}{} // acquire token
    b := balance
    <-sema // release token
    return b
}

在Mutex中对应Lock,Unlock:

import "sync"

var (
    mu      sync.Mutex // guards balance
    balance int
)

func Deposit(amount int) {
    mu.Lock()
    balance = balance + amount
    mu.Unlock()
}

func Balance() int {
    mu.Lock()
    // defer mu.Unlock()
    b := balance
    mu.Unlock() 
    return b
}
  • 调用Lock()方法来获取一个互斥锁
  • 如果其它goroutine已经获得了该锁,Lock被阻塞直到其它goroutine调用了Unlock
  • mutex会保护共享变量, 惯例来说,被mutex所保护的变量是在mutex变量声明之后立刻声明
  • Lock和Unlock之间的代码段叫临界区
  • defer来调用Unlock, panic也依然会执行
  • 不是可重入锁(递归锁), 无法对已锁mutex再次上锁, 一个通用的解决方案是将一个函数分离为多个函数(导出的加锁调内部的, 内部的执行实际动作不加锁)
func Withdraw(amount int) bool {
    mu.Lock()
    defer mu.Unlock()
    deposit(-amount)
    if balance < 0 {
        deposit(amount)
        return false // insufficient funds
    }
    return true
}

func Deposit(amount int) {
    mu.Lock()
    defer mu.Unlock()
    deposit(amount)
}

// This function requires that the lock be held.
func deposit(amount int) { balance += amount }

sync.RWMutex读写锁

  • 允许多个只读操作并行执行, 但写操作完全互斥
  • 或叫“多读单写”锁(multiple readers, single writer lock)
  • 读锁RLock, 写锁还是Lock
var mu sync.RWMutex
var balance int
func Balance() int {
    mu.RLock() // readers lock
    defer mu.RUnlock()
    return balance
}

内存同步

写入内存前可能cache, 如不互斥访问, 并不能保证多个goroutines中语句执行顺序

sync.Once

初始化延迟执行有加速启动等好处, 但如果只能或只需执行一次的init操作延迟到多个goroutines执行, 那就需要保证只执行一次, 这就是sync.Once的作用

var loadIconsOnce sync.Once
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
    loadIconsOnce.Do(func(){
    // initializing
    })
    return icons[name]
}

Reference

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容