所谓对象池,即一个放对象的池子。目的是为了复用对象,以减少创建对象的开销,如连接池、线程池等。
commons-pool2是apache下的一款对象池开源组件,在学习它的原理前,首先考虑下如果我们自实现对象池,会有哪些问题需要考虑?
底层用什么数据结构来做对象池的容器?
对象池要有什么属性,支持哪些方法?
对象在对象池中的生命周期是什么样的?
从对象池获取/归还的步骤?
接下来我们带着这些问题去学习commons-pool2。
首先看下专门用来生成对象的抽象工厂类BasePooledObjectFactory,它实现了以下接口:
/**
* Creates an instance that can be served by the pool and wrap it in a
* {@link PooledObject} to be managed by the pool.
*
* @return a {@code PooledObject} wrapping an instance that can be served by the pool
*
* @throws Exception if there is a problem creating a new instance,
* this will be propagated to the code requesting an object.
*/
PooledObject makeObject()throws Exception;
/**
* Destroys an instance no longer needed by the pool.
* <p>* It is important for implementations of this method to be aware that there
* is no guarantee about what state <code>obj</code>will be in and the
* implementation should be prepared to handle unexpected errors.
* </p> * <p>* Also, an implementation must take in to consideration that instances lost
* to the garbage collector may never be destroyed.
* </p>*
* @param p a {@code PooledObject} wrapping the instance to be destroyed
*
* @throws Exception should be avoided as it may be swallowed by
* the pool implementation.
*
* @see #validateObject
* @see ObjectPool#invalidateObject
*/
void destroyObject(PooledObject p)throws Exception;
/**
* Ensures that the instance is safe to be returned by the pool.
*
* @param p a {@code PooledObject} wrapping the instance to be validated
*
* @return <code>false</code> if <code>obj</code>is not valid and should
* be dropped from the pool, <code>true</code>otherwise.
*/
boolean validateObject(PooledObject p);
/**
* Reinitializes an instance to be returned by the pool.
*
* @param p a {@code PooledObject} wrapping the instance to be activated
*
* @throws Exception if there is a problem activating <code>obj</code>,
* this exception may be swallowed by the pool.
*
* @see #destroyObject
*/
void activateObject(PooledObject p)throws Exception;
/**
* Uninitializes an instance to be returned to the idle object pool.
*
* @param p a {@code PooledObject} wrapping the instance to be passivated
*
* @throws Exception if there is a problem passivating <code>obj</code>,
* this exception may be swallowed by the pool.
*
* @see #destroyObject
*/
void passivateObject(PooledObject p)throws Exception;
前三个方法很容易理解,后面的activateObject和passivateObject看注释是从池中获取对象后再做一次初始化、归还对象给池前做一次取消初始化。也就是说通过实现这2个方法,实现在获取、归还对象前做一些操作。
抽象类BasePooledObjectFactory只实现了makeObject方法:
@Override
public PooledObject<T> makeObject() throws Exception {
return wrap(create());
}
其中,wrap是抽象方法,由子类实现。这个方法就是用PooledObject封装池中的对象,而DefaultPooledObject就是PooledObject的实现类。
private final T object;
private PooledObjectState state = PooledObjectState.IDLE; // @GuardedBy("this") to ensure transitions are valid
private final long createTime = System.currentTimeMillis();
private volatile long lastBorrowTime = createTime;
private volatile long lastUseTime = createTime;
private volatile long lastReturnTime = createTime;
private volatile boolean logAbandoned = false;
private volatile CallStack borrowedBy = NoOpCallStack.INSTANCE;
private volatile CallStack usedBy = NoOpCallStack.INSTANCE;
private volatile long borrowedCount = 0;
看类的属性可知DefaultPooledObject类主要做一些封装,如创建时间、归还时间等属性。
生成池化对象的工厂类BasePooledObjectFactory比较简单,接下来我们对象池的核心实现类GenericObjectPool,首先看下它的属性:
父类BaseGenericObjectPool属性:
看属性名我们知道,对象池中有对象最大数量、最小空闲数量、空闲对象集合、所有对象映射map、创建/获取/归还/空闲时是否进行检测属性、驱逐属性等等,基本上大家在使用连接池都有过相关设置。
再看下前面的第一个问题:底层用什么数据结构来做对象池的容器?这里使用的是双端阻塞队列LinkedBlockingDeque,对象池需要有新增、移除、阻塞等方法,而这里新增对象时又有先进先出、后进先出的选择(lifo属性),所以选择LinkedBlockingDeque是比较合理的。下面主要看下获取、归还对象的实现:
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
//检查池是否打开
assertOpen();
//配置了AbandonedConfig(防泄漏检查配置)并且相关条件满足后,会移除相关废弃对象(满足配置条件)
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
(getNumIdle() < 2) &&
(getNumActive() > getMaxTotal() - 3) ) {
removeAbandoned(ac);
}
PooledObject<T> p = null;
// Get local copy of current config so it is consistent for entire
// method execution
final boolean blockWhenExhausted = getBlockWhenExhausted();
boolean create;
final long waitTime = System.currentTimeMillis();
while (p == null) {
create = false;
//尝试获取对象
p = idleObjects.pollFirst();
if (p == null) {
//获取不到则创建对象
p = create();
if (p != null) {
create = true;
}
}
if (blockWhenExhausted) {
if (p == null) {
if (borrowMaxWaitMillis < 0) {
p = idleObjects.takeFirst();
} else {
//指定时间内获取池内对象
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
if (p == null) {
throw new NoSuchElementException(
"Timeout waiting for idle object");
}
} else {
if (p == null) {
throw new NoSuchElementException("Pool exhausted");
}
}
if (!p.allocate()) {
p = null;
}
if (p != null) {
try {
//在返回对象给调用者前,调用激活对象方法
factory.activateObject(p);
} catch (final Exception e) {
try {
destroy(p);
} catch (final Exception e1) {
// Ignore - activation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to activate object");
nsee.initCause(e);
throw nsee;
}
}
if (p != null && getTestOnBorrow()) {
boolean validate = false;
Throwable validationThrowable = null;
try {
//校验对象有效性
validate = factory.validateObject(p);
} catch (final Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
if (!validate) {
try {
destroy(p);
destroyedByBorrowValidationCount.incrementAndGet();
} catch (final Exception e) {
// Ignore - validation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to validate object");
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
}
//更新一些状态,如对象借出数、空闲时间、等待时间等
updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
return p.getObject();
}
创建对象create()
private PooledObject<T> create() throws Exception {
int localMaxTotal = getMaxTotal();
// This simplifies the code later in this method
if (localMaxTotal < 0) {
localMaxTotal = Integer.MAX_VALUE;
}
final long localStartTimeMillis = System.currentTimeMillis();
final long localMaxWaitTimeMillis = Math.max(getMaxWaitMillis(), 0);
// Flag that indicates if create should:
// - TRUE: call the factory to create an object
// - FALSE: return null
// - null: loop and re-test the condition that determines whether to
// call the factory
Boolean create = null;
while (create == null) {
synchronized (makeObjectCountLock) {
final long newCreateCount = createCount.incrementAndGet();
if (newCreateCount > localMaxTotal) {
// The pool is currently at capacity or in the process of
// making enough new objects to take it to capacity.
createCount.decrementAndGet();
if (makeObjectCount == 0) {
// There are no makeObject() calls in progress so the
// pool is at capacity. Do not attempt to create a new
// object. Return and wait for an object to be returned
create = Boolean.FALSE;
} else {
// There are makeObject() calls in progress that might
// bring the pool to capacity. Those calls might also
// fail so wait until they complete and then re-test if
// the pool is at capacity or not.
//若池已满,且有对象在创建,则等待一定时间
makeObjectCountLock.wait(localMaxWaitTimeMillis);
}
} else {
// The pool is not at capacity. Create a new object.
makeObjectCount++;
create = Boolean.TRUE;
}
}
// Do not block more if maxWaitTimeMillis is set.
if (create == null &&
(localMaxWaitTimeMillis > 0 &&
System.currentTimeMillis() - localStartTimeMillis >= localMaxWaitTimeMillis)) {
create = Boolean.FALSE;
}
}
if (!create.booleanValue()) {
return null;
}
final PooledObject<T> p;
try {
//创建对象
p = factory.makeObject();
if (getTestOnCreate() && !factory.validateObject(p)) {
createCount.decrementAndGet();
return null;
}
} catch (final Throwable e) {
createCount.decrementAndGet();
throw e;
} finally {
synchronized (makeObjectCountLock) {
//创建完毕,唤醒等待makeObjectCountLock的线程
makeObjectCount--;
makeObjectCountLock.notifyAll();
}
}
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getLogAbandoned()) {
p.setLogAbandoned(true);
p.setRequireFullStackTrace(ac.getRequireFullStackTrace());
}
createdCount.incrementAndGet();
allObjects.put(new IdentityWrapper<>(p.getObject()), p);
return p;
}
归还方法returnObject
@Override
public void returnObject(final T obj) {
final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));
if (p == null) {
//如果AbandonedConfig配置不为空且对象池中没有此对象,则抛异常
if (!isAbandonedConfig()) {
throw new IllegalStateException(
"Returned object not currently part of this pool");
}
return; // Object was abandoned and removed
}
markReturningState(p);
final long activeTime = p.getActiveTimeMillis();
if (getTestOnReturn() && !factory.validateObject(p)) {
try {
//清理对象
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
try {
//保持空闲对象逻辑
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
updateStatsReturn(activeTime);
return;
}
try {
//钝化对象,可以做一些清理操作
factory.passivateObject(p);
} catch (final Exception e1) {
swallowException(e1);
try {
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
try {
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
updateStatsReturn(activeTime);
return;
}
if (!p.deallocate()) {
throw new IllegalStateException(
"Object has already been returned to this pool or is invalid");
}
final int maxIdleSave = getMaxIdle();
if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
try {
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
try {
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
} else {
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
if (isClosed()) {
// Pool closed while object was being added to idle objects.
// Make sure the returned object is destroyed rather than left
// in the idle object pool (which would effectively be a leak)
clear();
}
}
updateStatsReturn(activeTime);
}
目前已对对象池原理做了一些简析,通过以上的分析,基本解决最初提出的几个问题了。