大纲
- 什么是channel
- 什么是mutex
- 使用channel、mutex 实现安全的并发减操作
- 使用channel 实现批量任务处理
- channel 的本质
- 使用 channel or 并发原语的时机
- 参考资料
1 什么是 channel
通过学习,总结 channel 的定义 及特性 :
1. channel 是控制并发的高级语法 or 数据结构,是一种更高层次的并发控制模型(其实内部封装了共享内存 + 互斥锁)
2. channel 是线程安全的,channel 内的信息同一时刻只能被1个 goroutine 获取(消费), send() 不会发生多 goroutine 同时操作覆盖的情况
3. channel 底层用循环链表存储消息,具有 FIFO 的特性
4. channel 能影响 goroutine 之间的阻塞与唤醒,这是在 channel.go 中实现的,
不需要显式调用 wait or notify 实现。
5. channel 适合在 消息的同步传递,数据流转场景使用;
6. channel 用来 解耦 两端的 goroutine
channel 在 go 中 的编程语法:
blockCh := make(chan int)
bufferCh := make(chan int , [cap])
sendCh := make(chan<- int)
reveiveCh := make(<-chan int)
2 什么是mutex
1. mutex 是比 channel 更低层次的并发控制原语,在 多 goroutine 并发修改临界区(共享变量)资源时,提供一种安全保障机制;
2. sync 包是 go 中控制并发的配角,但并不代表它不重要,在该使用的时候还得使用;
mutex 在go 中语法:
var mtx sync.Mutex
var shareVal int
go func() {
mtx.Lock()
shareVal --
mtx.Unlock()
}()
3. 使用channel、mutex 实现安全的并发减操作
Do not communicate by sharing memory; instead, share memory by communicating.
如何理解这句话?
假设现在有个需求: n 个协程对一共享变量 shareVal 进行并发减操作,实现快速、安全的并发减操作 (shareVal = shareVal - n);
3.1 使用 channel 实现
分析:
-
安全性
: 不通协程需要对 共享变量 shareVal 做 -1 操作, 且操作是安全的,即 当前协程在 做 -1操作时, 共享变量 shareVal 只对其可见,其余协程均处于block 状态。 -
可见性
: 某协程操作完后,下一个协程操作时,拿到的共享变量必须是前一个 -1 后的值。 -
数据流动性
: 通过将 shareVal 放在 channel 传递,实现 shareVal 在不同 goroutine 之间的共享。
func main() {
ch := make(chan int, 1)
var shareVal = 10000
var i = 0
ch <- shareVal
var wg sync.WaitGroup
for i < 10000 {
wg.Add(1)
go func() {
defer wg.Done()
val := <-ch
val--
ch <- val
}()
i++
}
wg.Wait()
fmt.Printf("shareVal | %d", <-ch)
}
3.2 使用 mutex 实现 - 通过共享内存来实现通信
func main() {
var i = 0
var shareVal = 10000
var wg sync.WaitGroup
var mtx sync.Mutex
for i < 10000 {
wg.Add(1)
go func() {
defer wg.Done()
mtx.Lock()
shareVal--
mtx.Unlock()
}()
i++
}
wg.Wait()
fmt.Printf("shareVal | %d", shareVal)
}
4 使用channel 实现批量任务处理
假设现在有一个场景: 客户端发送大量长事务请求操作数据库(IO 密集型任务,短时间内不能返回数据),服务器核心数为8, 如何充分利用服务器资源(核心)实现批量处理的需求?
这种问题使用信号量机制可以解决:
func getData(idx int) {
//time.Sleep(10 * time.Second)
fmt.Printf("handler task_%d over\n", idx)
}
func main() {
// semaphore 机制实现 任务批量处理
var processNum = 8
// 初始化信号量池 processNum
sephCh := make(chan int, processNum)
for i := 0; i < 8; i++ {
sephCh <- 1
}
var wg sync.WaitGroup
// 模拟多任务
for j := 0; j < 10000; j++ {
wg.Add(1)
// 开启子协程处理任务之前先从 信号量池 中拿到 许可证
<-sephCh
fmt.Printf("run task %d \n", j)
go func(idx int) {
defer wg.Done()
getData(idx)
// 处理完后归还许可证,供 blocking-> ready 状态的协程 使用
sephCh <- 1
}(j)
}
wg.Wait()
close(sephCh)
for s := range sephCh {
fmt.Printf("%d\n", s)
}
fmt.Printf("all task over!")
}
4 channel 的本质
动图描述
4.1 channel - 底层数据结构
详细解释:
- buf 是有缓冲的channel所特有的结构,用来存储缓存数据,是循环链表;
- sendx和recvx用于记录buf这个循环链表中的index;
- lock是互斥锁,控制 channel 的 接收与取出操作;
- recvq和sendq分别是接收( <-channel )或者发送(channel <- val) 的 goroutine 抽象出来的结构体(sudog),是双向链表;
4.2 send or receive 底层操作
以如下缓冲channel 为例:
ch := make(chan int, 3)
4.2.1 send 操作
执行 send 操作, 每一步的操作的细节可以细化为:
第一,加锁
第二,把数据从goroutine中copy到“队列”中(或者从队列中copy到goroutine中)。
第三,释放锁
每一步的操作总结为动态图为:(发送过程)
4.2.2 receive 操作
4.3 buffer channel 满与空 情况下的 goroutine send or receive 过程
4.3.1 send val to full channel
当前场景,channel 已满,groutine 还在并发往 channel 中塞数据:
ch := make(chan int, 3)
ch <- 1
ch <- 1
ch <- 1
这个时候G1正在正常运行,当再次进行send操作(ch<-1)的时候,会主动调用Go的调度器,让G1等待,并从让出M,让其他G去使用, 如下图所示:
即 的G1 被强制 gopark,从
running
状态变为 block
状态;
同时G1也会被抽象成含有G1指针和send元素的sudog结构体保存到hchan的sendq中等待被唤醒。
问题来了, G1何时被唤醒?这个时候G2隆重登场。
G2执行了recv操作p := <-ch,于是会发生以下的操作:
4.3.2 receive val from empty channel
当前场景:
ch := make(chan int, 3)
<-ch
<-ch
<-ch
// channel 已空, 继续执行 receive 操作
<-ch
抽象场景如下图所示:
这个时候G2会主动调用Go的调度器,让G2等待,并从让出M,让其他G去使用。
G2还会被抽象成含有G2指针和recv空元素的sudog结构体保存到hchan的recvq中等待被唤醒,如下图所示:
此时恰好有个goroutine G1开始向channel中推送数据 ch <- 1, 比较特殊的情况(不用加锁)
接下来,G2 被 Go schedule 唤醒,进入P 管理的 goroutine 就绪队列,等待运行
5 使用channel 的时机 or 使用 mutex 的时机
- channel 关注数据流动,如果任务处理模型中存在数据流动,使用channel 解决;
- 数据的控制权需要在多个gorutine 中传递, 使用 channel 处理;