熟悉go编程的同学,肯定都用过time.Sleep来暂停goroutine的执行,但是time.Sleep无法实现按照事件暂停和恢复。换句话说,你一旦设定了暂停时间,那后面的事情就由不得你了,你设了暂停10秒就是10秒,设了1分钟就是1分钟,而且你没法“永远暂停”下去。
那么现在问题就来了,我有一个数据流的播放任务,希望做到用户点击暂停⏸️按钮(发出暂停信号)的时候,播放任务被暂停(不再输出数据),而用户点击恢复⏯按钮(发出恢复信号)的时候,播放任务从被暂停的地方继续。这个暂停和恢复可以按照用户的意愿进行无数多次,暂停多久不能预先设定,而是看用户的心情😄
关于这个问题,我在网上找了挺久,并没有找到特别好的例子,直到看到《Go并发编程实战》这本书才得到了启示。
select 语句
select语句是一个仅能被用于发送和接收通道中的元素值的专用语句,一个select语句在被执行的时候会根据通道的值选择执行其中的某一个分支,通道里没有值的时候就走default分支(前提是你写了这么一个default分支)
现在利用select语句,我们可以写出控制数据流的关键代码(为方便起见,在此我们把数据流简化为一个循环输出)
// 略去一些声明
循环总次数 := 10000
运行信号 := make(chan struct{})
for i := 0; i < 循环总次数; {
select {
case <-运行信号:
fmt.Printf("数据流播放到:%d\n", i)
time.Sleep(1 * time.Second) // 让播放显得慢一点,每次停1秒
i++
default:
continue // 暂停时保持空转
}
}
我们要暂停数据流播放的时候,只需要让
运行信号 = nil
即可。
此时,名为“运行信号”的通道里没有值,所以select语句只会走default分支,那么整个for循环就进入到了一个无限空转的过程中,直到下一次“运行信号”里再有值才会继续数据流的播放。
Task 数据流的模拟
package task // task.go
import "log"
// Task 任务结构体
type Task struct {
任务编号 string
循环总次数 int
是否完成 bool
工作信号 <-chan struct{}
工作信号备份 <-chan struct{}
}
// NewTask 新任务
func NewTask(任务编号 string, 循环总次数 int) *Task {
ch := make(chan struct{})
defer close(ch)
return &Task{
任务编号: 任务编号,
循环总次数: 循环总次数,
工作信号: ch,
工作信号备份: ch,
}
}
// Play 开始运行
func (t *Task) Play() {
log.Printf("启动任务:%s , 次数: %d\n", t.任务编号, t.循环总次数)
for i := 0; i < t.循环总次数; { // 用循环模拟数据流
select {
case <-t.工作信号:
log.Printf("任务 %s @ %d\n", t.任务编号, i)
time.Sleep(1 * time.Second) // 让模拟数据流走得慢点
i++
default:
continue // ⚠️ 这个 continue 是Pause时运行 空转的
}
}
t.是否完成 = true
}
// Pause 暂停
func (t *Task) Pause() {
if !t.是否完成 {
t.工作信号 = nil
log.Printf("%s # 暂停 \n", t.任务编号)
} else {
log.Printf("%s # 已运行完毕,不能暂停 \n", t.任务编号)
}
}
// Resume 恢复执行
func (t *Task) Resume() {
if !t.是否完成 {
t.工作信号 = t.工作信号备份
log.Printf("%s # 恢复执行 \n", t.任务编号)
} else {
log.Printf("%s # 已运行完毕,不能恢复 \n", t.任务编号)
}
}
主程序 模拟多次暂停与恢复
package main
import (
"(...路径)/task"
"log"
"sync"
"time"
)
func main() {
任务1名称 := "task #15"
t1 := task.NewTask(任务1名称, 15) // 创建一个数据流任务
var wg sync.WaitGroup
wg.Add(5)
go func() {
defer wg.Done()
log.Println("任务启动")
t1.Play()
log.Println("播放结束")
}()
go func() {
defer wg.Done()
time.Sleep(4 * time.Second)
log.Printf("%s 启动4秒后,试图暂停\n", 任务1名称)
t1.Pause()
}()
go func() {
defer wg.Done()
time.Sleep(8 * time.Second)
log.Printf("%s 启动暂停8秒后,试图恢复\n", 任务1名称)
t1.Resume()
}()
go func() {
defer wg.Done()
time.Sleep(12*time.Second)
log.Printf("%s 启动12秒后,再次暂停 \n", 任务1名称)
t1.Pause()
}()
go func() {
defer wg.Done()
time.Sleep(15 * time.Second)
log.Printf("%s 启动15秒后,再次恢复\n", 任务1名称)
t1.Resume()
}()
wg.Wait()
}
小结
以上模拟了一个最简单的数据流的多次暂停与恢复,我们可以根据实际项目中的需要来扩展Task以及其调用。
《Go并发编程实战》写的非常细致,Go并发的原理讲得很清楚,也有很多贴近实战的代码,对于想深入学习Go语言的同学来说是本很好的教程。