ReaderWriterLockSlim支持递归进入ReaderLock或WriterLock,但下面的分析不打算涉及这种情况。
如何读锁不互斥
TryEnterReadLockCore(TimeoutTracker timeout)
{
.......
// step1
EnterMyLock();
lrwc = GetThreadRWCount(false);
if (lrwc.readercount > 0)
{
ExitMyLock();
throw new LockRecursionException(SR.GetString(SR.LockRecursionException_RecursiveReadNotAllowed));
}
.......
// step 2
for (; ; )
{
// We can enter a read lock if there are only read-locks have been given out
// and a writer is not trying to get in.
// 2.1
if (owners < MAX_READER)
{
// Good case, there is no contention, we are basically done
owners++; // Indicate we have another reader
lrwc.readercount++;
break;
}
// 2.2
if (spincount < MaxSpinCount)
{
ExitMyLock();
if (timeout.IsExpired)
return false;
spincount++;
SpinWait(spincount);
EnterMyLock();
//The per-thread structure may have been recycled as the lock is acquired (due to message pumping), load again.
if(IsRwHashEntryChanged(lrwc))
lrwc = GetThreadRWCount(false);
continue;
}
// 2.3
// Drat, we need to wait. Mark that we have waiters and wait.
if (readEvent == null) // Create the needed event
{
LazyCreateEvent(ref readEvent, false);
if (IsRwHashEntryChanged(lrwc))
lrwc = GetThreadRWCount(false);
continue; // since we left the lock, start over.
}
retVal = WaitOnEvent(readEvent, ref numReadWaiters, timeout);
if (!retVal)
{
return false;
}
if (IsRwHashEntryChanged(lrwc))
lrwc = GetThreadRWCount(false);
}
// step 3
ExitMyLock();
}
第一步: 进入轻量锁
private void EnterMyLock()
{
if (Interlocked.CompareExchange(ref myLock, 1, 0) != 0)
EnterMyLockSpin();
}
这个锁不是普通的lock。它通过原子操作设置int类型lock,只有一个线程能设置成功,其他线程通过自旋和短暂sleep来等待下次设置生效。
要注意,其他线程不管是EnterReadLock,或者是EnterWriteLock,都会在这里被栏住,直到ExitMyLock被调用。
之后获取读写计数 GetThreadRWCount, 每个线程维护自己的ReaderWriterCount,如果一个线程使用了多个ReaderWriterLockSlim,则会形成一个列表。可以从列表中回收RWC。
为了防止重复进入ReaderLock,如果已有读者,就会跑出异常。
第二步:登记Reader
如果owners没有超出最大值,这是最好的情况,多个reader都可以登记。这里真正体现了读者不互斥。多个线程在调用EnterReadLock之后,都可以顺利地执行后续代码。(释放mylock后,只要owners不超限,其他线程也会进入这段代码,不会被阻塞)
如果owners超出最大值(只靠单纯地增加reader不太可能发生),这里最大的可能是之前其他线程已经EnterWriteLock,后续我们可以从TryEnterWriteLockCore看到,writer会将owners设置成超出值,从而阻挡了读者登记,跳到了2.2.
登记失败,接下来首先会尝试自旋,在自旋时间内,如果writer释放了lock,重新登记读者。
如果尝试过N次自旋后,还是无法成功,只能创建readerEvent(ManualResetEvent),并等待。当writer触发终止态时,所有等待的reader都会得到通知。
第三步:释放轻量锁
private void ExitMyLock()
{
Volatile.Write(ref myLock, 0);
}
很简单,int lock设成0,其他等待的线程又可以成功设置mylock了。
如何读写互斥
之前讲了写会通过owners排斥读,再看下读怎么排斥写。
private bool TryEnterWriteLockCore(TimeoutTracker timeout)
{
......
EnterMyLock();
lrwc = GetThreadRWCount(true);
//Can't acquire write lock with reader lock held.
if (lrwc != null && lrwc.readercount > 0)
{
ExitMyLock();
throw new LockRecursionException(SR.GetString(SR.LockRecursionException_WriteAfterReadNotAllowed));
}
......
for (; ; )
{
if (IsWriterAcquired())
{
// Good case, there is no contention, we are basically done
SetWriterAcquired();
break;
}
......
if (spincount < MaxSpinCount)
{
ExitMyLock();
if (timeout.IsExpired)
return false;
spincount++;
SpinWait(spincount);
EnterMyLock();
continue;
}
// Drat, we need to wait. Mark that we have waiters and wait.
if (writeEvent == null) // create the needed event.
{
LazyCreateEvent(ref writeEvent, true);
continue; // since we left the lock, start over.
}
retVal = WaitOnEvent(writeEvent, ref numWriteWaiters, timeout);
//The lock is not held in case of failure.
if (!retVal)
return false;
}
ExitMyLock();
writeLockOwnerId = id;
return true;
}
获取锁不用再讲。看看之后的操作,首先设置标志位。
private bool IsWriterAcquired()
{
return (owners & ~WAITING_WRITERS) == 0;
}
如果已经登记过reader,此处返回false,writer会进入后续等待,否则设置标志位。
private void SetWriterAcquired()
{
owners |= WRITER_HELD; // indicate we have a writer.
}
设置该标志位后,又会将reader阻挡在外。
后续等待的流程差不多,要注意这里创建的writerEvent是AutoResetEvent,意味着一次只能唤醒一个writer。这就避免了writer之间的竞争。
Event何时通知
readerEvent在ExistWriteLock,writerEvent在ExistReadLock。这也很科学。
写一个封装器
不难实现,支持using即可。