数据库事务介绍
在我们使用数据库时,Android为我们提供了事务操作,简而言之就是在事务中,我们执行各种sql语句时,要么都执行成功,但只要其中一个sql语句失败时,那么会触发回滚,我们之前执行完成的sql语句也会失败。在下面代码中,手动抛出一个异常后,第二个sql语句执行失败,那么已经执行完成的第一个sql语句就会回滚。
/**
* 使用事务来进行数据库操作,两种操作要么都完成,要么都失败(事务)
*/
SQLiteDatabase db = dbHelper.getWritableDatabase();
db.beginTransaction();//开启事物
try {
db.delete("book",null,null);
if (true){
//这里手动抛出一个异常,让事物失败
// throw new NullPointerException();//由于我们手动抛出了一个异常,这样添加数据的代码就无法执行了,但是由于事物的存在,此时旧数据也无法删除
}
db.execSQL("insert into book(name,author,pages,price) values(?,?,?,?)", new String[]{
"android ", "tonycheng", "550", "79"
});
db.setTransactionSuccessful();
}finally {
db.endTransaction();
}
源码:
一.db.beginTransaction():
/**
* Begins a transaction in EXCLUSIVE mode.
* <p>
* Transactions can be nested.
* When the outer transaction is ended all of
* the work done in that transaction and all of the nested transactions will be committed or
* rolled back. The changes will be rolled back if any transaction is ended without being
* marked as clean (by calling setTransactionSuccessful). Otherwise they will be committed.
* </p>
* <p>Here is the standard idiom for transactions:
*
* <pre>
* db.beginTransaction();
* try {
* ...
* db.setTransactionSuccessful();
* } finally {
* db.endTransaction();
* }
* </pre>
*/
public void beginTransaction() {
beginTransaction(null /* transactionStatusCallback */, true);
}
/**transactionListener listener that should be notified when the
* transaction begins, commits, or is rolled back, either
* explicitly or by a call to {@link #yieldIfContendedSafely}.
*/
private void beginTransaction(SQLiteTransactionListener transactionListener,
boolean exclusive) {
acquireReference();
try {
getThreadSession().beginTransaction(
exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE :
SQLiteSession.TRANSACTION_MODE_IMMEDIATE,
transactionListener,
getThreadDefaultConnectionFlags(false /*readOnly*/), null);
} finally {
releaseReference();
}
}
beginTransaction的注释大概意思是开启一个事务,事务是可以嵌套的,当一个外层事务的事务结束了,这个事务中所有的工作都将结束,嵌套的事务将被提交或者被回滚。
还有其中回滚的情况就是没有调用setTransactionSuccessful方法,调用了之后,事务将被提交。
TRANSACTION_MODE_EXCLUSIVE:
当一个事务开启时,session获得一个唯一的锁,当持有了这个锁时,只有当前的session可以进行读写。
/**transactionListener listener that should be notified when the
* transaction begins, commits, or is rolled back, either
* explicitly or by a call to {@link #yieldIfContendedSafely}.
*/
private void beginTransaction(SQLiteTransactionListener transactionListener,
boolean exclusive) {
//获取一个参考,mReferenceCount++,当mReferenceCount<=0时,抛出一个尝试去打开已关闭的对象异常
acquireReference();
try {
getThreadSession().beginTransaction(
exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE :
SQLiteSession.TRANSACTION_MODE_IMMEDIATE,
transactionListener,
getThreadDefaultConnectionFlags(false /*readOnly*/), null);
} finally {
releaseReference();
}
}
第三个参数:大概意思是获取一个适当的连接方式的flag
int getThreadDefaultConnectionFlags(boolean readOnly) {
int flags = readOnly ? SQLiteConnectionPool.CONNECTION_FLAG_READ_ONLY :
SQLiteConnectionPool.CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY;
if (isMainThread()) {
flags |= SQLiteConnectionPool.CONNECTION_FLAG_INTERACTIVE;
}
return flags;
}
调用了beginTransaction开启事务之后,默认transactionListener为null,这个listener是监听我们的事务的开始,提交,或者回滚状态。exclusive 默认为false。
getThreadSession()通过ThreadLocal获取当前线程的Session,返回SQLiteSession对象,他提供了一种使用database的能力。
SQLiteSession的beginTransaction:
/**
*@param transactionMode #TRANSACTION_MODE_EXCLUSIVE
*
*/
public void beginTransaction(int transactionMode,
SQLiteTransactionListener transactionListener, int connectionFlags,
CancellationSignal cancellationSignal) {
//检查事务是否被标记成功,如果成功则抛出异常
throwIfTransactionMarkedSuccessful();
beginTransactionUnchecked(transactionMode, transactionListener, connectionFlags,
cancellationSignal);
}
######beginTransactionUnchecked:开启事务
private void beginTransactionUnchecked(int transactionMode,
SQLiteTransactionListener transactionListener, int connectionFlags,
CancellationSignal cancellationSignal) {
if (cancellationSignal != null) {
cancellationSignal.throwIfCanceled();
}
//1.事务为空 获取连接
if (mTransactionStack == null) {
acquireConnection(null, connectionFlags, cancellationSignal); // might throw
}
try {
// Set up the transaction such that we can back out safely
// in case we fail part way.
if (mTransactionStack == null) {
// Execute SQL might throw a runtime exception.
switch (transactionMode) {
case TRANSACTION_MODE_IMMEDIATE:
mConnection.execute("BEGIN IMMEDIATE;", null,
cancellationSignal); // might throw
break;
case TRANSACTION_MODE_EXCLUSIVE:
//2. 开始执行
mConnection.execute("BEGIN EXCLUSIVE;", null,
cancellationSignal); // might throw
break;
default:
mConnection.execute("BEGIN;", null, cancellationSignal); // might throw
break;
}
}
// Listener might throw a runtime exception.
if (transactionListener != null) {
try {
transactionListener.onBegin(); // might throw
} catch (RuntimeException ex) {
if (mTransactionStack == null) {
mConnection.execute("ROLLBACK;", null, cancellationSignal); // might throw
}
throw ex;
}
}
// Bookkeeping can't throw, except an OOM, which is just too bad...
Transaction transaction = obtainTransaction(transactionMode, transactionListener);
transaction.mParent = mTransactionStack;
mTransactionStack = transaction;
} finally {
if (mTransactionStack == null) {
releaseConnection(); // might throw
}
}
}
1.事务为空 获取连接
private void acquireConnection(String sql, int connectionFlags,
CancellationSignal cancellationSignal) {
if (mConnection == null) {
assert mConnectionUseCount == 0;
//1.1先从SQLiteConnectionPool连接池中获取SQLiteConnection对象:
mConnection = mConnectionPool.acquireConnection(sql, connectionFlags,
cancellationSignal); // might throw
mConnectionFlags = connectionFlags;
}
//连接数+1
mConnectionUseCount += 1;
}
1.1 acquireConnection:
public SQLiteConnection acquireConnection(String sql, int connectionFlags,
CancellationSignal cancellationSignal) {
//获取连接
SQLiteConnection con = waitForConnection(sql, connectionFlags, cancellationSignal);
synchronized (mLock) {
if (mIdleConnectionHandler != null) {
mIdleConnectionHandler.connectionAcquired(con);
}
}
return con;
}
private SQLiteConnection waitForConnection(String sql, int connectionFlags,
CancellationSignal cancellationSignal) {
//是否可写连接。可写的连接同一时间只能存在一个。 默认flag传进来此时不为0
final boolean wantPrimaryConnection =
(connectionFlags & CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY) != 0;
final ConnectionWaiter waiter;
final int nonce;
synchronized (mLock) {
throwIfClosedLocked();
// Abort if canceled.
if (cancellationSignal != null) {
cancellationSignal.throwIfCanceled();
}
// Try to acquire a connection.
SQLiteConnection connection = null;
if (!wantPrimaryConnection) {
//1.1.1 尝试获取只读连接
connection = tryAcquireNonPrimaryConnectionLocked(
sql, connectionFlags); // might throw
}
if (connection == null) {
//1.1.2 尝试获取可写连接
connection = tryAcquirePrimaryConnectionLocked(connectionFlags); // might throw
}
//获取到连接 返回
if (connection != null) {
return connection;
}
//没有可用的连接 则进入等待
//在这一步中,用ConnectionWaiter来封装等待中的连接信息,并按优先级放入一个链表,随后进入等待状态。获取到连接后,等待状态结束,返回连接。
// No connections available. Enqueue a waiter in priority order.
//主线程中的连接优先级更高
final int priority = getPriority(connectionFlags);
final long startTime = SystemClock.uptimeMillis();
// waiter是一个ConnectionWaiter对象。它同时也是一个链表,有一个同类的mNext成员变量。
waiter = obtainConnectionWaiterLocked(Thread.currentThread(), startTime,
priority, wantPrimaryConnection, sql, connectionFlags);
ConnectionWaiter predecessor = null;
ConnectionWaiter successor = mConnectionWaiterQueue;
// 按照优先级向mConnectionWaiterQueue添加waitor对象。mConnectionWaiterQueue不是复用池,而是有效的等待队列(也是链表)。
//将优先级高的排在下一个
while (successor != null) {
if (priority > successor.mPriority) {
waiter.mNext = successor;
break;
}
predecessor = successor;
successor = successor.mNext;
}
if (predecessor != null) {
predecessor.mNext = waiter;
} else {
mConnectionWaiterQueue = waiter;
}
//观察recycleConnectionWaiterLocked方法,mNonce在waiter每次被复用完成回收时自增1
nonce = waiter.mNonce;
}
// Set up the cancellation listener.
if (cancellationSignal != null) {
cancellationSignal.setOnCancelListener(new CancellationSignal.OnCancelListener() {
@Override
public void onCancel() {
synchronized (mLock) {
if (waiter.mNonce == nonce) {
//nonce的作用在这里体现。防止waiter对象复用造成误取消。
cancelConnectionWaiterLocked(waiter);
}
}
}
});
}
try {
// Park the thread until a connection is assigned or the pool is closed.
// Rethrow an exception from the wait, if we got one.
long busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS;
long nextBusyTimeoutTime = waiter.mStartTime + busyTimeoutMillis;
for (;;) {
// Detect and recover from connection leaks.
if (mConnectionLeaked.compareAndSet(true, false)) {
synchronized (mLock) {
//有泄露连接被关闭的话,最大连接限制下就可能有位置空出来,这时候就可以通过waiter 尝试分配一个连接
wakeConnectionWaitersLocked();
}
}
// Wait to be unparked (may already have happened), a timeout, or interruption.
//等待。那么unpark在哪里?在wakeConnectionWaitersLocked中。这个方法在上面泄露测试时调用过。
LockSupport.parkNanos(this, busyTimeoutMillis * 1000000L);
// Clear the interrupted flag, just in case.
Thread.interrupted();
// Check whether we are done waiting yet.
synchronized (mLock) {
throwIfClosedLocked();
//等到了一个Connection。这个mAssignedConnection是何时赋值的呢?
//也是在wakeConnectionWaitersLocked中赋值的。
final SQLiteConnection connection = waiter.mAssignedConnection;
final RuntimeException ex = waiter.mException;
if (connection != null || ex != null) {
//回收waiter,会造成mNonce自增1
recycleConnectionWaiterLocked(waiter);
if (connection != null) {
return connection;
}
throw ex; // rethrow!
}
//没拿到连接,继续等。
final long now = SystemClock.uptimeMillis();
if (now < nextBusyTimeoutTime) {
busyTimeoutMillis = now - nextBusyTimeoutTime;
} else {
logConnectionPoolBusyLocked(now - waiter.mStartTime, connectionFlags);
busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS;
nextBusyTimeoutTime = now + busyTimeoutMillis;
}
}
}
} finally {
// Remove the cancellation listener.
if (cancellationSignal != null) {
cancellationSignal.setOnCancelListener(null);
}
}
}
1.1.2 先看尝试获取可写连接 tryAcquirePrimaryConnectionLocked:
private SQLiteConnection tryAcquirePrimaryConnectionLocked(int connectionFlags) {
// If the primary connection is available, acquire it now.
//同时只能存在一个可写连接,用一个成员变量mAvailablePrimaryConnection缓存空闲连接
SQLiteConnection connection = mAvailablePrimaryConnection;
if (connection != null) {
//如果有可用的可写连接,获取到之后就把mAvailablePrimaryConnection缓存置空
mAvailablePrimaryConnection = null;
//保存获取到的可写连接,并通过connectionFlags判断这个连接是否是只读的
finishAcquireConnectionLocked(connection, connectionFlags); // might throw
return connection;
}
// Make sure that the primary connection actually exists and has just been acquired.
//如果上一个if造成mAvailablePrimaryConnection缓存置空,mAcquiredConnections中就会有一个primary connection,这里就会返回null。上一层的waitForConnection接到null会进入等待状态,这个后面讨论。
//此处mAcquiredConnections在上一个if中finishAcquireConnectionLocked方法赋值
for (SQLiteConnection acquiredConnection : mAcquiredConnections.keySet()) {
if (acquiredConnection.isPrimaryConnection()) {
return null;
}
}
// Uhoh. No primary connection! Either this is the first time we asked
// for it, or maybe it leaked?
//如果没有在上面返回null,那么这一定是第一次请求primary connnection,或者有一个连接泄露了(未recycle的情况下finalize)。
//去创建并打开一个新的SQLiteConnection。
connection = openConnectionLocked(mConfiguration,
true /*primaryConnection*/); // might throw
//将这个SQLiteConnection保存到mAcquiredConnections这个Map中
finishAcquireConnectionLocked(connection, connectionFlags); // might throw
return connection;
}
1.1.1 尝试获取只读连接 tryAcquireNonPrimaryConnectionLocked:
private SQLiteConnection tryAcquireNonPrimaryConnectionLocked(
String sql, int connectionFlags) {
// Try to acquire the next connection in the queue.
SQLiteConnection connection;
//只读连接可以有多个,用一个ArrayList缓存了所有空闲连接
final int availableCount = mAvailableNonPrimaryConnections.size();
//如果有缓存空闲连接 那么选一个sql语句一样的连接,并保存在mAcquiredConnections中
if (availableCount > 1 && sql != null) {
// If we have a choice, then prefer a connection that has the
// prepared statement in its cache.
for (int i = 0; i < availableCount; i++) {
connection = mAvailableNonPrimaryConnections.get(i);
if (connection.isPreparedStatementInCache(sql)) {
mAvailableNonPrimaryConnections.remove(i);
finishAcquireConnectionLocked(connection, connectionFlags); // might throw
return connection;
}
}
}
//如果有缓存空闲连接但没有sql语句一样的 那么选最后一个,并保存在mAcquiredConnections中
if (availableCount > 0) {
// Otherwise, just grab the next one.
connection = mAvailableNonPrimaryConnections.remove(availableCount - 1);
finishAcquireConnectionLocked(connection, connectionFlags); // might throw
return connection;
}
// 一个空闲连接都没有。
// Expand the pool if needed.
// 计算有多少连接(空闲+使用中)。这里肯定没有空闲non primary连接了,而如果有空闲primary连接,则要 += 1。
int openConnections = mAcquiredConnections.size();
if (mAvailablePrimaryConnection != null) {
openConnections += 1;
}
if (openConnections >= mMaxConnectionPoolSize) {
// 超过数据库连接限制,放弃治疗。连接限制与数据库底层实现有关。
return null;
}
// 没超限,还能再开一个连接。所以开连接并返回。
connection = openConnectionLocked(mConfiguration,
false /*primaryConnection*/); // might throw
// 保存这个连接到mAcquiredConnections中。
finishAcquireConnectionLocked(connection, connectionFlags); // might throw
return connection;
}
2.获取到连接后,SQLiteConnection.execute ,这里貌似是检查sql语句的,最后调用了native方法,这里不关注。
beginTransaction:开启事务的时候传入的readOnly为false,所以SQLiteSession会从SQLiteConnectionPool中获取一个独占的连接。并且在SQLiteSession执行其它SQL语句的情况下,执行完成会将连接释放回连接池,而beginTransaction操作则不会,而是持有这一个连接直至同一线程内调用endTransaction。
二.db.setTransactionSuccessful():
public void setTransactionSuccessful() {
throwIfNoTransaction();
throwIfTransactionMarkedSuccessful();
//主要干了一件事就是设置事务的标记位 mMarkedSuccessful为true
mTransactionStack.mMarkedSuccessful = true;
}
三.endTransaction():
public void endTransaction(CancellationSignal cancellationSignal) {
throwIfNoTransaction();
assert mConnection != null;
endTransactionUnchecked(cancellationSignal, false);
}
private void endTransactionUnchecked(CancellationSignal cancellationSignal, boolean yielding) {
if (cancellationSignal != null) {
cancellationSignal.throwIfCanceled();
}
final Transaction top = mTransactionStack;
//判断是否成功
boolean successful = (top.mMarkedSuccessful || yielding) && !top.mChildFailed;
RuntimeException listenerException = null;
final SQLiteTransactionListener listener = top.mListener;
if (listener != null) {
try {
if (successful) {
//提交
listener.onCommit(); // might throw
} else {
//回滚
listener.onRollback(); // might throw
}
} catch (RuntimeException ex) {
listenerException = ex;
successful = false;
}
}
//创建事务的时候 top.mParent为null
mTransactionStack = top.mParent;
//回收这个事务
recycleTransaction(top);
if (mTransactionStack != null) {
if (!successful) {
mTransactionStack.mChildFailed = true;
}
} else {
try {
if (successful) {
//如果成功 那么调用native方法去执行
mConnection.execute("COMMIT;", null, cancellationSignal); // might throw
} else {
mConnection.execute("ROLLBACK;", null, cancellationSignal); // might throw
}
} finally {
//1.释放连接
releaseConnection(); // might throw
}
}
if (listenerException != null) {
throw listenerException;
}
}
1.释放连接 releaseConnection:
private void releaseConnection() {
assert mConnection != null;
assert mConnectionUseCount > 0;
if (--mConnectionUseCount == 0) {
try {
//mConnection 在我们开启事务的时候获取的连接 默认开启是ReadOnly为false,获取的是primaryConnection 唯一的连接
//这里尝试去释放它
mConnectionPool.releaseConnection(mConnection); // might throw
} finally {
mConnection = null;
}
}
}
public void releaseConnection(SQLiteConnection connection) {
synchronized (mLock) {
if (mIdleConnectionHandler != null) {
//释放连接
mIdleConnectionHandler.connectionReleased(connection);
}
//在缓存里移除这个连接
AcquiredConnectionStatus status = mAcquiredConnections.remove(connection);
if (status == null) {
throw new IllegalStateException("Cannot perform this operation "
+ "because the specified connection was not acquired "
+ "from this pool or has already been released.");
}
if (!mIsOpen) {
closeConnectionAndLogExceptionsLocked(connection);
} else if (connection.isPrimaryConnection()) {
//这里置空我们当时获取并赋值的唯一缓存mAvailablePrimaryConnection
if (recycleConnectionLocked(connection, status)) {
assert mAvailablePrimaryConnection == null;
mAvailablePrimaryConnection = connection;
}
wakeConnectionWaitersLocked();
} else if (mAvailableNonPrimaryConnections.size() >= mMaxConnectionPoolSize - 1) {
closeConnectionAndLogExceptionsLocked(connection);
} else {
if (recycleConnectionLocked(connection, status)) {
mAvailableNonPrimaryConnections.add(connection);
}
wakeConnectionWaitersLocked();
}
}
}