多线程同步问题
- 互斥锁
- 互斥锁的本质是当一个goroutine访问的时候, 其它goroutine都不能访问
- 这样就能实现资源同步, 但是在避免资源竞争的同时也降低了程序的并发性能. 程序由原来的并发执行变成了串行
- 案例:
- 有一个打印函数, 用于逐个打印字符串中的字符, 有两个人都开启了goroutine去打印
- 如果没有添加互斥锁, 那么两个人都有机会输出自己的内容
- 如果添加了互斥锁, 那么会先输出某一个的, 输出完毕之后再输出另外一个人的
package main
import (
"fmt"
"sync"
"time"
)
// 创建一把互斥锁
var lock sync.Mutex
func printer(str string) {
// 让先来的人拿到锁, 把当前函数锁住, 其它人都无法执行
// 上厕所关门
lock.Lock()
for _, v := range str{
fmt.Printf("%c", v)
time.Sleep(time.Millisecond * 500)
}
// 先来的人执行完毕之后, 把锁释放掉, 让其它人可以继续使用当前函数
// 上厕所开门
lock.Unlock()
}
func person1() {
printer("hello")
}
func person2() {
printer("world")
}
func main() {
go person1()
go person2()
for{
;
}
}
生产者消费者问题
- 所谓的生产者消费者模型就是
- 某个模块(函数)负责生产数据, 这些数据由另一个模块来负责处理
- 一般生产者消费者模型包含三个部分"生产者"、"缓冲区"、"消费者"
- 为什么生产者消费者模型要含三个部分? 直接生产和消费不行么?
- 一个案例说明一切
- 生产者好比现实生活中的某个人
- 缓冲区好比现实生活中的邮箱
- 消费者好比现实生活中的邮递员
- 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员, 那么如果将来过去的邮递员离职了, 你想邮寄信件必须想办法结识新的邮递员(消费者发生变化, 会直接影响生产者, 耦合性太强)
- 如果在生产者和消费者之间添加一个缓冲区, 那么就好比有了邮箱, 以后邮寄信件不是找邮递员, 只需把信件投递到邮箱中即可, 写信的人不需要关心邮递员是谁(解耦)
- 如果只有生产者和消费者, 那么每个人邮寄信件都需要直接找邮递员(1对1关系), 如果有10个人要邮寄信件, 那么邮递员只能依次找到每个人, 然后才能取件(效率低下)
- 如果在生产者和消费者之间添加一个缓冲区, 那么所有的人只需要将信件投递到邮箱即可, 邮递员不用关心有多少人要邮寄信件, 也不用依次取件, 只需要找到邮箱从邮箱中统一取件即可(效率提高)
- 如果只有生产者和消费者, 那么如果邮寄信件太多邮递员无法一次拿走, 这个时候非常难办
- 如果在生产者和消费者之间添加一个缓冲区, 那么如果信件太多可以先拿走一部分, 剩下的继续放到邮箱中下次再拿
... ...
生产者和消费者资源竞争问题
- 例如生产比较慢, 而消费比较快, 就会导致消费者消费到错误数据
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 创建一把互斥锁
var lock = sync.Mutex{}
// 定义缓冲区
var sce []int = make([]int, 10)
// 定义生产者
func producer(){
// 加锁, 注意是lock就是我们的锁, 全局公用一把锁
lock.Lock()
rand.Seed(time.Now().UnixNano())
for i:=0;i<10;i++{
num := rand.Intn(100)
sce[i] = num
fmt.Println("生产者生产了: ", num)
time.Sleep(time.Millisecond * 500)
}
// 解锁
lock.Unlock()
}
// 定义消费者
func consumer() {
// 加锁, 注意和生产者中用的是同一把锁
// 如果生产者中已加过了, 则阻塞直到解锁后再重新加锁
lock.Lock()
for i:=0;i<10;i++{
num := sce[i]
fmt.Println("---消费者消费了", num)
}
lock.Unlock()
}
func main() {
go producer()
go consumer()
for{
;
}
}
- 思考: 那如果是一对多, 或者多对多的关系, 上述代码有问题么?
管道(Channel)
- 上述实现并发的代码中为了保持主线程不挂掉, 我们都会在最后写上一个死循环或者写上一个定时器来实现等待goroutine执行完毕
- 上述实现并发的代码中为了解决生产者消费者资源同步问题, 我们利用加锁来解决, 但是这仅仅是一对一的情况, 如果是一对多或者多对多, 上述代码还是会出现问题
- 综上所述, 企业开发中需要一种更牛X的技术来解决上述问题, 那就是
管道(Channel)
- Channel的本质是一个队列
- Channel是线程安全的, 也就是自带锁定功能
- Channel声明和初始化
- 声明:
var 变量名chan 数据类型
- 初始化:
mych := make(chan 数据类型, 容量)
- Channel和切片还有字典一样, 必须make之后才能使用
- Channel和切片还有字典一样, 是引用类型
- 声明:
package main
import "fmt"
func main() {
// 1.声明一个管道
var mych chan int
// 2.初始化一个管道
mych = make(chan int, 3)
// 3.查看管道的长度和容量
fmt.Println("长度是", len(mych), "容量是", cap(mych))
// 4.像管道中写入数据
mych<- 666
fmt.Println("长度是", len(mych), "容量是", cap(mych))
// 5.取出管道中写入的数据
num := <-mych
fmt.Println("num = ", num)
fmt.Println("长度是", len(mych), "容量是", cap(mych))
}
- 注意点:
- 管道中只能存放声明的数据类型, 不能存放其它数据类型
- 管道中如果已经没有数据, 再取就会报错
- 如果管道中数据已满, 再写入就会报错
package main
import "fmt"
func main() {
// 1.声明一个管道
var mych chan int
// 2.初始化一个管道
mych = make(chan int, 3)
// 注意点: 管道中只能存放声明的数据类型, 不能存放其它数据类型
//mych<-3.14
// 注意点: 管道中如果已经没有数据,
// 并且检测不到有其它协程再往管道中写入数据, 那么再取就会报错
//num = <-mych
//fmt.Println("num = ", num)
// 注意点: 如果管道中数据已满, 再写入就会报错
mych<- 666
mych<- 777
mych<- 888
mych<- 999
}
- 管道的关闭和遍历
package main
import "fmt"
func main() {
// 1.创建一个管道
mych := make(chan int, 3)
// 2.往管道中存入数据
mych<-666
mych<-777
mych<-888
// 3.遍历管道
// 第一次遍历i等于0, len = 3,
// 第二次遍历i等于1, len = 2
// 第三次遍历i等于2, len = 1
//for i:=0; i<len(mych); i++{
// fmt.Println(<-mych) // 输出结果不正确
//}
// 3.写入完数据之后先关闭管道
// 注意点: 管道关闭之后只能读不能写
close(mych)
//mych<- 999 // 报错
// 4.遍历管道
// 利用for range遍历, 必须先关闭管道, 否则会报错
//for value := range mych{
// fmt.Println(value)
//}
// close主要用途:
// 在企业开发中我们可能不确定管道有还没有有数据, 所以我们可能一直获取
// 但是我们可以通过ok-idiom模式判断管道是否关闭, 如果关闭会返回false给ok
for{
if num, ok:= <-mych; ok{
fmt.Println(num)
}else{
break;
}
}
fmt.Println("数据读取完毕")
}
- Channel阻塞现象
- 单独在主线程中操作管道, 写满了会报错, 没有数据去获取也会报错
- 只要在协程中操作管道过, 写满了就会阻塞, 没有就数据去获取也会阻塞
package main
import (
"fmt"
"time"
)
// 创建一个管道
var myCh = make(chan int, 5)
func demo() {
var myCh = make(chan int, 5)
//myCh<-111
//myCh<-222
//myCh<-333
//myCh<-444
//myCh<-555
//fmt.Println("我是第六次添加之前代码")
//myCh<-666
//fmt.Println("我是第六次添加之后代码")
fmt.Println("我是第六次直接获取之前代码")
<-myCh
fmt.Println("我是第六次直接获取之后代码")
}
func test() {
//myCh<-111
//myCh<-222
//myCh<-333
//myCh<-444
//myCh<-555
//fmt.Println("我是第六次添加之前代码")
//myCh<-666
//fmt.Println("我是第六次添加之后代码")
//fmt.Println("我是第六次直接获取之前代码")
//<-myCh
//fmt.Println("我是第六次直接获取之后代码")
}
func example() {
time.Sleep(time.Second * 2)
myCh<-666
}
func main() {
// 1.同一个go程中操作管道
// 写满了会报错
//myCh<-111
//myCh<-222
//myCh<-333
//myCh<-444
//myCh<-555
//myCh<-666
// 没有了去取也会报错
//<-myCh
// 2.在协程中操作管道
// 写满了不会报错, 但是会阻塞
//go test()
// 没有了去取也不会报错, 也会阻塞
//go test()
//go demo()
//go demo()
// 3.只要在协程中操作了管道, 就会发生阻塞现象
go example()
fmt.Println("myCh之前代码")
<-myCh
fmt.Println("myCh之后代码")
//for{
// ;
//}
}
- 利用Channel实现生产者消费者
package main
import (
"fmt"
"math/rand"
"time"
)
// 定义缓冲区
var myCh = make(chan int, 5)
var exitCh = make(chan bool, 1)
// 定义生产者
func producer(){
rand.Seed(time.Now().UnixNano())
for i:=0;i<10;i++{
num := rand.Intn(100)
fmt.Println("生产者生产了: ", num)
// 往管道中写入数据
myCh<-num
//time.Sleep(time.Millisecond * 500)
}
// 生产完毕之后关闭管道
close(myCh)
fmt.Println("生产者停止生产")
}
// 定义消费者
func consumer() {
// 不断从管道中获取数据, 直到管道关闭位置
for{
if num, ok := <-myCh; !ok{
break
}else{
fmt.Println("---消费者消费了", num)
}
}
fmt.Println("消费者停止消费")
exitCh<-true
}
func main() {
go producer()
go consumer()
fmt.Println("exitCh之前代码")
<-exitCh
fmt.Println("exitCh之后代码")
}
- 无缓冲Channel
package main
import "fmt"
var myCh1 = make(chan int, 5)
var myCh2 = make(chan int, 0)
func main() {
// 有缓冲管道
// 只写入, 不读取不会报错
//myCh1<-1
//myCh1<-2
//myCh1<-3
//myCh1<-4
//myCh1<-5
//fmt.Println("len =",len(myCh1), "cap =", cap(myCh1))
// 无缓冲管道
// 只有两端同时准备好才不会报错
go func() {
fmt.Println(<-myCh2)
}()
// 只写入, 不读取会报错
myCh2<-1
//fmt.Println("len =",len(myCh2), "cap =", cap(myCh2))
// 写入之后在同一个线程读取也会报错
//fmt.Println(<-myCh2)
// 在主程中先写入, 在子程中后读取也会报错
//go func() {
// fmt.Println(<-myCh2)
//}()
}
- 无缓冲Channel和有缓冲Channel
- 有缓冲管道具备异步的能力(写几个读一个或读几个)
- 无缓冲管道具备同步的能力(写一个读一个)
package main
import (
"fmt"
"math/rand"
"time"
)
// 定义缓冲区
//var myCh = make(chan int, 0)
var myCh = make(chan int)
var exitCh = make(chan bool, 1)
// 定义生产者
func producer(){
rand.Seed(time.Now().UnixNano())
for i:=0;i<10;i++{
num := rand.Intn(100)
fmt.Println("生产者生产了: ", num)
// 往管道中写入数据
myCh<-num
//time.Sleep(time.Millisecond * 500)
}
// 生产完毕之后关闭管道
close(myCh)
fmt.Println("生产者停止生产")
}
// 定义消费者
func consumer() {
// 不断从管道中获取数据, 直到管道关闭位置
for{
if num, ok := <-myCh; !ok{
break
}else{
fmt.Println("---消费者消费了", num)
}
}
fmt.Println("消费者停止消费")
exitCh<-true
}
func main() {
go producer()
go consumer()
fmt.Println("exitCh之前代码")
<-exitCh
fmt.Println("exitCh之后代码")
}
IO的延迟说明:
看到的输出结果和我们想象的不太一样, 是因为IO输出非常消耗性能, 输出之后还没来得及赋值可能就跑去执行别的协程了
- 单向管道和双向管道
- 默认情况下所有管道都是双向了(可读可写)
- 但是在企业开发中, 我们经常需要用到将一个管道作为参数传递
- 在传递的过程中希望对方只能单向使用, 要么只能写,要么只能读
- 双向管道
- var myCh chan int = make(chan int, 0)
- 单向管道
- var myCh chan<- int = make(chan<- int, 0)
- var myCh <-chan int = make(<-chan int, 0)
- 注意点:
- 双向管道可以自动转换为任意一种单向管道
- 单向管道不能转换为双向管道
package main
import "fmt"
func main() {
// 1.定义一个双向管道
var myCh chan int = make(chan int, 5)
// 2.将双向管道转换单向管道
var myCh2 chan<- int
myCh2 = myCh
fmt.Println(myCh2)
var myCh3 <-chan int
myCh3 = myCh
fmt.Println(myCh3)
// 3.双向管道,可读可写
myCh<-1
myCh<-2
myCh<-3
fmt.Println(<-myCh)
// 3.只写管道,只能写, 不能读
// myCh2<-666
// fmt.Println(<-myCh2)
// 4.指读管道, 只能读,不能写
fmt.Println(<-myCh3)
//myCh3<-666
// 注意点: 管道之间赋值是地址传递, 以上三个管道底层指向相同容器
}
- 单向管道作为函数参数
package main
import (
"fmt"
"math/rand"
"time"
)
// 定义生产者
func producer(myCh chan<- int){
rand.Seed(time.Now().UnixNano())
for i:=0;i<10;i++{
num := rand.Intn(100)
fmt.Println("生产者生产了: ", num)
// 往管道中写入数据
myCh<-num
//time.Sleep(time.Millisecond * 500)
}
// 生产完毕之后关闭管道
close(myCh)
fmt.Println("生产者停止生产")
}
// 定义消费者
func consumer(myCh <-chan int) {
// 不断从管道中获取数据, 直到管道关闭位置
for{
if num, ok := <-myCh; !ok{
break
}else{
fmt.Println("---消费者消费了", num)
}
}
fmt.Println("消费者停止消费")
}
func main() {
// 定义缓冲区
var myCh = make(chan int, 5)
go producer(myCh)
consumer(myCh)
}
select选择结构
- select是Go中的一个控制结构,类似于switch语句,用于处理异步IO操作
- 如果有多个case都可以运行,select会随机选出一个执行,其他不会执行。
- 如果没有可运行的case语句,且有default语句,那么就会执行default的动作。
- 如果没有可运行的case语句,且没有default语句,select将阻塞,直到某个case通信可以运行
select {
case IO操作1:
IO操作1读取或写入成功就执行
case IO操作2:
IO操作2读取或写入成功就执行
default:
如果上面case都没有成功,则进入default处理流程
}
- 注意点:
- select的case后面必须是一个IO操作
- 一般情况下使用select结构不用写default
package main
import (
"fmt"
"time"
)
func main() {
// 创建管道
var myCh = make(chan int)
var exitCh = make(chan bool)
// 生产数据
go func() {
for i:=0;i <10;i++{
myCh<-i
time.Sleep(time.Second)
}
//close(myCh)
exitCh<-true
}()
// 读取数据
for{
fmt.Println("读取代码被执行了")
select {
case num:= <-myCh:
fmt.Println("读到了", num)
case <-exitCh:
//break // 没用, 跳出的是select
return
}
fmt.Println("-----------")
}
}
- select应用场景
- 实现多路监听
- 实现超时处理
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
// 1.创建管道
myCh := make(chan int, 5)
exitCh := make(chan bool)
// 2.生成数据
go func() {
for i:=0; i<10; i++ {
myCh<-i
time.Sleep(time.Second * 3)
}
}()
// 3.获取数据
go func() {
for{
select {
case num:= <-myCh:
fmt.Println(num)
case <-time.After(time.Second * 2):
exitCh<-true
runtime.Goexit()
}
}
}()
<-exitCh
fmt.Println("程序结束")
}
定时器补充
- 一次性定时器
- NewTimer函数
- func NewTimer(d Duration) *Timer
- NewTimer创建一个Timer,它会在到期后向Timer自身的C字段发送当时的时间
type Timer struct {
C <-chan Time // 对于我们来说, 这个属性是只读的管道
r runtimeTimer
}
package main
import (
"fmt"
"time"
)
func main() {
start := time.Now()
fmt.Println("开始时间", start)
timer := time.NewTimer(time.Second * 3)
fmt.Println("读取之前代码被执行")
end := <-timer.C // 系统写入数据之前会阻塞
fmt.Println("读取之后代码被执行")
fmt.Println("结束时间", end)
}
- After函数
- func After(d Duration) <-chan Time
- 底层就是对NewTimer的封装, 只不过返回值不同而已
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
package main
import (
"fmt"
"time"
)
func main() {
start := time.Now()
fmt.Println("开始时间", start)
timer := time.After(time.Second * 3)
fmt.Println("读取之前代码被执行")
end := <-timer // 系统写入数据之前会阻塞
fmt.Println("读取之后代码被执行")
fmt.Println("结束时间", end)
}
- 周期性定时器
- NewTicker函数
- func NewTicker(d Duration) *Ticker
- 和NewTimer差不多, 只不过NewTimer只会往管道中写入一次数据, 而NewTicker每隔一段时间就会写一次
type Ticker struct {
C <-chan Time // 周期性传递时间信息的通道
// 内含隐藏或非导出字段
}
package main
import (
"fmt"
"time"
)
func main() {
// 1.创建一个周期定时器
ticker := time.NewTicker(time.Second)
// 2.不断从重启定时器中获取时间
for{
t := <-ticker.C // 系统写入数据之前会阻塞
fmt.Println(t)
}
}