sync包使用官方文档:http://devdocs.io/go/sync/index#Map
Go中sync包包含对低级别内存访问同步最有用的并发原语。
1. sync.Cond
package main
import (
"fmt"
"sync"
"time"
)
/*
* sync.Cond
* 条件变量的作用并不是保证在同一时刻仅有一个线程访问某一个共享数据,而是在某一个条件发生时,通知阻塞在该条件上的goroutine(线程)
*
* 条件变量+互斥量(锁)
* a. 互斥量为共享数据的访问提供互斥支持
* b. 条件变量就数据状态的变化向相关线程发出通知(goroutine)
*
* sync.Cond提供的三个相关方法:
* 1. wait: 阻塞当前线程(goroutine),直到收到该条件变量发来的通知
* 2. signal: 单发通知,让该条件变量向至少一个正在等待它的goroutine发送通知,表示共享数据的状态已经改变
* 3. broadcast: 广播通知,让条件变量给正在等待它的所有goroutine发送通知,告知共享数据的状态已经改变
*
* sync.Cond struct
* // Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
* // which must be held when changing the condition and
* // when calling the Wait method.
* //
* // A Cond must not be copied after first use.
* type Cond struct {
* noCopy noCopy
*
* // L is held while observing or changing the condition
* L Locker
*
* notify notifyList
* checker copyChecker
* }
*
* 通过sync.Cond的定义,我们需要注意以下几点:
* 1. Cond内部存在一个Locker(Mutex或RWMutex),在Cond状态条件改变或调用Wait方法时,必须被锁住。即Locker是对
* Wait, Signal,Broadcast进行保护,确保在发送信号的时候不会有新的goroutine进入wait而阻塞。
* 2. Cond变量在第一次创建之后不应该被copy。
* 3. 在调用Signal,Broadcast函数之前,应该确保目标进入wait阻塞状态。
*
*/
func main() {
var wg sync.WaitGroup
cond := sync.NewCond(new(sync.Mutex))
for i := 0; i < 3; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
// wait前先上锁
cond.L.Lock()
fmt.Printf("goroutine_%d start wait\n", i)
cond.Wait()
fmt.Printf("goroutine_%d end wait\n", i)
cond.L.Unlock()
}(i)
}
/*
// 1. 调用signal方法,一次唤醒一个阻塞等待的goroutine
for i := 0; i < 3; i++ {
time.Sleep(1 * time.Second)
//cond.L.Lock()
cond.Signal() // 1秒唤醒一个goroutine
//cond.L.Unlock()
}
*/
// 2. 调用broadcast方法,一次性唤醒所有阻塞的goroutine
time.Sleep(1 * time.Second)
//cond.L.Lock()
cond.Broadcast()
//cond.L.Unlock()
wg.Wait()
fmt.Println("end test")
}
利用sync.Cond条件变量进行简单阻塞的示例:
c := sync.NewCond(&sync.Mutex{}) // 1
c.L.Lock() // 2
for conditionTrue() == false {
c.Wait() // 3:该方法会使当前goroutine被阻塞(阻塞式)
}
c.L.Unlock() // 4
注意点:
- NewCond函数传入的参数实现了sync.Locker类型。Cond类型允许以并行安全的方式与其他goroutines协调。
- 在执行Wait操作前,必须进行锁定。因为,Wait函数的调用会执行解锁并暂停该goroutine。
- 进入暂停状态的goroutine会一直被阻塞住,直到接收到通知。
- 当前goroutine被唤醒后,必须执行解锁操作,因为在Wait函数退出前,会执行加锁操作。
2. sync.Map
package main
import (
"fmt"
"sync"
)
/*
* Go1.9版本之前,其自带的map并不是并发安全的,如果要对其实现安全地并发访问,则需要对其进行加锁操作,即将map和sync.Mutex或sync.RWMutex
* 打包成一个struct来使用。
* Go1.9推出了sync.Map,其原生支持并发安全,相比上述struct的map具备更好的性能。sync.Map较内置map的用法不太一样,其内部封装了更为复杂的
* 数据结构。
*
* var smp sync.Map
* 其主要提供了以下几种方法:
* 1. Store(key, value interface{}) : 设置一个键值对
* 2. LoadOrStore(key, value interface{})(actual interface{}, loaded bool): 如果key存在,则返回其对应的值;如果key不存在,则设置
相应的值。如果是读取,则loaded返回true;如果是设置,则loaded返回false。
* 3. Load(key interface{})(value interface{}, ok bool) : 读取map中key对应的value。如果存在,ok返回true;否则,返回false。
* 4. Delete(key interface{}) : 删除对应的key
* 5. Range(f func(key, value interface{})bool) : 依次遍历map中的k-v对,如果函数f返回false,则停止迭代。
* Tips: Range does not necessarily correspond to any consistent snapshot of the Map's contents:
no key will be visited more than once, but if the value for any key is stored or deleted concurrently,
Range may reflect any mapping for that key from any point during the Range call.
* 在Range函数调用的过程,遍历key的顺序同样是随机的,且range能否保证遍历key的唯一性(不存在重复)。如果,range过程中存在其它的goroutine并发
* store, delete key,则range遍历的结果是反映当时时刻map的存储状态的。
*/
func main() {
var smp sync.Map
// 添加kv对
smp.Store("1", 1)
smp.Store("2", 2)
smp.Store("3", 3)
// 遍历
fmt.Println("11111111111111")
smp.Range(func(key, value interface{}) bool {
fmt.Printf("%v:%v\n", key, value)
return true
})
// 删除
smp.Delete("3")
// 遍历
fmt.Println("222222222222222")
smp.Range(func(key, value interface{}) bool {
fmt.Printf("%v:%v\n", key, value)
return true
})
v1, ok := smp.Load("1")
if ok {
fmt.Println(v1)
}
}
3. sync.WaitGroup
sync.WaitGroup主要用于等待一组并发操作的完成。例如,使用sync.WaitGroup等待一组goroutine执行完对应的操作。
var wg sync.WaitGroup
wg.Add(1) //1
go func() {
defer wg.Done() //2
fmt.Println("1st goroutine sleeping...")
time.Sleep(1)
}()
wg.Add(1) //1
go func() {
defer wg.Done() //2
fmt.Println("2nd goroutine sleeping...")
time.Sleep(2)
}()
wg.Wait() //3
fmt.Println("All goroutines complete.")
可以将sync.WaitGroup视作一个安全的并发计数器:调用Add操作增加计数,调用Done操作减少计数,调用Wait操作会阻塞等待,直到计数器变为0。
另外,一种使用sync.WaitGroup的方法:
// 传指针的方式,函数内部使用同一sync.WaitGroup
hello := func(wg *sync.WaitGroup, id int) {
defer wg.Done()
fmt.Printf("Hello from %v!\n", id)
}
const numGreeters = 5
var wg sync.WaitGroup
wg.Add(numGreeters)
for i := 0; i < numGreeters; i++ {
go hello(&wg, i+1) // 传指针参数
}
wg.Wait()
4. sync.Mutex和sync.RWMutex
被锁定部分是程序的性能瓶颈,进入和退出锁的成本有点高,因此应该尽量减少锁定涉及的范围。sync.RWMutex相比sync.Mutex具有更高的性能,因此在逻辑正确的情况下应该尽量使用sync.RWMutex而不是sync.Mutex。
5. sync.Once 全局仅执行一次的操作
顾名思义,sync.Once确保了即使在不同的goroutine上,调用Do传入的函数只执行一次。一般应用于初始化的场景,sync.Once确保该初始化操作只被执行一次。
注意
sync.Once只计算Do被调用的次数,而不是调用传入Do的唯一函数的次数。例如,下面函数count的输出值为1而不是0。
var count int
increment := func() { count++ }
decrement := func() { count-- }
var once sync.Once
once.Do(increment) // increment函数将被调用
once.Do(decrement) // decrement函数不会被调用
fmt.Printf("Count: %d\n", count)
6. sync.Pool
sync.Pool是对象池模式的并发安全实现。池模式是一种创建和提供固定数量可用对象的方式。它通常用于约束创建资源昂贵的事务(例如,数据库连接)。Go中sync.Pool可以被多个例程安全地使用。
Get/Put方法
Pool的主要接口是它的Get方法。 被调用时,Get将首先检查池中是否有可用实例返回给调用者,如果没有,则创建一个新成员变量。使用完成后,调用者调用Put将正在使用的实例放回池中供其他进程使用。 这里有一个简单的例子来演示:
myPool := &sync.Pool{
New: func() interface{} {
fmt.Println("Creating new instance.")
return struct{}{}
},
}
instance := myPool.Get() //1: 从资源池拿取对象,否则新建一个对象
myPool.Put(instance) //2: 将使用完的对象放回资源池
sync.Pool另一个用处可以预热分配对象的缓存,用于必须尽快进行的操作。在这种情况下,我们通过预先加载获取另一个对象的引用来减少消费者的时间消耗。在编写高吞吐量的网络服务器时,连接池是常见的优化技术。相关示例参考:https://www.kancloud.cn/mutouzhang/go/596830。
使用sync.Pool时应该注意的要点:
- 实例化sync.Pool时,给它一个新元素,该元素应该是线程安全的。
- 当你从Get获得一个实例时,不要假设你接收到的对象状态。
- 当你从池中取得实例时,请务必不要忘记调用Put。否则池的优越性就体现不出来了。这通常用defer来执行延迟操作。(defer Pool.Put())
- 池中的元素必须大致上是均匀的。