1、只运行一次
package once_test
import (
"fmt"
"sync"
"testing"
"unsafe"
)
type Singleton struct {
data string
}
var singleInstance *Singleton
var once sync.Once
func GetSingletonObj() *Singleton {
once.Do(func() {
fmt.Println("Create Obj")
singleInstance = new(Singleton)
})
return singleInstance
}
func TestGetSingletonObj(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
obj := GetSingletonObj()
fmt.Printf("%X\n", unsafe.Pointer(obj))
wg.Done()
}()
}
wg.Wait()
}
2、仅需任意任务完成
package concurrency
import (
"fmt"
"runtime"
"testing"
"time"
)
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("The result is from %d", id)
}
func FirstResponse() string {
numOfRunner := 10
// channel增加buff防止协程泄漏
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
return <-ch
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine())
t.Log(FirstResponse())
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine())
}
3、所有任务完成(channel CSP方式)
package util_all_done
import (
"fmt"
"runtime"
"testing"
"time"
)
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("The result is from %d", id)
}
func FirstResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
return <-ch
}
func AllResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
finalRet := ""
for j := 0; j < numOfRunner; j++ {
finalRet += <-ch + "\n"
}
return finalRet
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine())
t.Log(AllResponse())
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine())
}
4、对象池
package object_pool
import (
"errors"
"time"
)
type ReusableObj struct {
}
type ObjPool struct {
bufChan chan *ReusableObj //用于缓冲可重用对象
}
func NewObjPool(numOfObj int) *ObjPool {
objPool := ObjPool{}
objPool.bufChan = make(chan *ReusableObj, numOfObj)
for i := 0; i < numOfObj; i++ {
objPool.bufChan <- &ReusableObj{}
}
return &objPool
}
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
select {
case ret := <-p.bufChan:
return ret, nil
case <-time.After(timeout): //超时控制
return nil, errors.New("time out")
}
}
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
select {
case p.bufChan <- obj:
return nil
default:
return errors.New("overflow")
}
}
package object_pool
import (
"fmt"
"testing"
"time"
)
func TestObjPool(t *testing.T) {
pool := NewObjPool(10)
// if err := pool.ReleaseObj(&ReusableObj{}); err != nil { //尝试放置超出池大小的对象
// t.Error(err)
// }
for i := 0; i < 11; i++ {
if v, err := pool.GetObj(time.Second * 1); err != nil {
t.Error(err)
} else {
fmt.Printf("%T\n", v)
if err := pool.ReleaseObj(v); err != nil {
t.Error(err)
}
}
}
fmt.Println("Done")
}
5、sync.pool 对象缓存
package object_pool
import (
"fmt"
"runtime"
"sync"
"testing"
)
func TestSyncPool(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("Create a new object.")
return 100
},
}
v := pool.Get().(int)
fmt.Println(v)
pool.Put(3)
runtime.GC() //GC 会清除sync.pool中缓存的对象
v1, _ := pool.Get().(int)
fmt.Println(v1)
}
func TestSyncPoolInMultiGroutine(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("Create a new object.")
return 10
},
}
pool.Put(100)
pool.Put(100)
pool.Put(100)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
fmt.Println(pool.Get())
wg.Done()
}(i)
}
wg.Wait()
}