何为连接池
连接池是负责分配、管理和释放连接,它允许应用程序重复使用池中的空闲的连接,而不是每次都重新建立一个连接。 本质就是管理了一堆长链接,提供给需求方相应的句柄使用。
连接池有何用
减少网络io开销
减少了每次连接三次握手和四次挥手的开销。连接复用,自然减少了创建,关闭套接字等流程。提升系统性能控制资源
如果没有连接池管理,如每次请求,协程都创建一个连接,那么当请求量巨大时,产生非常大的浪费并且可能会导致高负载下的异常发生,最终导致所有服务都不可用。这就是为什么很多存储都会有一层proxy来管理,不让业务服务直接和存储连接。简化编程
使用者只需关心如何获取和返回的方法,无需关心底层连接、避免资源泄漏等问题
redigo是如何实现v1.8.4
首先redigo不支持cluster,作者也不打算支持,所以建议还是选择go-redis
package main
import (
"fmt"
red "github.com/gomodule/redigo/redis"
"time"
)
type Redis struct {
pool *red.Pool
}
var redis *Redis
func Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) {
con := redis.pool.Get()
// connct
if err := con.Err(); err != nil {
return nil, err
}
defer con.Close()
parmas := make([]interface{}, 0)
parmas = append(parmas, key)
if len(args) > 0 {
for _, v := range args {
parmas = append(parmas, v)
}
}
return con.Do(cmd, parmas...)
}
func initRedis() {
redis = new(Redis)
redis.pool = &red.Pool{
MaxIdle: 2, //空闲数
IdleTimeout: 240 * time.Second,
MaxActive: 0, //最大数
Dial: func() (red.Conn, error) {
c, err := red.Dial("tcp", "127.0.0.1:6379")
if err != nil {
return nil, err
}
return c, err
},
TestOnBorrow: func(c red.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func main() {
initRedis()
Exec("set", "dandy", "hello")
result, err := Exec("get", "dandy")
if err != nil {
fmt.Print(err.Error())
}
str, _ := red.String(result, err)
fmt.Print(str)
redis.pool.Close()
}
初始化redis.pool
type Pool struct {
// Dial conn中Dial调用初始化
Dial func() (Conn, error)
// 带有context的Dial,2选1即可
DialContext func(ctx context.Context) (Conn, error)
// 获取连接池中,校验连接是否可用,一般和PING、PONG使用
TestOnBorrow func(c Conn, t time.Time) error
// 连接池中最大空闲数
MaxIdle int
// 连接池中保持活跃的数,0没有限制
MaxActive int
// 空闲检查时间
IdleTimeout time.Duration
// wait设置为true并且pool中活跃数到达设置的最大值,直到连接池中有可用连接,get()才返回
Wait bool
// 设置连接最大存活时间 0无限制
MaxConnLifetime time.Duration
// 统计、队列等使用
mu sync.Mutex // mu protects the following fields
closed bool // set to true when the pool is closed.
active int // the number of open connections in the pool
initOnce sync.Once // the init ch once func
ch chan struct{} // limits open connections when p.Wait is true
idle idleList // idle connections
waitCount int64 // total number of connections waited for.
waitDuration time.Duration // total time waited for new connections.
}
Pool获取连接
Get获取
源码 pool.go
func (p *Pool) Get() Conn
- wait设置等待
select {
case <-p.ch://当连接池满时,会阻塞等待,直到有空闲连接
select {
case <-ctx.Done():
p.ch <- struct{}{}
return 0, ctx.Err()
default:
}
case <-ctx.Done():
return 0, ctx.Err()
}
当pool中设置了Wait,当连接满时(p.ch获取不到数据),会等待直到池中有空闲连接,就会通知ch
看activeConn.close()会调用Pool.put(),此时连接池将会有空闲连接,并且通知刚才等待的Wait ch
if p.ch != nil && !p.closed {
// 通知等待ch
p.ch <- struct{}{}
}
- 空闲时间判断
if p.IdleTimeout > 0 {
n := p.idle.count
// 只需从尾部back判断即可验证是否过期
// 如果过期,删除尾部,释放该连接,并且继续遍历
for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
pc := p.idle.back
p.idle.popBack()
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
}
我们可以先大致先看内部连接池双向链表的管理点击跳转,也许这样你会很容易的理解。
这里idletimeout遍历整个链表,因为idle.back.t为最早插入的时间,所以只需要检查尾部back即可。
- 从连接池头部后取空闲连接
for p.idle.front != nil {
pc := p.idle.front
// 取出头部
p.idle.popFront()
p.mu.Unlock()
// 校验连接是否正常,一般我们设置回调ping取检验,
// 自然每次都多了一次请求,性能消耗
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
(p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
// 返回可用的连接
return &activeConn{p: p, pc: pc}, nil
}
// 校验不通过,自然释放该连接
pc.c.Close()
p.mu.Lock()
p.active--
}
因为上面的条件如果都校验成功,说明链表头部有数据,我们只需pop出来,之后返回activeConn,即我们成功后去了一个连接。注意,这里activeConn很关键,里头将最早初始化创建的p(pool)存入,并且将pc(poolConn)即从链表中取出的数据存起来。基本上ac(activeConn)涵盖了后续所有可以操作的数据。 详细pc我们可以看下面
- 开始链表肯定是空的,如何获取连接
p.active++
p.mu.Unlock()
// 这里拨号,即调用我们一开始注册的回调pool中的Dial
// 这里就是创建线程池开始创建连接,即conn的管理
c, err := p.dial(ctx)
if err != nil {
p.mu.Lock()
p.active--
if p.ch != nil && !p.closed {
p.ch <- struct{}{}
}
p.mu.Unlock()
// 返回错误连接
return errorConn{err}, err
}
// pc中c为conn.go中conn的存储。
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil
这里就是一开始,我们调用dial创建连接,并返回activConn得到连接。后续我们会分析conn.go
pool 中链表put的存放
func (p *Pool) put(pc *poolConn, forceClose bool) error {
p.mu.Lock()
if !p.closed && !forceClose {
pc.t = nowFunc()
// 报存队列
p.idle.pushFront(pc)
// 超过设置,pop出时间有效时间最小的back连接
if p.idle.count > p.MaxIdle {
pc = p.idle.back
p.idle.popBack()
} else {
pc = nil
}
}
// back该连接不保存,直接关闭
if pc != nil {
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
// 上述以说明,配合wait设置使用
if p.ch != nil && !p.closed {
p.ch <- struct{}{}
}
p.mu.Unlock()
return nil
}
当Pool.get获取的连接,并没有保存在连接池中,而是当activeConn.Close()时,才调用put,保存连接。至此,pool中核心功能都已准备完毕。
idleList连接池管理
- pushFront链表存储
func (l *idleList) pushFront(pc *poolConn) {
// 这里记住,idleList中front和back始终指向
// 的是连接池中的头部和尾部
// 1 新的pc尾指针指向链表头部
pc.next = l.front
pc.prev = nil
if l.count == 0 {
// 0.当连接池为空,头尾都指向改连接
l.back = pc
} else {
// 2. 链表头前驱指针指向pc
l.front.prev = pc
}
// 3.修改l中front指向为新插入的pc
l.front = pc
l.count++
}
-
popFront删除链表
conn.go
- 连接创建
// 调用net/dial.go库进行连接
netConn, err := do.dialContext(ctx, network, address)
if err != nil {
return nil, err
}
c := &conn{
// 暂时我们研究的是返回:TCPConn
conn: netConn,
// bufio写的也很好,后续对其分析
bw: bufio.NewWriter(netConn),
br: bufio.NewReader(netConn),
readTimeout: do.readTimeout,
writeTimeout: do.writeTimeout,
}
- 之后的activeConn调用的Do方法就是调用conn中的Do
if cmd != "" {
// RESP协议组包
if err := c.writeCommand(cmd, args); err != nil {
return nil, c.fatal(err)
}
}
// bufio用法,里头Write为interface实际为TCPConn的操作
if err := c.bw.Flush(); err != nil {
return nil, c.fatal(err)
}
var deadline time.Time
if readTimeout != 0 {
deadline = time.Now().Add(readTimeout)
}
// read过期检测
if err := c.conn.SetReadDeadline(deadline); err != nil {
return nil, c.fatal(err)
}
var err error
var reply interface{}
for i := 0; i <= pending; i++ {
var e error
// 获取redis服务回包数据
if reply, e = c.readReply(); e != nil {
return nil, c.fatal(e)
}
if e, ok := reply.(Error); ok && err == nil {
err = e
}
}
Do方法调用的是DoWithTimeout,这里发起RESP协议组包,并发送数据给redis服务端,之后读取redis服务器返回的数据。
大家如果觉得有啥疑惑或者不正确,都可以在评论或者加微信(dandyhzh)一起谈论。