redigo连接池源码解析

何为连接池

连接池是负责分配、管理和释放连接,它允许应用程序重复使用池中的空闲的连接,而不是每次都重新建立一个连接。 本质就是管理了一堆长链接,提供给需求方相应的句柄使用。

连接池有何用

  • 减少网络io开销
    减少了每次连接三次握手和四次挥手的开销。连接复用,自然减少了创建,关闭套接字等流程。提升系统性能

  • 控制资源
    如果没有连接池管理,如每次请求,协程都创建一个连接,那么当请求量巨大时,产生非常大的浪费并且可能会导致高负载下的异常发生,最终导致所有服务都不可用。这就是为什么很多存储都会有一层proxy来管理,不让业务服务直接和存储连接。

  • 简化编程
    使用者只需关心如何获取和返回的方法,无需关心底层连接、避免资源泄漏等问题

redigo是如何实现v1.8.4

首先redigo不支持cluster,作者也不打算支持,所以建议还是选择go-redis

package main
import (
    "fmt"
    red "github.com/gomodule/redigo/redis"
    "time"
)

type Redis struct {
    pool *red.Pool
}

var redis *Redis
func Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) {
    con := redis.pool.Get()
    // connct
    if err := con.Err(); err != nil {
        return nil, err
    }
    defer con.Close()
    parmas := make([]interface{}, 0)
    parmas = append(parmas, key)

    if len(args) > 0 {
        for _, v := range args {
            parmas = append(parmas, v)
        }
    }
    return con.Do(cmd, parmas...)
}

func initRedis() {
    redis = new(Redis)
    redis.pool = &red.Pool{
        MaxIdle:     2, //空闲数
        IdleTimeout: 240 * time.Second,
        MaxActive:   0, //最大数
        Dial: func() (red.Conn, error) {
            c, err := red.Dial("tcp", "127.0.0.1:6379")
            if err != nil {
                return nil, err
            }
            return c, err
        },
        TestOnBorrow: func(c red.Conn, t time.Time) error {
            _, err := c.Do("PING")
            return err
        },
    }
}

func main() {
    initRedis()
    Exec("set", "dandy", "hello")
    result, err := Exec("get", "dandy")
    if err != nil {
        fmt.Print(err.Error())
    }
    str, _ := red.String(result, err)
    fmt.Print(str)
    redis.pool.Close()
}

初始化redis.pool

type Pool struct {
    // Dial conn中Dial调用初始化
    Dial func() (Conn, error)
    // 带有context的Dial,2选1即可
    DialContext func(ctx context.Context) (Conn, error)
    // 获取连接池中,校验连接是否可用,一般和PING、PONG使用
    TestOnBorrow func(c Conn, t time.Time) error
    // 连接池中最大空闲数
    MaxIdle int
    // 连接池中保持活跃的数,0没有限制
    MaxActive int
    // 空闲检查时间
    IdleTimeout time.Duration
    // wait设置为true并且pool中活跃数到达设置的最大值,直到连接池中有可用连接,get()才返回 
    Wait bool
    //  设置连接最大存活时间 0无限制
    MaxConnLifetime time.Duration
    // 统计、队列等使用
    mu           sync.Mutex    // mu protects the following fields
    closed       bool          // set to true when the pool is closed.
    active       int           // the number of open connections in the pool
    initOnce     sync.Once     // the init ch once func
    ch           chan struct{} // limits open connections when p.Wait is true
    idle         idleList      // idle connections
    waitCount    int64         // total number of connections waited for.
    waitDuration time.Duration // total time waited for new connections.
}

Pool获取连接

Get获取

源码 pool.go
func (p *Pool) Get() Conn 
  • wait设置等待
select {
  case <-p.ch://当连接池满时,会阻塞等待,直到有空闲连接
    select {
    case <-ctx.Done():
      p.ch <- struct{}{}
      return 0, ctx.Err()
    default:
    }
  case <-ctx.Done():
    return 0, ctx.Err()
}

当pool中设置了Wait,当连接满时(p.ch获取不到数据),会等待直到池中有空闲连接,就会通知ch

看activeConn.close()会调用Pool.put(),此时连接池将会有空闲连接,并且通知刚才等待的Wait ch

if p.ch != nil && !p.closed {
        // 通知等待ch
    p.ch <- struct{}{}
}
  • 空闲时间判断
if p.IdleTimeout > 0 {
        n := p.idle.count
        // 只需从尾部back判断即可验证是否过期
        // 如果过期,删除尾部,释放该连接,并且继续遍历
        for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
            pc := p.idle.back
            p.idle.popBack()
            p.mu.Unlock()
            pc.c.Close()
            p.mu.Lock()
            p.active--
        }
}

我们可以先大致先看内部连接池双向链表的管理点击跳转,也许这样你会很容易的理解。

这里idletimeout遍历整个链表,因为idle.back.t为最早插入的时间,所以只需要检查尾部back即可。

  • 从连接池头部后取空闲连接
for p.idle.front != nil {
    pc := p.idle.front
    // 取出头部
    p.idle.popFront()
    p.mu.Unlock()
    // 校验连接是否正常,一般我们设置回调ping取检验,
    // 自然每次都多了一次请求,性能消耗
    if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
      (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
      // 返回可用的连接
      return &activeConn{p: p, pc: pc}, nil
    }
    // 校验不通过,自然释放该连接
    pc.c.Close()
    p.mu.Lock()
    p.active--
}

因为上面的条件如果都校验成功,说明链表头部有数据,我们只需pop出来,之后返回activeConn,即我们成功后去了一个连接。注意,这里activeConn很关键,里头将最早初始化创建的p(pool)存入,并且将pc(poolConn)即从链表中取出的数据存起来。基本上ac(activeConn)涵盖了后续所有可以操作的数据。 详细pc我们可以看下面

  • 开始链表肯定是空的,如何获取连接
p.active++
p.mu.Unlock()
// 这里拨号,即调用我们一开始注册的回调pool中的Dial
// 这里就是创建线程池开始创建连接,即conn的管理
c, err := p.dial(ctx)
if err != nil {
    p.mu.Lock()
    p.active--
    if p.ch != nil && !p.closed {
        p.ch <- struct{}{}
    }
    p.mu.Unlock()
    // 返回错误连接
    return errorConn{err}, err
}
// pc中c为conn.go中conn的存储。
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil

这里就是一开始,我们调用dial创建连接,并返回activConn得到连接。后续我们会分析conn.go

pool 中链表put的存放

func (p *Pool) put(pc *poolConn, forceClose bool) error {
   p.mu.Lock()
   if !p.closed && !forceClose {
       pc.t = nowFunc()
       // 报存队列
       p.idle.pushFront(pc)
       // 超过设置,pop出时间有效时间最小的back连接
       if p.idle.count > p.MaxIdle {
           pc = p.idle.back
           p.idle.popBack()
       } else {
           pc = nil
       }
   }
   // back该连接不保存,直接关闭
   if pc != nil {
       p.mu.Unlock()
       pc.c.Close()
       p.mu.Lock()
       p.active--
   }
   // 上述以说明,配合wait设置使用
   if p.ch != nil && !p.closed {
       p.ch <- struct{}{}
   }
   p.mu.Unlock()
   return nil
}

当Pool.get获取的连接,并没有保存在连接池中,而是当activeConn.Close()时,才调用put,保存连接。至此,pool中核心功能都已准备完毕。


idleList连接池管理

  • pushFront链表存储
连接池插入.png
func (l *idleList) pushFront(pc *poolConn) {
    // 这里记住,idleList中front和back始终指向
    // 的是连接池中的头部和尾部
    // 1 新的pc尾指针指向链表头部
    pc.next = l.front
    pc.prev = nil
    if l.count == 0 {
        // 0.当连接池为空,头尾都指向改连接
        l.back = pc
    } else {
        // 2. 链表头前驱指针指向pc
        l.front.prev = pc
    }
    // 3.修改l中front指向为新插入的pc
    l.front = pc
    l.count++
}
  • popFront删除链表
    连接池删除.png

conn.go

  • 连接创建
// 调用net/dial.go库进行连接
netConn, err := do.dialContext(ctx, network, address)
if err != nil {
    return nil, err
}
c := &conn{
    // 暂时我们研究的是返回:TCPConn
    conn:         netConn,
    // bufio写的也很好,后续对其分析
    bw:           bufio.NewWriter(netConn),
    br:           bufio.NewReader(netConn),
    readTimeout:  do.readTimeout,
    writeTimeout: do.writeTimeout,
}
  • 之后的activeConn调用的Do方法就是调用conn中的Do
if cmd != "" {
    // RESP协议组包
    if err := c.writeCommand(cmd, args); err != nil {
        return nil, c.fatal(err)
    }
}
// bufio用法,里头Write为interface实际为TCPConn的操作
if err := c.bw.Flush(); err != nil {
    return nil, c.fatal(err)
}

var deadline time.Time
if readTimeout != 0 {
    deadline = time.Now().Add(readTimeout)
}
// read过期检测
if err := c.conn.SetReadDeadline(deadline); err != nil {
    return nil, c.fatal(err)
}
var err error
var reply interface{}
for i := 0; i <= pending; i++ {
    var e error
    // 获取redis服务回包数据
    if reply, e = c.readReply(); e != nil {
        return nil, c.fatal(e)
    }
    if e, ok := reply.(Error); ok && err == nil {
        err = e
    }
}

Do方法调用的是DoWithTimeout,这里发起RESP协议组包,并发送数据给redis服务端,之后读取redis服务器返回的数据。


大家如果觉得有啥疑惑或者不正确,都可以在评论或者加微信(dandyhzh)一起谈论。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容