前言
最近在学习Go并发,在同学强烈推荐下,阅读了tunny源码。在此记录自己的理解和感想。
tunny 基于Go实现的协程池
要去理解一个东西,最快的方式莫过于先去熟悉使用它。那么,现在我们就开始使用它:假设我们现在的需求是输入一个字符串,将它与"Hello"拼接后打印并返回,要求用tunny实现。
#在NewFunc函数中需要对传入的函数进行转换并执行
printHello := func(str interface{}) interface{} {
fmt.Println("Hello!", str)
return "Hello! " + str.(string)
}
pool3 := tunny.NewFunc(3, func(payload interface{}) interface{} {
f, ok := payload.(func())
if !ok {
return nil
}
f()
return f
})
pool3.Process(printHello("lizhuoming"))
#而NewCallback的Process函数封装了这个操作
printHello := func(str interface{}) interface{} {
fmt.Println("Hello!", str)
return "Hello! " + str.(string)
}
pool2 := tunny.NewCallback(2)
pool2.Process(printHello("lizhuoming"))
#New的灵活度最高,我们可以定制自己的Worker
type myWorker struct {
processor func(interface{}) interface{}
}
func (w *myWorker) Process(payload interface{}) interface{} {
return w.processor(payload)
}
func (w *myWorker) BlockUntilReady() {}
func (w *myWorker) Interrupt() {}
func (w *myWorker) Terminate() {}
func main() {
printHello := func(str interface{}) interface{} {
fmt.Println("Hello!", str)
return "Hello! " + str.(string)
}
pool1 := tunny.New(3, func() tunny.Worker {
return &myWorker{
processor: printHello,
}
})
pool1.Process("lizhuoming")
}
源码分析
在熟悉了tunny的使用后,我们通过代码来看看它是如何工作的吧~
协程池的主要工作流程
在我们创建并指定协程池容量后,协程池会启动指定容量个协程。它们竞争向一个channel中写入 workRequest(它充当一个桥梁,连接 Process 函数与真正执行任务的协程)。当你调用 Process 函数时,它会通过这个桥梁将任务传递给协程,并在任务结束后,接收到协程返回的结果。
下面,我们来了解它的具体实现吧
桥梁以及Process函数与协程之间的通信实现
type Worker interface {
// 执行任务
Process(interface{}) interface{}
// 在执行任务前执行,相当于init
BlockUntilReady()
// 在任务执行时被终止时,会执行该函数
Interrupt()
// 当协程被关闭时,执行该函数
Terminate()
}
//协程池
type Pool struct {
//正在执行的任务数量
queuedJobs int64
ctor func() Worker
workers []*workerWrapper
//所有运行的协程会竞争向该channel写入workRequest
reqChan chan workRequest
workerMut sync.Mutex
}
//桥梁载体
type workRequest struct {
//接收任务的channel
jobChan chan<- interface{}
//返回结果的channel
retChan <-chan interface{}
//中断协程的执行
interruptFunc func()
}
//负责管理worker(stop函数)和goroutine(interrupt函数)的整个生命周期
type workerWrapper struct {
worker Worker
interruptChan chan struct{}
// workerWrapper 和 Pool 的reqChan是同一个(channel是引用传递)
reqChan chan<- workRequest
closeChan chan struct{}
closedChan chan struct{}
}
func (p *Pool) ProcessTimed(
payload interface{},
timeout time.Duration,
) (interface{}, error) {
atomic.AddInt64(&p.queuedJobs, 1)
defer atomic.AddInt64(&p.queuedJobs, -1)
tout := time.NewTimer(timeout)
var request workRequest
var open bool
select {
//读取桥梁载体
case request, open = <-p.reqChan:
if !open {
return nil, ErrPoolNotRunning
}
//超时处理
case <-tout.C:
return nil, ErrJobTimedOut
}
select {
//通过桥梁载体将任务传给协程
case request.jobChan <- payload:
case <-tout.C:
//调用 workerWrapper 的 interrupt 方法,结束函数执行
request.interruptFunc()
return nil, ErrJobTimedOut
}
select {
//接收返回数据
case payload, open = <-request.retChan:
if !open {
return nil, ErrWorkerClosed
}
case <-tout.C:
//调用 workerWrapper 的 interrupt 方法,结束函数执行
request.interruptFunc()
return nil, ErrJobTimedOut
}
tout.Stop()
return payload, nil
}
func (w *workerWrapper) run() {
jobChan, retChan := make(chan interface{}), make(chan interface{})
defer func() {
w.worker.Terminate()
close(retChan)
close(w.closedChan)
}()
for {
w.worker.BlockUntilReady()
select {
// 给channel中写入桥梁载体,为协程私有
case w.reqChan <- workRequest{
jobChan: jobChan,
retChan: retChan,
interruptFunc: w.interrupt,
}:
select {
//尝试读取任务
case payload := <-jobChan:
result := w.worker.Process(payload)
select {
case retChan <- result:
case <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
//执行被中断,新建中断 channel
case _, _ = <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
// 协程被关闭
case <-w.closeChan:
return
}
}
}
协程池如何保证协程数恒定
func (p *Pool) SetSize(n int) {
p.workerMut.Lock()
defer p.workerMut.Unlock()
lWorkers := len(p.workers)
if lWorkers == n {
return
}
// 给池中添加协程
for i := lWorkers; i < n; i++ {
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
}
// 异步关闭超出的协程
for i := n; i < lWorkers; i++ {
p.workers[i].stop()
}
// 同步等待所有超出协程都关闭完成
for i := n; i < lWorkers; i++ {
p.workers[i].join()
}
p.workers = p.workers[:n]
}
一些感想
代码就说到这儿了,下面来谈谈我的感想:
(1)不得不说人家的代码健壮性真好,以后自己在写代码时也要借鉴人家的经验
(2)通过对 workerWrapper 和 workRequest 的设计和逻辑的拆分,使代码解耦,并且这样的代码逻辑看起来是非常清晰的