Go典型并发任务

1、只运行一次

package once_test

import (
    "fmt"
    "sync"
    "testing"
    "unsafe"
)

type Singleton struct {
    data string
}

var singleInstance *Singleton
var once sync.Once

func GetSingletonObj() *Singleton {
    once.Do(func() {
        fmt.Println("Create Obj")
        singleInstance = new(Singleton)
    })
    return singleInstance
}

func TestGetSingletonObj(t *testing.T) {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            obj := GetSingletonObj()
            fmt.Printf("%X\n", unsafe.Pointer(obj))
            wg.Done()
        }()
    }
    wg.Wait()
}

2、仅需任意任务完成

package concurrency

import (
    "fmt"
    "runtime"
    "testing"
    "time"
)

func runTask(id int) string {
    time.Sleep(10 * time.Millisecond)
    return fmt.Sprintf("The result is from %d", id)
}

func FirstResponse() string {
    numOfRunner := 10
    // channel增加buff防止协程泄漏
    ch := make(chan string, numOfRunner)
    for i := 0; i < numOfRunner; i++ {
        go func(i int) {
            ret := runTask(i)
            ch <- ret
        }(i)
    }
    return <-ch
}

func TestFirstResponse(t *testing.T) {
    t.Log("Before:", runtime.NumGoroutine())
    t.Log(FirstResponse())
    time.Sleep(time.Second * 1)
    t.Log("After:", runtime.NumGoroutine())

}

3、所有任务完成(channel CSP方式)

package util_all_done

import (
    "fmt"
    "runtime"
    "testing"
    "time"
)

func runTask(id int) string {
    time.Sleep(10 * time.Millisecond)
    return fmt.Sprintf("The result is from %d", id)
}

func FirstResponse() string {
    numOfRunner := 10
    ch := make(chan string, numOfRunner)
    for i := 0; i < numOfRunner; i++ {
        go func(i int) {
            ret := runTask(i)
            ch <- ret
        }(i)
    }
    return <-ch
}

func AllResponse() string {
    numOfRunner := 10
    ch := make(chan string, numOfRunner)
    for i := 0; i < numOfRunner; i++ {
        go func(i int) {
            ret := runTask(i)
            ch <- ret
        }(i)
    }
    finalRet := ""
    for j := 0; j < numOfRunner; j++ {
        finalRet += <-ch + "\n"
    }
    return finalRet
}

func TestFirstResponse(t *testing.T) {
    t.Log("Before:", runtime.NumGoroutine())
    t.Log(AllResponse())
    time.Sleep(time.Second * 1)
    t.Log("After:", runtime.NumGoroutine())

}

4、对象池

package object_pool

import (
    "errors"
    "time"
)

type ReusableObj struct {
}

type ObjPool struct {
    bufChan chan *ReusableObj //用于缓冲可重用对象
}

func NewObjPool(numOfObj int) *ObjPool {
    objPool := ObjPool{}
    objPool.bufChan = make(chan *ReusableObj, numOfObj)
    for i := 0; i < numOfObj; i++ {
        objPool.bufChan <- &ReusableObj{}
    }
    return &objPool
}

func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
    select {
    case ret := <-p.bufChan:
        return ret, nil
    case <-time.After(timeout): //超时控制
        return nil, errors.New("time out")
    }

}

func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
    select {
    case p.bufChan <- obj:
        return nil
    default:
        return errors.New("overflow")
    }
}

package object_pool

import (
    "fmt"
    "testing"
    "time"
)

func TestObjPool(t *testing.T) {
    pool := NewObjPool(10)
    // if err := pool.ReleaseObj(&ReusableObj{}); err != nil { //尝试放置超出池大小的对象
    //  t.Error(err)
    // }
    for i := 0; i < 11; i++ {
        if v, err := pool.GetObj(time.Second * 1); err != nil {
            t.Error(err)
        } else {
            fmt.Printf("%T\n", v)
            if err := pool.ReleaseObj(v); err != nil {
                t.Error(err)
            }
        }

    }

    fmt.Println("Done")
}

5、sync.pool 对象缓存


image.png

image.png

image.png

image.png
package object_pool

import (
    "fmt"
    "runtime"
    "sync"
    "testing"
)

func TestSyncPool(t *testing.T) {
    pool := &sync.Pool{
        New: func() interface{} {
            fmt.Println("Create a new object.")
            return 100
        },
    }

    v := pool.Get().(int)
    fmt.Println(v)
    pool.Put(3)
    runtime.GC() //GC 会清除sync.pool中缓存的对象
    v1, _ := pool.Get().(int)
    fmt.Println(v1)
}

func TestSyncPoolInMultiGroutine(t *testing.T) {
    pool := &sync.Pool{
        New: func() interface{} {
            fmt.Println("Create a new object.")
            return 10
        },
    }

    pool.Put(100)
    pool.Put(100)
    pool.Put(100)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            fmt.Println(pool.Get())
            wg.Done()
        }(i)
    }
    wg.Wait()
}

image.png
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。