goroutine和chan配合是golang的核心技术, 必须深入研究一下使用方法.
无缓冲chan
进和出都会阻塞.
例1:
func main() {
ch := make(chan error)
fmt.Println("main1")
go func() {
fmt.Println("go1")
ch <- nil //阻塞
fmt.Println("go2") //永远不会执行
}()
fmt.Println("main2")
time.Sleep(1 * time.Second)
fmt.Println("main等待1秒")
fmt.Println("main3")
}
输出:
main1
main2
go1
main等待1秒
main3
例2:
func main() {
ch := make(chan error)
fmt.Println("main1")
go func() {
fmt.Println("go1")
fmt.Println("go等待1秒")
time.Sleep(1 * time.Second)
ch <- nil //阻塞等待
fmt.Println("go2")
}()
fmt.Println("main2")
fmt.Println("main等待1秒")
time.Sleep(1 * time.Second)
<-ch //阻塞等待
fmt.Println("main3")
time.Sleep(2 * time.Second)
}
输出:
main1
main2
main等待1秒
go1
go等待1秒
main3
go2
有缓冲chan
先进先出队列, 出会一直阻塞到有数据, 进时当队列未满不会阻塞, 队列已满则阻塞.
select
- select 先遍历所有case, 所有channel表达式都会被求值、所有被发送的表达式都会被求值。求值顺序:自上而下、从左到右.
- 当case没有阻塞则随机执行一个没有阻塞的case就退出select
- 当所有case阻塞时, 则一直阻塞直到某个case解除阻塞, 但是如果有default则直接执行default
- 也就是一个select最多只执行一次case里的代码
- 要一直检测case则必须外层使用for循环包起来
close(chan)
要点
- close没有make的chan会引起panic
- close以后不能再写入,写入会出现panic
- close之后可以读取,无缓冲chan读取返回0值和false,有缓冲chan可以继续读取,返回的都是chan中数据和true,直到读取完所有队列中的数据。
- 重复close会引起panic
- 只读chan不能close
- 不close chan也是可以的,当没有被引用时系统会自动垃圾回收。
总结
- 不要在receiver中close(chan),而要在sender中close(chan)
- 多个sender时,要使用辅助chan来确保不会多次close(chan)和close后又再次写入。
此时最好不要主动close chan而应该在无引用时由系统自动回收,要主动close除非确保所有写入都停止了。
多个sender写个简单例子测试一下:
package main
import (
"fmt"
"sync"
"time"
)
type TestChan struct {
msgChan chan int
stopChan chan struct{}
}
func createTestChan() *TestChan {
t := &TestChan{}
t.msgChan = make(chan int)
t.stopChan = make(chan struct{})
return t
}
func (t *TestChan) addMsg(msg int) bool {
//这个是为了避免stopChan已经close但是下面的第二个select多次随机执行t.msgChan <- msg
select {
case <-t.stopChan:
return false
default:
}
select {
case <-t.stopChan:
return false
case t.msgChan <- msg: //当stopChan close时最多写入一次(随机),如果不能写则阻塞,但是<-t.stopChan会被激活于是return
}
return true
//我们不主动关闭t.msgChan,在无引用时会自动释放
}
func (t *TestChan) processMsg() {
for {
select {
case <-t.stopChan:
return
case msg := <-t.msgChan:
fmt.Println(msg)
}
}
}
// processMsg在这种情况下性能应该更好,因为只需要select一个chan状态,
// 但是这种用法需要自己手动close(t.msgChan)才能退出循环,而且要保证所有的addMsg循环都退出了
// func (t *TestChan) processMsg() {
// for {
// select {
// case msg, ok := <-t.msgChan:
// if !ok {
// fmt.Println("processMsg !ok")
// return
// }
// fmt.Println(msg)
// }
// }
// }
func testChan(wg *sync.WaitGroup) {
t := createTestChan()
for i := 1; i <= 2; i++ {
wg.Add(1)
go func(msg int) {
for {
if !t.addMsg(msg) {
fmt.Println("addMsg stop", msg)
wg.Done()
return
}
time.Sleep(time.Second * 1)
}
}(i)
}
go func() {
t.processMsg()
fmt.Println("stop processMsg")
}()
go func() {
time.Sleep(time.Second * 3)
fmt.Println("close stopchan")
close(t.stopChan)
}()
wg.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
testChan(&wg)
wg.Wait()
fmt.Println("exit")
}
只读和只写chan
read_only := make (<-chan int)
write_only := make (chan<- int)
只读或者只写一般用在参数传递中。
chan和mutex
mutex的性能比chan高不少。
比如开源的消息队列gnatsd(NATS)就很少用chan而使用mutex,其性能是非常高的,比另一个消息队列nsq高很多。