go资源池pool实现原理

本节介绍如何使用带缓冲的channel时间资源池。pool模式正在需要共享一组静态资源的情况(入共享数据库连接或者内存缓冲区)下非常有用。如果goroutine需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。

pool.go代码

package main

import (
    "errors"
    "io"
    "log"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

//pool管理一组可以安全地在多个goroutine间共享得到资源。被管理的资源必须实现io.Closer接口
type Pool struct {
    m         sync.Mutex
    resources chan io.Closer
    factory   func() (io.Closer, error)
    closed    bool
}

var ErrPoolClosed = errors.New("Pool has been closed.")

//New创建一个用来管理资源的池,这个池需要一个可以分配新资源的函数,并规定池的大小
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size < 0 {
        return nil, errors.New("Size value too small")
    }
    return &Pool{
        factory:   fn,
        resources: make(chan io.Closer, size),
    }, nil
}

//Acquire从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
    select {
    case r, ok := <-p.resources: //检查是否有空闲的资源
        log.Println("ACquire:", "shared resource")
        if !ok {
            return nil, ErrPoolClosed
        }
        return r, nil
    default:
        log.Println("Acquire:", "New resource")
        return p.factory()
    }
}

//Release将一个使用后的资源池释放回池里
func (p *Pool) Release(r io.Closer) {
    p.m.Lock() //确保本操作和Close操作安全
    defer p.m.Unlock()
    if p.closed { //如果资源池已经关闭,销毁这个资源
        r.Close()
        return
    }
    select {
    case p.resources <- r: //试图将这个资源放入队列
        log.Println("Release:", "In Queue")
    default: //如果队列已满,则关闭这个资源
        log.Println("Release", "Closing")
        r.Close()
    }
}

//close会让资源池停止工作,并关闭所有的资源
func (p *Pool) Close() {
    p.m.Lock() //确保本操作与release操作安全
    defer p.m.Unlock()
    if p.closed {
        return
    }
    p.closed = true    //将池关闭
    close(p.resources) //在清空通道里的资源前,将通道关闭,如果不这样做,会发生死锁
    //关闭资源
    for r := range p.resources {
        r.Close()
    }
}

代码解析:

//pool管理一组可以安全地在多个goroutine间共享得到资源。被管理的资源必须实现io.Closer接口
type Pool struct {
    m         sync.Mutex
    resources chan io.Closer
    factory   func() (io.Closer, error)
    closed    bool
}

Pool结构体声明了4个字段,sysc.Mutex类型,互斥锁用来保证多个goroutine访问资源池时,池内的值是安全的。第二个字段名为resources,被声明为io.Closer接口类型的通道。这个通道是作为一个有缓冲的通道创建的,用来保存共享资源。由于通道的类型是一个接口,所以池可以管理任意实现了io.Closer接口的资源类型。factory字段是一个函数类型。任何一个没有输入参数且返回一个io.Closer和一个error接口值的函数,都可以赋值给这个字段。这个函数的目的是,当池需要一个新的资源时,可以用这个函数创建。这个函数的实现细节不在Pool的范围,需要包使用者自定义实现。最有一个字段closed,是一个标志,表示Pool是否已被关闭。

//表示请求(Acquire)了一个关闭的资源池
var ErrPoolClosed = errors.New("Pool has been closed.")

Go语言里会经常创建error接口变量。这可以让调用者来判断某个包里的函数或者方法返回的具体错误值。当调用者对一个已关闭的池调用Acquire方法时,会返回该接口变量。因为Acquire方法可能返回多个不同类型的错误,所有Pool已经关闭时会关闭会返回这个错误变量让调用者从其他错误中识别出这个特定的错误。

接下来可以看下pool的函数和方法了:

//New创建一个用来管理资源的池,这个池需要一个可以分配新资源的函数,并规定池的大小
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size < 0 {
        return nil, errors.New("Size value too small")
    }
    return &Pool{
        factory:   fn,
        resources: make(chan io.Closer, size),
    }, nil
}

函数New接收两个参数,并返回两个值。第一个参数fn声明为一个函数类型,这个函数不接受任何参数,返回一个io.Closer和一个error接口值。这个作为参数的函数是一个工厂函数,用来创建由池管理的资源值。第二个参数size表示为了保存资源而创建的有缓冲的通道的缓冲区大小。if语句检查size的值,保证这个值不小于等于0.如果这个值小于0,就会使用nil值作为返回的pool指针,然后为该错误创建一个error接口值。因为这是这个函数唯一可能返回的错误值,所以不需要为这个错误单独创建和使用一个error接口变量。如果能够接收传入的size,就会创建并初始化一个新的pool值。函数fn被赋值给factory字段。

//Acquire从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
    select {
    case r, ok := <-p.resources: //检查是否有空闲的资源
        log.Println("ACquire:", "shared resource")
        if !ok {
            return nil, ErrPoolClosed
        }
        return r, nil
    default:
        log.Println("Acquire:", "New resource")
        return p.factory()
    }
}

Acqure方法,通过select表示通道缓冲区里还有可用的资源时从资源池里返回一个资源,否则会为该调用创建并返回一个新的资源。有资源就执行case语句,返回r,没有就执行default分支。通过用户自定义fn创建并返回一个新资源。

如果不需要已经获得的资源,必须将这个资源释放回资源池。这个是release方法的任务。在理解release方法之前先看下close方法:

/close会让资源池停止工作,并关闭所有的资源
func (p *Pool) Close() {
    p.m.Lock() //确保本操作与release操作安全
    defer p.m.Unlock()
    if p.closed {
        return
    }
    p.closed = true    //将池关闭
    close(p.resources) //在清空通道里的资源前,将通道关闭,如果不这样做,会发生死锁
    //关闭资源
    for r := range p.resources {
        r.Close()
    }
}

一旦程序不再使用资源池,需要调用资源池的Close方法。这个方法关闭并清空了有缓冲的通道,并将缓冲的空闲资源关闭。需要注意的是,在同一时刻只能有一个goroutine执行这段代码。事实上,当这段代码被执行时,必须保证其他goroutine中没有同时执行release方法,这很重要。互斥量被加锁,并在函数返回时解锁。if检查closed标志判断池是否已经关闭。如果已经关闭,该方法直接返回,并释放锁。如果这个方法第一次被调用,就会将这个标志设置为true,并关闭且清空resource通道。因为资源池里面的对象都实现io.Closer接口,都是需要调用close方法关闭io操作。

//Release将一个使用后的资源池释放回池里
func (p *Pool) Release(r io.Closer) {
    p.m.Lock() //确保本操作和Close操作安全
    defer p.m.Unlock()
    if p.closed { //如果资源池已经关闭,销毁这个资源
        r.Close()
        return
    }
    select {
    case p.resources <- r: //试图将这个资源放入队列
        log.Println("Release:", "In Queue")
    default: //如果队列已满,则关闭这个资源
        log.Println("Release", "Closing")
        r.Close()
    }
}

Release方法首先对互斥量加锁和解锁。这和Close方法中的互斥锁是同一个互斥量。这样可以阻止两个方法在不同goroutine里同时运行。使用互斥量有两个目的。第一,可以保护closed读取行为,保证同一时刻不会有其他goroutine调用Close方法写同一个标志。第二,我们不想往一个已经关闭的通道里发送数据,因为那样会引起崩溃。如果closed表示是true,我们就知道resources通道已经关闭。如果池已经关闭,就会直接调用资源r的close方法。因为这是已经清空并关闭了池,所以无法将资源重新释放回该资源池。对closed标志的读写必须进行同步,否则可能误导其他goroutine,让其认为资源池依旧是打开的,并试图对通道进行无效的操作。

接下来我们实现main方法来使用pool包:

const (
    maxGoroutines   = 25 //需要使用的goroutine的数量
    pooledResources = 2  //池中的资源数量
)

//dbconnection模拟要共享的资源
type dbconnection struct {
    ID int32
}

//Close实现了io.Closer接口,以便dbconnection可以被池管理。Close用来完成任意资源的释放管理
func (dbConn *dbconnection) Close() error {
    log.Println("Close: Connection", dbConn.ID)
    return nil
}

var idCounter int32 //用来给每个连接分配独一无二的id

//当需要一个新连接时,资源池会调用这个函数
func createConnection() (io.Closer, error) {
    id := atomic.AddInt32(&idCounter, 1)
    log.Println("Create: New Connection", id)

    return &dbconnection{id}, nil
}

func main() {
    var wg sync.WaitGroup
    wg.Add(maxGoroutines)

    //创建用来管理连接的池
    pool, err := New(createConnection, pooledResources)
    if err != nil {
        log.Println(err)
    }
    //使用池里的连接完成查询
    for query := 0; query < maxGoroutines; query++ {
        //每个goroutine需要自己复制一份要查询值的副本,不然所有的查询会共享同一个查询变量
        go func(q int) {
            performQuery(q, pool)
            wg.Done()
        }(query)
    }
    //等待goroutine的结束
    wg.Wait()
    //关闭池
    log.Println("Shutdown program.")
    pool.Close()
}

//performQuery用来测试连接的资源池
func performQuery(query int, p *Pool) {
    //从池里请求一个连接
    conn, err := p.Acquire()
    if err != nil {
        log.Println(err)
        return
    }
    //将该连接释放回池里
    defer p.Release(conn)
    //用等待来模拟查询响应
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbconnection).ID)
}

执行上述main函数结果如下:

GOROOT=/opt/go #gosetup
GOPATH=/home/wmj/go #gosetup
/opt/go/bin/go build -o /tmp/___go_build_pool_go /home/wmj/go/src/awesomeProject1/pool.go #gosetup
/tmp/___go_build_pool_go #gosetup
2020/06/02 00:04:13 Acquire: New resource
2020/06/02 00:04:13 Acquire: New resource
2020/06/02 00:04:13 Acquire: New resource
2020/06/02 00:04:13 Create: New Connection 3
2020/06/02 00:04:13 Create: New Connection 2
......
020/06/02 00:04:14 QID[7] CID[2]
2020/06/02 00:04:14 Release Closing
2020/06/02 00:04:14 Close: Connection 2
2020/06/02 00:04:14 Shutdown program.
2020/06/02 00:04:14 Close: Connection 25
2020/06/02 00:04:14 Close: Connection 5

Process finished with exit code 0
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352