go-redis 源码分析:连接池

笔者最近在项目中基于 go-redis 实现 Redis 缓存优化性能。go-redis 是一个 Go 语言实现的 Redis 客户端,既然是网络服务的客户端,为了高效利用有限资源,避免重复创建和销毁网络连接,就必需对其进行管理。而资源管理又是编程领域中的一个重点难点,抱着对是否能利用 Go 语言语法简洁的特点来优雅实现连接池的好奇,笔者决定阅读并分析 go-redis 连接池部分的源码,一探究竟。以下是对源码的分析,分为接口与结构体连接池管理建立与关闭连接获取与放回连接监控统计等5大部分,源码链接


接口与结构体

连接结构体:

type Conn struct {
    netConn net.Conn  // 基于 tcp 的网络连接

    rd *proto.Reader // 根据 Redis 通信协议实现的 Reader
    wr *proto.Writer // 根据 Redis 通信协议实现的 Writer

    Inited    bool // 是否完成初始化
    pooled    bool // 是否放进连接池
    createdAt time.Time // 创建时间
    usedAt    int64 // 使用时间,atomic
}

连接池接口:

type Pooler interface {
    NewConn(context.Context) (*Conn, error) // 创建连接
    CloseConn(*Conn) error // 关闭连接

    Get(context.Context) (*Conn, error) // 获取连接
    Put(*Conn) // 放回连接
    Remove(*Conn, error) // 移除连接

    Len() int // 连接池长度
    IdleLen() int // 空闲连接数量
    Stats() *Stats // 连接池统计

    Close() error // 关闭连接池
}

连接池结构体:

type ConnPool struct {
    opt *Options // 连接池配置

    dialErrorsNum uint32 // 连接错误次数,atomic

    lastDialErrorMu sync.RWMutex // 上一次连接错误锁,读写锁
    lastDialError   error // 上一次连接错误

    queue chan struct{} // 工作连接队列

    connsMu      sync.Mutex // 连接队列锁
    conns        []*Conn // 连接队列
    idleConns    []*Conn // 空闲连接队列
    poolSize     int // 连接池大小
    idleConnsLen int // 空闲连接队列长度

    stats Stats // 连接池统计

    _closed  uint32 // 连接池关闭标志,atomic
    closedCh chan struct{} // 通知连接池关闭通道
}

连接池管理

初始化

var _ Pooler = (*ConnPool)(nil)

func NewConnPool(opt *Options) *ConnPool {
    p := &ConnPool{
        opt: opt,

        queue:     make(chan struct{}, opt.PoolSize),
        conns:     make([]*Conn, 0, opt.PoolSize),
        idleConns: make([]*Conn, 0, opt.PoolSize),
        closedCh:  make(chan struct{}),
    }

    p.checkMinIdleConns()

    if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
        go p.reaper(opt.IdleCheckFrequency)
    }

    return p
}
  1. 创建连接池,传入连接池配置选项参数 opt,工厂函数根据 opt 创建连接池实例。连接池主要依靠以下四个数据结构实现管理和通信:
  • queue: 存储工作连接的缓冲通道
  • conns:存储所有连接的切片
  • idleConns:存储空闲连接的切片
  • closed:用于通知所有协程连接池已经关闭的通道
  1. 检查连接池的空闲连接数量是否满足最小空闲连接数量要求,若不满足,则创建足够的空闲连接。
  2. 若连接池配置选项规定了空闲连接超时检查空闲连接频率,则开启一个清理空闲连接的协程。

关闭

func (p *ConnPool) Close() error {
    if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
        return ErrClosed
    }
    close(p.closedCh)

    var firstErr error
    p.connsMu.Lock()
    for _, cn := range p.conns {
        if err := p.closeConn(cn); err != nil && firstErr == nil {
            firstErr = err
        }
    }
    p.conns = nil
    p.poolSize = 0
    p.idleConns = nil
    p.idleConnsLen = 0
    p.connsMu.Unlock()

    return firstErr
}
  1. 原子性检查连接池是否已经关闭,若没关闭,则将关闭标志置为1
  2. 关闭 closedCh 通道,连接池中的所有协程都可以通过判断该通道是否关闭来确定连接池是否已经关闭。
  3. 连接队列锁上锁,关闭队列中的所有连接,并置空所有维护连接池状态的数据结构,解锁。

过滤

func (p *ConnPool) Filter(fn func(*Conn) bool) error {
    var firstErr error
    p.connsMu.Lock()
    for _, cn := range p.conns {
        if fn(cn) {
            if err := p.closeConn(cn); err != nil && firstErr == nil {
                firstErr = err
            }
        }
    }
    p.connsMu.Unlock()
    return firstErr
}

实质上是遍历连接池中的所有连接,并调用传入的 fn 过滤函数作用在每个连接上,过滤出符合业务要求的连接。

清理

func (p *ConnPool) reaper(frequency time.Duration) {
    ticker := time.NewTicker(frequency)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // It is possible that ticker and closedCh arrive together,
            // and select pseudo-randomly pick ticker case, we double
            // check here to prevent being executed after closed.
            if p.closed() {
                return
            }
            _, err := p.ReapStaleConns()
            if err != nil {
                internal.Logger.Printf("ReapStaleConns failed: %s", err)
                continue
            }
        case <-p.closedCh:
            return
        }
    }
}

func (p *ConnPool) ReapStaleConns() (int, error) {
    var n int
    for {
        p.getTurn()

        p.connsMu.Lock()
        cn := p.reapStaleConn()
        p.connsMu.Unlock()
        p.freeTurn()

        if cn != nil {
            _ = p.closeConn(cn)
            n++
        } else {
            break
        }
    }
    atomic.AddUint32(&p.stats.StaleConns, uint32(n))
    return n, nil
}

func (p *ConnPool) reapStaleConn() *Conn {
    if len(p.idleConns) == 0 {
        return nil
    }

    cn := p.idleConns[0]
    if !p.isStaleConn(cn) {
        return nil
    }

    p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
    p.idleConnsLen--
    p.removeConn(cn)

    return cn
}
  1. 开启一个用于检查并清理过期连接的 goroutine 每隔 frequency 时间遍历检查连接池中是否存在过期连接,并清理。
  2. 创建一个时间间隔为 frequency 的计时器,在连接池关闭时关闭该计时器
  3. 循环判断计时器是否到时和连接池是否关闭
  4. 移除空闲连接队列中的过期连接

建立与关闭连接

建立连接

func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
    cn, err := p.dialConn(ctx, pooled)
    if err != nil {
        return nil, err
    }

    p.connsMu.Lock()
    p.conns = append(p.conns, cn)
    if pooled {
        // If pool is full remove the cn on next Put.
        if p.poolSize >= p.opt.PoolSize {
            cn.pooled = false
        } else {
            p.poolSize++
        }
    }
    p.connsMu.Unlock()
    return cn, nil
}

func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
    if p.closed() {
        return nil, ErrClosed
    }

    if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
        return nil, p.getLastDialError()
    }

    netConn, err := p.opt.Dialer(ctx)
    if err != nil {
        p.setLastDialError(err)
        if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
            go p.tryDial()
        }
        return nil, err
    }

    cn := NewConn(netConn)
    cn.pooled = pooled
    return cn, nil
}

func (p *ConnPool) tryDial() {
    for {
        if p.closed() {
            return
        }

        conn, err := p.opt.Dialer(context.Background())
        if err != nil {
            p.setLastDialError(err)
            time.Sleep(time.Second)
            continue
        }

        atomic.StoreUint32(&p.dialErrorsNum, 0)
        _ = conn.Close()
        return
    }
}

创建连接流程图:


newConn流程图.png

DialConn流程图.png

移除与关闭连接

func (p *ConnPool) Remove(cn *Conn, reason error) {
    p.removeConnWithLock(cn)
    p.freeTurn()
    _ = p.closeConn(cn)
}

func (p *ConnPool) CloseConn(cn *Conn) error {
    p.removeConnWithLock(cn)
    return p.closeConn(cn)
}

func (p *ConnPool) removeConnWithLock(cn *Conn) {
    p.connsMu.Lock()
    p.removeConn(cn)
    p.connsMu.Unlock()
}

func (p *ConnPool) removeConn(cn *Conn) {
    for i, c := range p.conns {
        if c == cn {
            p.conns = append(p.conns[:i], p.conns[i+1:]...)
            if cn.pooled {
                p.poolSize--
                p.checkMinIdleConns()
            }
            return
        }
    }
}

func (p *ConnPool) closeConn(cn *Conn) error {
    if p.opt.OnClose != nil {
        _ = p.opt.OnClose(cn)
    }
    return cn.Close()
}

连接池无论移除还是关闭连接,底层调用的都是 removeConnWithLock 函数。removeConnWithLock 函数的工作流程如下:

  1. 连接队列上锁
  2. 遍历连接队列找到要关闭的连接,并将其移除出连接队列
  3. 更新连接池统计数据
  4. 检查连接池最小空闲连接数量
  5. 连接队列解锁
  6. 关闭连接,先执行关闭连接时的回调函数(创建连接池时的配置选项传入),再关闭连接

获取与放回连接

获取

// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
    if p.closed() {
        return nil, ErrClosed
    }

    err := p.waitTurn(ctx)
    if err != nil {
        return nil, err
    }

    for {
        p.connsMu.Lock()
        cn := p.popIdle()
        p.connsMu.Unlock()

        if cn == nil {
            break
        }

        if p.isStaleConn(cn) {
            _ = p.CloseConn(cn)
            continue
        }

        atomic.AddUint32(&p.stats.Hits, 1)
        return cn, nil
    }

    atomic.AddUint32(&p.stats.Misses, 1)

    newcn, err := p.newConn(ctx, true)
    if err != nil {
        p.freeTurn()
        return nil, err
    }

    return newcn, nil
}

func (p *ConnPool) getTurn() {
    p.queue <- struct{}{}
}

func (p *ConnPool) waitTurn(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }

    select {
    case p.queue <- struct{}{}:
        return nil
    default:
    }

    timer := timers.Get().(*time.Timer)
    timer.Reset(p.opt.PoolTimeout)

    select {
    case <-ctx.Done():
        if !timer.Stop() {
            <-timer.C
        }
        timers.Put(timer)
        return ctx.Err()
    case p.queue <- struct{}{}:
        if !timer.Stop() {
            <-timer.C
        }
        timers.Put(timer)
        return nil
    case <-timer.C:
        timers.Put(timer)
        atomic.AddUint32(&p.stats.Timeouts, 1)
        return ErrPoolTimeout
    }
}

func (p *ConnPool) freeTurn() {
    <-p.queue
}

func (p *ConnPool) popIdle() *Conn {
    if len(p.idleConns) == 0 {
        return nil
    }

    idx := len(p.idleConns) - 1
    cn := p.idleConns[idx]
    p.idleConns = p.idleConns[:idx]
    p.idleConnsLen--
    p.checkMinIdleConns()
    return cn
}

获取连接流程图:


Get流程图.png

放回

func (p *ConnPool) Put(cn *Conn) {
    if cn.rd.Buffered() > 0 {
        internal.Logger.Printf("Conn has unread data")
        p.Remove(cn, BadConnError{})
        return
    }

    if !cn.pooled {
        p.Remove(cn, nil)
        return
    }

    p.connsMu.Lock()
    p.idleConns = append(p.idleConns, cn)
    p.idleConnsLen++
    p.connsMu.Unlock()
    p.freeTurn()
}
  1. 检查连接中是否还有数据没被读取,若有,移除连接并返回 BadConnError
  2. 判断连接是否已经放入连接池中,若无,直接移除连接
  3. 连接队列上锁,将该连接加入空闲连接队列中,连接队列解锁,工作连接通道移除一个元素

监控统计

监控统计对调整连接池配置选项,优化连接池性能有很大的帮助。

Dial 错误统计

func (p *ConnPool) setLastDialError(err error) {
    p.lastDialErrorMu.Lock()
    p.lastDialError = err
    p.lastDialErrorMu.Unlock()
}

func (p *ConnPool) getLastDialError() error {
    p.lastDialErrorMu.RLock()
    err := p.lastDialError
    p.lastDialErrorMu.RUnlock()
    return err
}

由于一般情况下,连接错误记录是读多写少的,所以采用读写锁来保证该记录的并发安全(读写锁在该场景下性能更佳)。

状态统计

// Len returns total number of connections.
func (p *ConnPool) Len() int {
    p.connsMu.Lock()
    n := len(p.conns)
    p.connsMu.Unlock()
    return n
}

// IdleLen returns number of idle connections.
func (p *ConnPool) IdleLen() int {
    p.connsMu.Lock()
    n := p.idleConnsLen
    p.connsMu.Unlock()
    return n
}

func (p *ConnPool) Stats() *Stats {
    idleLen := p.IdleLen()
    return &Stats{
        Hits:     atomic.LoadUint32(&p.stats.Hits),
        Misses:   atomic.LoadUint32(&p.stats.Misses),
        Timeouts: atomic.LoadUint32(&p.stats.Timeouts),

        TotalConns: uint32(p.Len()),
        IdleConns:  uint32(idleLen),
        StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
    }
}
  • func (p *ConnPool) Len() int {...}:返回连接池连接数量总数
  • func (p *ConnPool) IdleLen() int {...}:返回连接池空闲连接数量
  • Stats:
    Hits 连接池命中空闲连接次数
    Misses 连接池没有空闲连接可用次数
    Timeouts 请求连接等待超时次数
    TotalConns 连接池总连接数量
    IdleConns 连接池空闲连接数量
    StaleConns 移除过期连接数量
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351