本节介绍如何使用带缓冲的channel时间资源池。pool模式正在需要共享一组静态资源的情况(入共享数据库连接或者内存缓冲区)下非常有用。如果goroutine需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。
pool.go代码
package main
import (
"errors"
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
)
//pool管理一组可以安全地在多个goroutine间共享得到资源。被管理的资源必须实现io.Closer接口
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
var ErrPoolClosed = errors.New("Pool has been closed.")
//New创建一个用来管理资源的池,这个池需要一个可以分配新资源的函数,并规定池的大小
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size < 0 {
return nil, errors.New("Size value too small")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
//Acquire从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
case r, ok := <-p.resources: //检查是否有空闲的资源
log.Println("ACquire:", "shared resource")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
default:
log.Println("Acquire:", "New resource")
return p.factory()
}
}
//Release将一个使用后的资源池释放回池里
func (p *Pool) Release(r io.Closer) {
p.m.Lock() //确保本操作和Close操作安全
defer p.m.Unlock()
if p.closed { //如果资源池已经关闭,销毁这个资源
r.Close()
return
}
select {
case p.resources <- r: //试图将这个资源放入队列
log.Println("Release:", "In Queue")
default: //如果队列已满,则关闭这个资源
log.Println("Release", "Closing")
r.Close()
}
}
//close会让资源池停止工作,并关闭所有的资源
func (p *Pool) Close() {
p.m.Lock() //确保本操作与release操作安全
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true //将池关闭
close(p.resources) //在清空通道里的资源前,将通道关闭,如果不这样做,会发生死锁
//关闭资源
for r := range p.resources {
r.Close()
}
}
代码解析:
//pool管理一组可以安全地在多个goroutine间共享得到资源。被管理的资源必须实现io.Closer接口
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
Pool结构体声明了4个字段,sysc.Mutex类型,互斥锁用来保证多个goroutine访问资源池时,池内的值是安全的。第二个字段名为resources,被声明为io.Closer接口类型的通道。这个通道是作为一个有缓冲的通道创建的,用来保存共享资源。由于通道的类型是一个接口,所以池可以管理任意实现了io.Closer接口的资源类型。factory字段是一个函数类型。任何一个没有输入参数且返回一个io.Closer和一个error接口值的函数,都可以赋值给这个字段。这个函数的目的是,当池需要一个新的资源时,可以用这个函数创建。这个函数的实现细节不在Pool的范围,需要包使用者自定义实现。最有一个字段closed,是一个标志,表示Pool是否已被关闭。
//表示请求(Acquire)了一个关闭的资源池
var ErrPoolClosed = errors.New("Pool has been closed.")
Go语言里会经常创建error接口变量。这可以让调用者来判断某个包里的函数或者方法返回的具体错误值。当调用者对一个已关闭的池调用Acquire方法时,会返回该接口变量。因为Acquire方法可能返回多个不同类型的错误,所有Pool已经关闭时会关闭会返回这个错误变量让调用者从其他错误中识别出这个特定的错误。
接下来可以看下pool的函数和方法了:
//New创建一个用来管理资源的池,这个池需要一个可以分配新资源的函数,并规定池的大小
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size < 0 {
return nil, errors.New("Size value too small")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
函数New接收两个参数,并返回两个值。第一个参数fn声明为一个函数类型,这个函数不接受任何参数,返回一个io.Closer和一个error接口值。这个作为参数的函数是一个工厂函数,用来创建由池管理的资源值。第二个参数size表示为了保存资源而创建的有缓冲的通道的缓冲区大小。if语句检查size的值,保证这个值不小于等于0.如果这个值小于0,就会使用nil值作为返回的pool指针,然后为该错误创建一个error接口值。因为这是这个函数唯一可能返回的错误值,所以不需要为这个错误单独创建和使用一个error接口变量。如果能够接收传入的size,就会创建并初始化一个新的pool值。函数fn被赋值给factory字段。
//Acquire从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
case r, ok := <-p.resources: //检查是否有空闲的资源
log.Println("ACquire:", "shared resource")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
default:
log.Println("Acquire:", "New resource")
return p.factory()
}
}
Acqure方法,通过select表示通道缓冲区里还有可用的资源时从资源池里返回一个资源,否则会为该调用创建并返回一个新的资源。有资源就执行case语句,返回r,没有就执行default分支。通过用户自定义fn创建并返回一个新资源。
如果不需要已经获得的资源,必须将这个资源释放回资源池。这个是release方法的任务。在理解release方法之前先看下close方法:
/close会让资源池停止工作,并关闭所有的资源
func (p *Pool) Close() {
p.m.Lock() //确保本操作与release操作安全
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true //将池关闭
close(p.resources) //在清空通道里的资源前,将通道关闭,如果不这样做,会发生死锁
//关闭资源
for r := range p.resources {
r.Close()
}
}
一旦程序不再使用资源池,需要调用资源池的Close方法。这个方法关闭并清空了有缓冲的通道,并将缓冲的空闲资源关闭。需要注意的是,在同一时刻只能有一个goroutine执行这段代码。事实上,当这段代码被执行时,必须保证其他goroutine中没有同时执行release方法,这很重要。互斥量被加锁,并在函数返回时解锁。if检查closed标志判断池是否已经关闭。如果已经关闭,该方法直接返回,并释放锁。如果这个方法第一次被调用,就会将这个标志设置为true,并关闭且清空resource通道。因为资源池里面的对象都实现io.Closer接口,都是需要调用close方法关闭io操作。
//Release将一个使用后的资源池释放回池里
func (p *Pool) Release(r io.Closer) {
p.m.Lock() //确保本操作和Close操作安全
defer p.m.Unlock()
if p.closed { //如果资源池已经关闭,销毁这个资源
r.Close()
return
}
select {
case p.resources <- r: //试图将这个资源放入队列
log.Println("Release:", "In Queue")
default: //如果队列已满,则关闭这个资源
log.Println("Release", "Closing")
r.Close()
}
}
Release方法首先对互斥量加锁和解锁。这和Close方法中的互斥锁是同一个互斥量。这样可以阻止两个方法在不同goroutine里同时运行。使用互斥量有两个目的。第一,可以保护closed读取行为,保证同一时刻不会有其他goroutine调用Close方法写同一个标志。第二,我们不想往一个已经关闭的通道里发送数据,因为那样会引起崩溃。如果closed表示是true,我们就知道resources通道已经关闭。如果池已经关闭,就会直接调用资源r的close方法。因为这是已经清空并关闭了池,所以无法将资源重新释放回该资源池。对closed标志的读写必须进行同步,否则可能误导其他goroutine,让其认为资源池依旧是打开的,并试图对通道进行无效的操作。
接下来我们实现main方法来使用pool包:
const (
maxGoroutines = 25 //需要使用的goroutine的数量
pooledResources = 2 //池中的资源数量
)
//dbconnection模拟要共享的资源
type dbconnection struct {
ID int32
}
//Close实现了io.Closer接口,以便dbconnection可以被池管理。Close用来完成任意资源的释放管理
func (dbConn *dbconnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
var idCounter int32 //用来给每个连接分配独一无二的id
//当需要一个新连接时,资源池会调用这个函数
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbconnection{id}, nil
}
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
//创建用来管理连接的池
pool, err := New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
//使用池里的连接完成查询
for query := 0; query < maxGoroutines; query++ {
//每个goroutine需要自己复制一份要查询值的副本,不然所有的查询会共享同一个查询变量
go func(q int) {
performQuery(q, pool)
wg.Done()
}(query)
}
//等待goroutine的结束
wg.Wait()
//关闭池
log.Println("Shutdown program.")
pool.Close()
}
//performQuery用来测试连接的资源池
func performQuery(query int, p *Pool) {
//从池里请求一个连接
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
//将该连接释放回池里
defer p.Release(conn)
//用等待来模拟查询响应
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbconnection).ID)
}
执行上述main函数结果如下:
GOROOT=/opt/go #gosetup
GOPATH=/home/wmj/go #gosetup
/opt/go/bin/go build -o /tmp/___go_build_pool_go /home/wmj/go/src/awesomeProject1/pool.go #gosetup
/tmp/___go_build_pool_go #gosetup
2020/06/02 00:04:13 Acquire: New resource
2020/06/02 00:04:13 Acquire: New resource
2020/06/02 00:04:13 Acquire: New resource
2020/06/02 00:04:13 Create: New Connection 3
2020/06/02 00:04:13 Create: New Connection 2
......
020/06/02 00:04:14 QID[7] CID[2]
2020/06/02 00:04:14 Release Closing
2020/06/02 00:04:14 Close: Connection 2
2020/06/02 00:04:14 Shutdown program.
2020/06/02 00:04:14 Close: Connection 25
2020/06/02 00:04:14 Close: Connection 5
Process finished with exit code 0