Go commons pool介绍
如果你需要开发一个连接池,用于实现连接对象的重复使用,从而避免频繁的创建和销毁连接,造成响应的延时。那么你完全可以基于Go commons pool 项目快速开始。
Go commons pool是一个通用的go语言对象池,基于Java版本的Apache Commons Pool改写,性能测试结果和Java版本的相近,已经可以用于生产环境。Go commons pool实现了Java版本的主要功能,具体包括:
1、自定义的 PooledObjectFactory。
2、丰富的设置选项,可以精确控制对象的生命周期。详细参看ObjectPoolConfig。
- 对象池是否是 LIFO (后进先出) 或者是 FIFO (先进先出)
- 对象池的容量控制
- 对象池对象的验证配置
- 获取对象时是否阻塞以及最大等待时间配置
- 对象池对象的回收机制配置(支持后台定时任务检测回收)
- 对象池对象的抛弃机制配置(主要用于防止对象池对象借出后未归还,导致对象泄露)
但不包含以下Apache commons pool的功能:
1、KeyedObjectPool 实现
2、ProxiedObjectPool 实现
3、对象池的统计功能
尽管如此,基于Go commons pool构建自己的连接池,是非常方便且足够强大的。
如何构建PooledObjectFactory
构建自定义的对象工厂需要实现一个接口——PooledObjectFactory
type PooledObjectFactory interface {
/**
* Create a pointer to an instance that can be served by the
* pool and wrap it in a PooledObject to be managed by the pool.
*
* return error if there is a problem creating a new instance,
* this will be propagated to the code requesting an object.
*/
MakeObject(ctx context.Context) (*PooledObject, error)
/**
* Destroys an instance no longer needed by the pool.
*/
DestroyObject(ctx context.Context, object *PooledObject) error
/**
* Ensures that the instance is safe to be returned by the pool.
*
* return false if object is not valid and should
* be dropped from the pool, true otherwise.
*/
ValidateObject(ctx context.Context, object *PooledObject) bool
/**
* Reinitialize an instance to be returned by the pool.
*
* return error if there is a problem activating object,
* this error may be swallowed by the pool.
*/
ActivateObject(ctx context.Context, object *PooledObject) error
/**
* Uninitialize an instance to be returned to the idle object pool.
*
* return error if there is a problem passivating obj,
* this exception may be swallowed by the pool.
*/
PassivateObject(ctx context.Context, object *PooledObject) error
}
- MakeObject 方法用于创建一个新的连接对象。
- DestroyObject 用于销毁已有的连接对象
- ValidateObject 用于检查对象是否可用。用于自动销毁不可用对象,或借出连接等操作的自动检查等。
上述接口方法的实现,应基于自己的业务需要进行开发。例如我们想为baidurpc 的 client chuang创建一个连接池,实现案例可参考:
//声明连接池对象结构
type ClientPoolObject struct {
name string
client *baidurpc.RpcClient
conn *baidurpc.TCPConnection
}
//首先创建自定义的ObjFactory
type PoolFactory struct {
log logger.Logger
host string
port int
timeout *time.Duration
}
//实现接口PooledObjectFactory
func (pf *PoolFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
url := baidurpc.URL{}
url.SetHost(&pf.host).SetPort(&pf.port)
connection, err := baidurpc.NewTCPConnection(url, pf.timeout)
if err != nil {
return nil, errNewConnectionFail
}
pf.log.Debugf("do baidurpc.NewTCPConnection ok")
// 创建client
rpcClient, err := baidurpc.NewRpcCient(connection)
if err != nil {
pf.log.Errorf("NewRpcClient re error %s", err.Error())
return nil, errNewRpcClientFail
}
obj := pool.NewPooledObject(&ClientPoolObject{
name: addr,
client: rpcClient,
conn: connection,
})
pf.log.Debugf("the new object = %s", utils.ToJSON(obj))
return obj, nil
}
func (pf *PoolFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
obj := object.Object
if obj != nil {
p := obj.(*ClientPoolObject)
p.client.Close()
err := p.conn.Close()
if err != nil{
pf.log.Errorf("destroy the obj the connect {%s} error:%s",p.name,err.Error())
}
pf.log.Debugf("destroy the obj %s", p.name)
}
return nil
}
func (pf *PoolFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
obj := object.Object
if obj == nil {
return false
}
c := obj.(*ClientPoolObject)
return c.conn.TestConnection() == nil
}
func (pf *PoolFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
return nil
}
func (pf *PoolFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
return nil
}
连接池的使用和参数说明
连接池在使用时,需要首先利用自己构造的objFactory 创建 ObkectPool
p := pool.NewObjectPool(context.Background(), &PoolFactory{
log: log,
host: "127.0.0.1",
port: 50000,
timeout: &defaultTimeOut,
}, pool.NewDefaultPoolConfig())
//借出对象
obj, err := p.BorrowObject(ctx)
if err != nil {
return fmt.Errorf("do BorrowObject error:%w ", err)
}
//用完后放回对象
defer func() {
err = b.connPool.ReturnObject(ctx, obj)
if err != nil {
fmt.Printf("[ERROR] do BorrowObject error:%s ", err.Error())
}
}()
// TODO 具体操作逻辑
连接池的配置参数说明:
1、LIFO:连接池是否具有 LIFO(后进先出)行为TRUE:作为 LIFO(后进先出)队列总是返回空闲对象池中最近使用的对象。FALSE:作为 FIFO(先进先出)队列总是返回空闲对象池中最旧的对象。
2、MaxTotal:连接池允许分配对象的上限。为负值时表示无限制。
3、MaxIdle:连接池中空闲对象的上限。如果 MaxIdle在重负载系统上设置得太低,可能会看到对象被销毁后,立即有新对象被创建。这是活跃的 goroutine 返回请求结果的速度比发起请求的速度更快,导致空闲的数量超过 maxIdle 的对象。 maxIdle 的最佳值在不同的系统会有所不同。
4、MinIdle:连接池要维护的最小空闲对象数此设置仅TimeBetweenEvictionRuns 大于零的情况下有效。用于尝试确保连接池在空闲对象驱逐运行期间保留所需的最小实例数如果 MinIdle 的配置值大于 MaxIdle 的配置值则将使用 MaxIdle 的值。
5、TestOnCreate:是否会在连接池创建对象之前验证可用性验证方法是由对象工厂的 ValidateObject()方法提供。当执行 ObjectPool.BorrowObject()期间刚好需要创建新的对象时,则创建时执行校验如果校验失败则 ObjectPool.BorrowObject() 返回错误。
6、TestOnBorrow:是否会在连接池借出对象之前验证可用性验证方法是由对象工厂的 ValidateObject()方法提供。如果校验失败则销毁获取的连接并重新获取新的对象。
7、TestOnReturn:是否会在连接池放回对象之前验证可用性。验证方法是由对象工厂的 ValidateObject()方法提供。如果校验失败则销毁获取的连接而不是放回该连接。
8、TestWhileIdle:在池中闲置的对象会被空闲对象驱逐器验证可用性(如果有的话 - 见TimeBetweenEvictionRuns )。验证方法是由对象工厂的 ValidateObject()方法提供。如果对象验证失败,对象将从连接池中移除并销毁。请注意,单独设置此属性无效除非通过设置 TimeBetweenEvictionRuns为正值。
9、BlockWhenExhausted:当活跃的商品池已经达到最大参数的限制时,执行 ObjectPool.BorrowObject() 是否阻塞。
10、MinEvictableIdleTime:对象可以在池中闲置的最短时间,如果该参数为正值,对象驱逐器执行过程中会将存活时间超过这个参数值的对象销毁。若为非正值则不会因为空闲时间而将任何对象从池中逐出。
11、SoftMinEvictableIdleTime:当前空闲连接数量大于 MinIdle 时,对象可以在池中闲置的最短时间如果MinEvictableIdleTime 为正数,SoftMinEvictableIdleTime 被忽略。
12、NumTestsPerEvictionRun:每次驱逐器运行期间要检查的最大对象数(如果驱逐策略开启)如果该参数为正值,检测对象数量为该参数值和当前空闲连接数中较小的值。如果该参数为负值,检测次数执行将是 math.Ceil(ObjectPool.GetNumIdle()/math.Abs(PoolConfig.NumTestsPerEvictionRun)) 这意味着当值为 -n 时大约有 n 分之一的空闲对象将被检测。
13、EvictionPolicyName:连接池使用的驱逐器策略名称。
如果使用自定义策略,需要先注册:RegistryEvictionPolicy(name, policy)默认驱逐策略:
if (config.IdleSoftEvictTime < idleTime &&
config.MinIdle < idleCount) ||
config.IdleEvictTime < idleTime {
return true
}
return false
14、TimeBetweenEvictionRuns:对象驱逐器后台执行的时间间隔。如果ObjectPool创建后相关参数修改后,需要调用方法ObjectPool.StartEvictor()使其生效。
15、EvitionContext:后台执行对象驱逐器的的 context.Context。