go 实现生产者消费者模式

package main

import (
    "fmt"
    "sync"
)

var (
    finish bool
    ch     = make(chan int, 10)
)

func main() {
    //test01()
    test02()
}

func test01() {
    var wgp sync.WaitGroup
    var wgc sync.WaitGroup
    wgp.Add(3)
    go producer(&wgp, 10)
    go producer(&wgp, 20)
    go producer(&wgp, 30)
    //
    wgc.Add(5)
    go consumer(&wgc, 1)
    go consumer(&wgc, 2)
    go consumer(&wgc, 3)
    go consumer(&wgc, 4)
    go consumer(&wgc, 5)
    wgp.Wait()
    finish = true
    wgc.Wait()
}

func producer(wg *sync.WaitGroup, num int) {
    defer wg.Done()
    for i := 1; i <= 10; i++ {
        ch <- i*num + 1
    }
}

func consumer(wg *sync.WaitGroup, num int) {
    defer wg.Done()
    for !finish {
        data := <-ch
        fmt.Printf("num:%d data:%d\n", num, data)
    }
}

用循环队列的方式

package main

import (
    "fmt"
    "sync"
)

type Queue struct {
    Cache  []int
    head   int
    tail   int
    finish bool
    lock   sync.RWMutex
}

func newQueue(k int) *Queue {
    return &Queue{
        make([]int, k),
        0,
        0,

        false,
        sync.RWMutex{},
    }
}
func (q *Queue) Put(x int) {
    if (q.tail + 1) != q.head {
        q.Cache[q.tail] = x
        q.tail++
        if q.tail >= len(q.Cache)-1 {
            q.tail = 0
        }
    }
}
func (q *Queue) Get() int {
    if q.head == q.tail {
        return -1
    }
    x := q.Cache[q.head]
    q.head++
    if q.head >= len(q.Cache)-1 {
        q.head = 0
    }
    return x
}
func test02() {
    var wgp sync.WaitGroup
    var wgc sync.WaitGroup
    queue := newQueue(20)
    go queue.pro(&wgp, 1)
    go queue.pro(&wgp, 2)
    go queue.pro(&wgp, 3)
    wgp.Add(3)
    go queue.cus(&wgc, 1)
    go queue.cus(&wgc, 2)
    go queue.cus(&wgc, 3)
    go queue.cus(&wgc, 4)
    go queue.cus(&wgc, 5)
    wgc.Add(5)
    wgp.Wait()

    queue.finish = true
    wgc.Wait()
}
func (q *Queue) pro(wg *sync.WaitGroup, num int) {
    defer wg.Done()
    for i := 0; i < 10; i++ {
        q.lock.Lock()
        q.Put(num*10 + i)
        q.lock.Unlock()
        fmt.Println("p", num, ":", num*10+i)
    }
}
func (q *Queue) cus(wg *sync.WaitGroup, num int) {
    defer wg.Done()
    for !q.finish {
        i := -1
        q.lock.RLock()
        if i = q.Get(); i == -1 {
            q.lock.RUnlock()
            continue
        }
        q.lock.RUnlock()
        fmt.Println("c", num, ":", i)
    }
}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容