1、lock介绍
lock和synchronized区别
(1)、与synchronized相比,Lock提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候,锁投票,支持实现公平锁和非公平锁、独占锁和非独占锁。
(2)、Lock支持更加灵活的同步代码块,但是使用synchronized时,只能在同一个synchronized块结构中获取和释放。但Lock必须手动释放,锁释放一定要在finally中处理。
(3)、Lock和synchronized在实现方面不同,synchronized使用对象内部的一个叫做监视器锁(monitor)来实现的,Lock通过队列同步器来实现。
2、lock的实现与队列同步器AQS
java.util.concurrent.locks类图
(1)、ReentrantLock结构
Sync与ReentrantLock是组合关系,且FairSync(公平锁)、NonfairySync(非公平锁)是Sync的子类。Sync继承AQS(AbstractQueuedSynchronizer)。
队列同步器AbstractQueuedSynchronizer面向锁的实现,屏蔽同步状态管理、线程排队、等待和唤醒操作。
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
* 通过静态内部类Syn实现,Syn继承AQS
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//再次获取同步状态(可重入的关键)
//如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//只要CAS更新同步状态成功就获取到锁。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//相比非公平锁的nonfairTryAcquire方法,增加!hasQueuedPredecessors() 是否有前驱线程的判断,保证获取锁的顺序性
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//释放锁tryRelease的实现
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*默认非公平锁
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//省略.............
}
公平锁与非公平锁的比较
公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。公平性锁每次都是从同步队列中的第一个节点获取到锁,而非公平性锁出现了一个线程连续获取锁的情况。非公平性锁可能使线程“饥饿”,当一个线程请求锁时,只要获取了同步状态即成功获取锁。在这个前提下,刚释放锁的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。
非公平锁可能使线程“饥饿”,为什么它又被设定成默认的实现呢?非公平性锁模式下线程上下文切换的次数少,因此其性能开销更小。公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。
可重入
“可重入”意味着自己可以再次获得自己的内部锁,而不需要阻塞。当某一个线程请求成功后,JVM会记录该锁的持有线程 并且将计数设置为1,如果这时其他线程请求该锁时则必须等待。当该线程再次请求请求获得锁时,计数会+1;当占有线程退出同步代码块时,计数就会-1,直到为0时,释放该锁。这时其他线程才有机会获得该锁的占有权。
(2)、队列同步器
队列同步器节点和数据结构
Node节点
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
队列同步器提供状态管理方法(final修饰,不可重写)
•getState():获取当前同步状态。
•setState(int newState):设置当前同步状态。
•compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。
队列同步器中可重写方法
队列同步器中模板方法(不可重写)
(3)、独占锁与AQS的实现
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)
//tryAcquire虚函数
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
//获取失败,等待的线程创建节点,添加到队尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//循环CAS设置尾节点,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,
//否则,当前线程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过CAS变得“串行化”了。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//非首节点线程前驱节点出队或者被中断而从等待状态返回,检查自己的前驱是否是头节点,如果是则尝试获取同步状态。
//可以看到节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,
//这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程由于中断而被唤醒)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋
for (;;) {
final Node p = node.predecessor();
//检查当前节点是否是首节点,并尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //获取同步状态失败后判断是否需要阻塞或中断
parkAndCheckInterrupt()) //阻塞当前线程
interrupted = true;//acquire方法判断,进行中断
}
} finally {
if (failed)
cancelAcquire(node);//node.waitStatus = Node.CANCELLED,且unpark后继节点
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//获取前驱节点的等待状态
if (ws == Node.SIGNAL)
//SIGNAL状态:前驱节点释放同步状态或者被取消,将会通知后继节点。因此,可以放心的阻塞当前线程,返回true。
/* This node has already set status asking a release to signal it, so it can safely park.*/
return true;
if (ws > 0) {//前驱节点被取消了,跳过前驱节点并重试
/* Predecessor was cancelled. Skip over predecessors and indicate retry. */
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {//独占模式下,一般情况下这里指前驱节点等待状态为SIGNAL
/* waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking. */
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//设置当前节点等待状态为SIGNAL
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//阻塞当前线程,等待unpark
return Thread.interrupted();
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
//AQS的release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒后继节点
return true;
}
return false;
}
(4)、共享锁与AQS实现
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态;在唤醒时候,多个等待共享锁的线程同时唤醒。如程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。
CountDownLatch、Semaphore都是基于共享锁AQS实现的。
//ReentrantReadWriteLock的tryAcquireShared实现
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)//有写锁,且非当前线程占有写锁
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//共享锁状态增加
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
public final void acquireShared(int arg) {
//tryAcquireShared请求判断又是由子类实现判断
if (tryAcquireShared(arg) < 0)
//失败后加入队列
doAcquireShared(arg);
}
//发现共享整个挂起的doAcquireShared方法跟独占模式的acquireQueued处理差不多,
//唯一有区别的似乎就是如果pre节点是头结点,当前节点请求成功,独占模式只是将节点设置为head然后
//return,这里除了sethead还将唤醒队列的中其他节点
private void doAcquireShared(int arg) {
//节点状态是共享,之前的独占模式是EXCLUSIVE
//跟独占一样加入队列,Node.SHARED为Node类静态变量,尾节点是共享节点,则不会重复添加
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//自旋
//还是判断pre是不是头结点,是就再次请求
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//这里和独占不同,独占模式下只是设置成头结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
//当前node请求成功后判断next节点是共享节点就继续release
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
/*
使用for保证队列中节点一定会被传递,即使有其他acquire或release在进行
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//节点状态为SIGNAL表示需要向后传递
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//不同于独占锁,多个线程释放,需要CAS操作
continue; // 失败了就loop
unparkSuccessor(h);
}
//为0就设置成PROPAGETE表示需要传播
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
(5)、独占式超时获取同步状态
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3、Condition的实现分析
ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSyn-chronizer.Node。
添加等待队列尾节点的引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
await() 加入等待队列、释放锁、阻塞线程。并循环等待通知加入同步队列
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入同步队列
Node node = addConditionWaiter();
//释放同步状态,即释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//被唤醒后的线程,将从await()方法中的while循环中退出
//(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),
//进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);//阻塞线程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
signal通知,从等待队列中获取node,加入同步队列。unpark线程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。
//当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}