协程机制
Golang 线程和协程的区别
备注:需要区分进程、线程(内核级线程)、协程(用户级线程)三个概念。
进程、线程 和 协程 之间概念的区别
对于进程、线程,都是有内核进行调度,有CPU时间片的概念,进行抢占式调度(有多种调度算法)
对于协程(用户级线程),这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的,因为是由用户程序自己控制,那么就很难像抢占式调度那样做到强制的 CPU 控制权切换到其他进程/线程,通常只能进行协作式调度,需要协程自己主动把控制权转让出去之后,其他协程才能被执行到。
goroutine 和协程区别
本质上,goroutine 就是协程。 不同的是,Golang 在 runtime、系统调用等多方面对 goroutine 调度进行了封装和处理,当遇到长时间执行或者进行系统调用时,会主动把当前 goroutine 的CPU (P) 转让出去,让其他 goroutine 能被调度并执行,也就是 Golang 从语言层面支持了协程。Golang 的一大特色就是从语言层面原生支持协程,在函数或者方法前面加 go关键字就可创建一个协程。
func TestGroutine(t *testing.T) {
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println(i)
}(i)
}
time.Sleep(time.Millisecond * 50)
}
这里面额外在内部func里传了i,注意这里如果不传i,直接用的话是不可行的,因为使用go创建协程,但是需要注意的是:协程函数的par作为参数是外部i的数据拷贝。
其他方面的比较
- 内存消耗方面
- 每个 goroutine (协程) 默认占用内存远比 Java 、C 的线程少。
goroutine:2KB
线程:8MB
- 线程和 goroutine 切换调度开销方面
- 线程/goroutine 切换开销方面,goroutine 远比线程小
- 线程:涉及模式切换(从用户态切换到内核态)、16个寄存器、PC、SP...等寄存器的刷新等。
- goroutine:只有三个寄存器的值修改 - PC / SP / DX.
共享内存并发机制
先来看这样一段代码
func TestCounter(t *testing.T) {
counter := 0
for i := 0; i < 5000; i++ {
go func() {
counter++
}()
}
time.Sleep(1 * time.Second)
t.Logf("counter = %d", counter)
}
正常来说counter应该最后的结果是5000,但是实际结果每次都不相同且都小于5000。其中存在两个问题,第一个协程不安全,需要对协程加锁,因为每个协程都在修改counter;第二个因为是异步,主协程并不会等待所有子协程的结束,因此不能保证打印结果的时候,counter加到5000。
那如何保证最后的结果输出是5000呢?可以做以下两个操作
Lock(锁)
很明显保证线程安全就需要锁,看一下Go的锁如何使用:
func TestCounterThreadSafe(t *testing.T) {
var mut sync.Mutex
counter := 0
for i := 0; i < 5000; i++ {
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock()
counter++
}()
}
time.Sleep(1 * time.Second)
t.Logf("counter = %d", counter)
}
WaitGroup
多个协程等待全部执行完毕,可以用WaitGroup:
func TestCounterWaitGroup(t *testing.T) {
var mut sync.Mutex
var wg sync.WaitGroup
counter := 0
for i := 0; i < 5000; i++ {
wg.Add(1)
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock()
counter++
wg.Done()
}()
}
wg.Wait()
t.Logf("counter = %d", counter)
}
CSP并发机制
CSP vs. Actor
传统的并发模型主要分为 Actor 模型和CSP模型,CSP模型全称为 communicating sequential processes,CSP 模型由并发执行实体(进程,线程或协程),和消息通道组成,实体之间通过消息通道发送消息进行通信。
- 和Actor的直接通讯不同,CSP模式则是通过channel进⾏通讯的,channel作为中间者,更松耦合⼀些。
- Go中channel是有容量限制并且独⽴于处理Groutine,⽽如Erlang,Actor模式 中的mailbox容量是⽆限的,接收进程也总是被动地处理消息。
channel分为两种:unbuffered channel和buffered channel
如图unbuffered channel中,通信双方同时都在Channel上,否则协程会阻塞,直到双方完成通信。
如图buffered channel中,我们给channel设置一个容量,只要容量未满,发送消息者就可以往里面放消息,相反对于接收方,只要里面有消息,他就可以取。放容量满了以后,发送方只能等接收方取走一条消息后才可以发送,接收方也只能等里面有消息后才会去取。
channel 的基本操作
先讲一下channel的基本操作:
//创建channel
ch := make(chan int)
// 写入channel
ch <- x
// 从channel读取
x <- ch
// another way to read
x = <- ch
//关闭channel
close(ch)
channel 一定要初始化后才能进行读写操作,否则会永久阻塞。
func service() string {
time.Sleep(time.Millisecond * 50)
return "Done"
}
func otherTask() {
fmt.Println("working on something else")
time.Sleep(time.Millisecond * 100)
fmt.Println("Task is done.")
}
func AsyncService() chan string {
retCh := make(chan string)
go func() {
ret := service()
fmt.Println("returned result.")
retCh <- ret
fmt.Println("service exited.")
}()
return retCh
}
func TestAsynService(t *testing.T) {
fmt.Println(service())
otherTask()
retCh := AsyncService()
otherTask()
fmt.Println(<-retCh)
time.Sleep(time.Second * 1)
}
=======打印结果=======
Done
working on something else
Task is done.
working on something else
returned result.
Task is done.
Done
service exited.
TestAsynService方法中前两行是串行的,大家可见打印结果即使有延时,也是按顺序打印。AsyncService的方法返回了一个channel,此channel是unbuffered channel,因此会协程阻塞,因此打印“service exited”会一直等到打印“Done”之后。
func AsyncService() chan string {
retCh := make(chan string, 1)
go func() {
ret := service()
fmt.Println("returned result.")
retCh <- ret
fmt.Println("service exited.")
}()
return retCh
}
=======打印结果=======
Done
working on something else
Task is done.
working on something else
returned result.
service exited.
Task is done.
Done
我把AsyncService里面初始化的channel变为buffered channel,可见打印“service exited.”就不会被阻塞,打印在了“Done”前。
channel的关闭
有关 channel 的关闭,你需要注意以下事项:
- 关闭一个未初始化(nil) 的 channel 会产生 panic
- 重复关闭同一个 channel 会产生 panic
- 向一个已关闭的 channel 中发送消息会产生 panic
- 从已关闭的 channel 读取消息不会产生 panic,且能读出 channel 中还未被读取的消息,若消息均已读出,则会读到类型的零值。从一个已关闭的 channel 中读取消息永远不会阻塞。
v, ok <-ch; ok 为 bool 值,true 表示正常接受,false 表示通道关闭 - 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息
func dataProducer(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
wg.Done()
}()
}
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
go func() {
for {
if data, ok := <-ch; ok {
fmt.Println(data)
} else {
break
}
}
wg.Done()
}()
}
func TestCloseChannel(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataReceiver(ch, &wg)
wg.Wait()
}
多路选择和超时
select是go语言中常用的一个关键字,官方解释:select语句用来选择哪个case中的发送或接收操作可以被立即执行。它类似于switch语句,但是它的case涉及到channel有关的I/O操作。
多路选择
对于select的理解有以下几点:
- 每个case都必须是一个通信
- 所有channel表达式都会被求值
- 所有被发送的表达式都会被求值
- 如果任意某个通信可以进行,它就执行;其他被忽略。
- 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。否则:
- 如果有default子句,则执行该语句。
- 如果没有default子句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。
//channel
func AsyncService() chan string {
retCh := make(chan string)
go func() {
time.Sleep(time.Millisecond * 50)
retCh <- "Done"
}()
return retCh
}
//测试select含有default
func TestSelectDefault(t *testing.T) {
select {
case ret := <- AsyncService():
t.Logf("result: %s", ret)
default:
t.Error("No one returned")
}
}
---------------------------------------------------------
打印结果:No one returned
---------------------------------------------------------
//测试select不含有default
func TestSelect(t *testing.T) {
select {
case ret := <- AsyncService():
t.Logf("result: %s", ret)
}
}
---------------------------------------------------------
打印结果:result: Done
---------------------------------------------------------
打印结果发现有default子句,执行default;如果没有default子句,select将阻塞,直到channel返回值。
超时
select可以设置超时,具体代码如下
func AsyncService() chan string {
retCh := make(chan string)
go func() {
time.Sleep(time.Millisecond * 500)
retCh <- "Done"
}()
return retCh
}
func TestTimeOut(t *testing.T) {
select {
case ret := <- AsyncService():
t.Logf("result: %s", ret)
case <- time.After(time.Millisecond * 100):
t.Error("time out")
}
}
---------------------------------------------------------
打印结果:time out
---------------------------------------------------------
因为channel延迟了500毫秒,因此超时了,所以走到了超时的case中;如果把上面延迟500毫秒改成50毫秒,则正常走到了打印channel的case中。
另外,select是可以使用break,case中使用了break后,走到此case中执行到break后就不执行break之后的代码。
Context 与任务取消
如何取消中间任务handle(Req1),并且同时取消子任务,这时可以用到Context。
func isCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
func TestCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 5; i++ {
go func(i int, ctx context.Context) {
for {
if isCancelled(ctx) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, ctx)
}
cancel()
time.Sleep(time.Second * 1)
}
---------------------------------------------------------
1 Cancelled
4 Cancelled
3 Cancelled
2 Cancelled
0 Cancelled
---------------------------------------------------------
- 根 Context:通过 context.Background () 创建
- 子 Context:context.WithCancel(parentContext) 创建
- ctx, cancel := context.WithCancel(context.Background())
- 当前 Context 被取消时,基于他的⼦ context 都会被取消
- 接收取消通知 <-ctx.Done()