ReentrantLock
- lock() : void <-Lock
Acquires the lock if it is not held by another thread and returns immediately, setting the lock hold count to one.
尝试竞争锁,如果当前锁未被线程持有并成功获取锁,将count置为1,并立刻返回执行后续。
If the current thread already holds the lock then the hold count is incremented by one and the method returns immediately.
若锁已被当前线程持有,则count+1并立刻返回执行后续。
If the lock is held by another thread then the current thread becomes disabled for thread scheduling purposes and lies dormant until the lock has been acquired, at which time the lock hold count is set to one.
若锁被其他线程持有,则线程无法响应系统调度进入阻塞,直到成功获取锁。
- lockInterruptibly() throws InterruptedException: void <-Lock
Acquires the lock unless the current thread is {@linkplain Thread#interrupt interrupted}.
尝试竞争锁过程和lock()相同,区别是响应interrupted。
If the current thread: has its interrupted status set on entry to this method; or is {@linkplain Thread#interrupt interrupted} while acquiring the lock, then {@link InterruptedException} is thrown and the current thread's interrupted status is cleared.
若竞争锁过程被interrrupted,则抛出InterruptedException并清除interrupted状态。
- tryLock() : boolean <-Lock
Acquires the lock if it is not held by another thread and returns immediately with the value {@code true}, setting the lock hold count to one.Even when this lock has been set to use a fair ordering policy, a call to {@code tryLock()} will immediately acquire the lock if it is available, whether or not other threads are currently waiting for the lock.
尝试竞争锁,若未被其他线程持有则获取锁并立刻返回true,并将count置为1,且竞争时忽略已设置的公平策略。
If the current thread already holds this lock then the hold count is incremented by one and the method returns {@code true}. If the lock is held by another thread then this method will return immediately with the value {@code false}.
如果当前线程已持有该锁,则count+1返回true。若锁已被其他线程持有则返回false不阻塞。
- tryLock(long timeout, TimeUnit unit) throws InterruptedException : boolean
Acquires the lock if it is not held by another thread within the given waiting time and the current thread has not been {@linkplain Thread#interrupt interrupted}.
在给定时间内竞争锁对象,若被调用interrrupt则抛出异常并清除interrupted状态。
- unlock() : void()
If the current thread is the holder of this lock then the hold count is decremented. If the hold count is now zero then the lock is released. If the current thread is not the holder of this lock then {@link IllegalMonitorStateException} is thrown.
如果当前线程是锁的持有者则count自减-1,若count为零则释放锁。若当前线程不是锁的持有者抛出IllegalMonitorStateException。
- getHoldCount() : int
Return the number of holds on this lock by the current thread, or zero if this lock is not held by the current thread。
返回当前线程持有锁次数,为零表示未持有当前锁。
- getQueuedThreads() : Collection<Thread>
Returns a collection containing threads that may be waiting to acquire this lock. Because the actual set of threads may change dynamically while constructing this result, the returned collection is only a best-effort estimate.
返回等待获取锁的线程集合,因为线程是动态变动的,结果集只能做大尽量准确的。
- newCondition() : Condition
Returns a {@link Condition} instance for use with this {@link Lock} instance。
返回一个该锁的Condition实例,下面我们来看看Condition条件变量。
Condition
Conditions (also known as condition queues or condition variables ) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true. Because access to this shared state information occurs in different threads, it must be protected, so a lock of some form is associated with the condition. The key property that waiting for a condition provides is that it atomically releases the associated lock and suspends the current thread, just like {@code Object.wait}.
Conditions(通常被叫做条件队列或条件变量)为一个线程延缓执行等待被另一个线程notify提供了一种手段。不同线程使用共享状态时为了线程安全安全,可以使用一个关联Conditions的锁对象。关键点是一个线程进入condition队列等待时会自动释放关联锁并延缓执行,类似Object.wait()方法。
- await() throws InterruptedException : void
Causes the current thread to wait until it is signalled or {@linkplain Thread#interrupt interrupted}.
让当前线程等待直到被singalled或者interrupted。
The lock associated with this {@code Condition} is atomically released and the current thread becomes disabled for thread scheduling purposes and lies dormant until one of four things happens:
调用await()后与Condition绑定的锁对象会自动释放且当前线程休眠无法调度直到发生以下四种情况:
- Some other thread invokes the {@link #signal} method for this {@code Condition} and the current thread happens to be chosen as the thread to be awakened;
其他线程调用Condition#signal且当前线程被选中唤醒- or Some other thread invokes the {@link #signalAll} method for this {@code Condition};
其他线程调用Condition#signalAll且当前线程被选中唤醒- or Some other thread {@linkplain Thread#interrupt interrupts} the current thread, and interruption of thread suspension is supported;
其他线程interrupt当前线程,当前线程抛出InterruptedException- or A "spurious wakeup" occurs.
当一种"伪唤醒"发生。这里搜索了一下,spurious wakeup。
"This means that when you wait on a condition variable, the wait may (occasionally) return when no thread specifically broadcast or signaled that condition variable. Spurious wakeups may sound strange, but on some multiprocessor systems, making condition wakeup completely predictable might substantially slow all condition variable operations. The race conditions that cause spurious wakeups should be considered rare."
大意是在多线程操作系统中,即使条件变量(Condition)未被其他线程broadcast&signaled,也有少数情况在await()时会返回,此时改用while()检查可以解决,但会大大降低条件变量(Condition)的执行效率。
while(条件不满足){
condition_wait(cond, mutex);
}
而不是:
If( 条件不满足 ){
Condition_wait(cond,mutex);
}
In all cases, before this method can return the current thread must re-acquire the lock associated with this condition. When the thread returns it is guaranteed o hold this lock.
以上情况,当前线程在返回前,必须保证重新获取条件变量(Condition)关联的锁。
- awaitUninterruptibly() : void
Causes the current thread to wait until it is signalled.
让当前线程进入等待直到被signalled选中唤醒。
If the current thread's interrupted status is set when it enters this method, or it is {@linkplain Thread#interrupt interrupted} while waiting, it will continue to wait until signalled. When it finally returns from this method its interrupted status will still be set.
如果当前线程在waiting时被interrupted,将继续waiting直到被signalled,直到成功被唤醒返回时interrupted标识仍然不变为true。
- awaitNanos(long nanosTimeout) throws InterruptedException : long
Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
在给定时间内等待锁。
@return an estimate of the {@code nanosTimeout} value minus the time spent waiting upon return from this method. A positive value may be used as the argument to a subsequent call to this method to finish waiting out the desired time. A value less than or equal to zero indicates that no time remains.
方法返回剩余等待时间,为0表示时间结束。
- await(long time, TimeUnit unit) throws InterruptedException : boolean
This method is behaviorally equivalent to: {@code awaitNanos(unit.toNanos(time)) > 0}
和awaitNano类似,限定时间内未获取自动返回继续执行。
- awaitUntil(Date deadline) throws InterruptedException : boolean
在给定截止日期内等待获取锁。
- signal() : void;
Wakes up one waiting thread.If any threads are waiting on this condition then one is selected for waking up. That thread must then re-acquire the lock before returning from {@code await}.
唤醒等待中的线程。如果条件变量中有等待await()的变量,选择一个唤醒继续工作,选中线程在从await()返回时必须重新获取锁。
- signalAll() : void;
Wakes up all waiting threads. If any threads are waiting on this condition then they are all woken up. Each thread must re-acquire the lock before it can return from {@code await}.
唤醒所有等待中的线程。唤醒条件变量中的所有线程,每个线程在从await()返回时都要重新获取锁(这里也要进行锁竞争)。
注意到ReentrantLock的锁实现都是通过sync这个内部类完成的,那么来看看这个类。
java.util.concurrent.locks.ReentrantLock#lock
public void lock() {
sync.lock();
}
java.util.concurrent.locks.ReentrantLock#unlock
public void unlock() {
sync.release(1);
}
java.util.concurrent.locks.ReentrantLock.Sync
Base of synchronization control for this lock. Subclassed into fair and nonfair versions below. Uses AQS state to represent the number of holds on the lock.
为ReentrantLock提供同步控制基础,子类主要是公平和非公平两个版本。使用AQS状态代表持有锁的状态码。
- Sync# abstract lock() : void
Performs {@link Lock#lock}. The main reason for subclassing is to allow fast path for nonfair version.
这里是抽象方法,具体实现跨越看非公平版本的实现。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
这里主要是用到了AQS来实现获取锁,先看compareAndSetState这个方法。
java.util.concurrent.locks.AbstractQueuedSynchronizer#compareAndSetState
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update); //调用的JNI C++ 接口
}
AQS的CAS算法,如果当前状态等于给定的expecte值,自动将同步状态设置为给定的update值,返回true,否则不做操作返回false。该方法对读和写有volatile内存语义(意思是有可见性)。
- Sync# nonfairTryAcquire(int acquires) : boolean
Performs non-fair tryLock. tryAcquire is implemented in subclasses, but both need nonfair try for trylock method.
非公平尝试竞争锁,子类实现tryLock()使用。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
首先获取当前同步状态getState(),若为0则setExclusiveOwnerThread(current),这是AbstractOwnableSynchronizer类的方法,该类后面再看,作用是当前线程设置为独占锁持有者,然后返回ture。
- Sync# tryRelease(int releases) : boolean
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
首先检查当前线程是否是该同步器持有者,若是在检查state是否为0,如果为0去除同步器的线程指针,释放锁返回true
- Sync# isHeldExclusively()
检查当前线程是否是同步器的持有者
- Sync# isLocked() : boolean
final boolean isLocked() {
return getState() != 0;
}
可以看到,state=0则为无锁状态。到这里Sync的接口已经结束了,Sync抽象类的主要两种实现NonfairSync和NonfairSync。区别是,如果当前线程不是锁的占有者,则FairSync则会判断当前是否有等待队列,如果有则将自己加到等待队列尾;如果当前线程不是锁的占有者,则NonfairSync并不判断是否有等待队列,直接使用compareAndSwap去进行锁的占用;
竞争锁流程
接下来我们来看一下ReentrantLock一次lock的整个流程:
ReentrantLock lock = new ReentrantLock(); //创建一个lock实例
//java.util.concurrent.locks.ReentrantLock#ReentrantLock()
public ReentrantLock() {//默认的构造方法使用的是非公平Snyc
sync = new NonfairSync();
}
lock.lock(); //java.util.concurrent.locks.ReentrantLock#lock
public void lock() {
sync.lock(); //ReentrantLock.NonfairSync#lock
}
默认构建的是非公平同步器。所以接下来进到ReentrantLock.NonfairSync#lock:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
}
到这里若compareAndSetState(0, 1)操作成功说明此事AQS处于无锁状态,则设置当前线程为持有线程,然后成功获取锁并返回;若CAS原子操作失败则进去acquire(1)。
public final void acquire(int arg) {
if (!tryAcquire(arg) && //java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里开始产生分支了。若tryAcquire(1)获取成功则acquire(1)方法直接返回,当前线程获取锁,此次lock()操作结束。若失败继续执行acquireQueued(),并且根据返回标识判定是否需要interrupt。首先我们来看tryAcquire:
tryAcquire实际最后是调用了NofairSync#nonfairTryAcquire:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); //java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //再次检查state==0,AQS处于无锁状态。
if (compareAndSetState(0, acquires)) {//重新尝试获取锁
setExclusiveOwnerThread(current);
return true;//获取锁成功
}
}
else if (current == getExclusiveOwnerThread()) { //AQS持有线程等于当前线程
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);//state+=1;并返回true
return true;
}
return false; //没有获取到锁,返回false
}
然后来看addWaiter(Node.EXCLUSIVE),将线程包装成独占节点加入到队尾:
private Node addWaiter(Node mode) { //将当前线程组装成独占节点并加入队尾
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {//若尾节点==null,则将node设置为tail
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;//设置成功返回
}
}
enq(node);//若尾节点不为空进入enq自旋入队
return node;//返回入队节点
}
private Node enq(final Node node) {
for (;;) {//自旋
Node t = tail;
if (t == null) { // Must initialize 尾节点为空,初始化头节点,头节点不持有线程和前后节点
if (compareAndSetHead(new Node()))
tail = head; //头节点==尾节点
} else {
node.prev = t;//插入节点的前驱节点指向尾节点
if (compareAndSetTail(t, node)) {//将插入节点设置为尾节点
t.next = node;//尾节点的后继节点指向插入节点
return t;//入队成功返回
}
}
}
}
接着执行acquireQueued阻塞线程:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;//设置打断标识
for (;;) {
final Node p = node.predecessor();//获取前驱节点
if (p == head && tryAcquire(arg)) {//若前驱节点是头节点且尝试获取锁成功
setHead(node);//将当前节点设置为头节点,并清除插入节点的线程
p.next = null; // help GC 插入节点出队
failed = false; //标识成功
return interrupted; //返回打断标识
}
if (shouldParkAfterFailedAcquire(p, node) &&//查询是否需要挂起线程或者失败结束
parkAndCheckInterrupt()) //如果需要则打断标识返回true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);//如果获取失败(中途发生异常)node持有线程取出,ws设置为CANCELLED
}
}
//AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //如果节点状态为SIGNAL则可以安全挂起返回true
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {//如果节点状态>0(CANCELLED)此节点已取消,继续向前遍历
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {//state==0 || PROPAGATE,若为0则更改为SIGNAL下循环进入则可以安全挂起该节点
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;//返回false,线程未被挂起,继续自旋
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//挂起当前线程
return Thread.interrupted();//检测当前线程是发生过打断
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null; //清除节点线程
// Skip cancelled predecessors 向前遍历找到有效的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next; //有效前驱节点的后继节点
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED; //设置状态为CANCELLED
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {//如果取消节点是尾节点将有效前驱节点设置为尾节点
compareAndSetNext(pred, predNext, null);//将有效前驱节点的后继节点指向null
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head && //如果有效前驱节点不是头节点
((ws = pred.waitStatus) == Node.SIGNAL || //有效前驱节点waitStatus!=CANCELLED则设置为CANCELLED
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {//且持有线程
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);//将node节点从队列中出列
} else {//若要取消节点是head节点,则直接唤醒后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {//如果后继节点==null或者CANCELED
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//从tail往前遍历,找到最靠前的有效节点然后唤醒
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
下面是NonfairSync的一个整体lock流程:
FairSync和NofairSync的区别是,公平锁在lock的时候回去首先调用AQS#acquire(1)方法,不去直接尝试获取锁;
ReentrantLock lock = new ReentrantLock(true);//启用公平策略
lock.lock(); //-->>内部调用FairSync#lock
//java.util.concurrent.locks.ReentrantLock.FairSync#lock
final void lock() {
acquire(1); //-->java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire这里一样,但tryAcquire有区别
}
下面是FairSync的tryAcquire方法:
//java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&//这里有区别,会等待队列前面的节点线程执行完再尝试竞争
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
......
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
后面的流程和非公平同步器是一样的。