并发包的锁
LockSupport 工具类
JDK 中的 rt.jar 包里面的 LockSupport 是一个工具类,它的主要作用是挂起和唤醒线程。
LockSupport 类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport类的方法的线程是不持有许可证的。LockSupport是使用Unsafe类实现的,其主要方法:
-
static void park();如果调用park方法的线程已经拿到了与Support关联的许可证,则调用LockSupport.park() 时会马上返回,否则会被阻塞挂起,默认情况下线程不持有许可证。
如果其他线程调用了此线程的interrupt方法,设置了中断标志或线程被虚假唤醒,则阻塞线程也会返回,park最好使用循环判断执行。
-
static void unpark(Thread thread);如果thread未获得LockSupport与Thread类的关联许可,则让thread持有;
若thread之前被park挂起,则thread被唤醒;
若thread之前没有调用park,则调用unpark后再调用park方法,其会立即返回。
-
static void parkNanos(long nanos);与park的不同之处,如果线程没有拿到LockSupport的许可支持,将会在nanos时间后自动返回。
使用 jsatck pid命令可以查看线程堆栈信息
-
static void park(Object blocker);Thread 类中有一个volatile Object parkBlocker 变量,用来存放blocker变量;
-
static void parkNanos(Object blocker,long nanos);相比4 多了超时时间。
static void parkUtil(Object blocker,long deadline);
这个时间时一个时间点,
AQS 抽象同步队列
它维护了一个volatile int state(代表共享资源)和一个FIFO(双向队列)线程等待队列(多线程争用资源被阻塞时会进入此队列)
AQS的实现依赖内部的同步队列(FIFO双向队列),如果当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,将其加入同步队列的尾部,同时阻塞当前线程,当同步状态释放时,唤醒队列的头节点。
AQS队列存储被阻塞队列,其可拥有多个条件变量 ConditionObject 内部类对象,内部类对象有 signal();signalAll();await();方法分别与 synchronized 同步资源对应,相应条件变量阻塞的线程被存到该条件变量维护的条件队列,该队列是单向链表,一个AQS 可拥有多个condition。
原理图示与类图


AQS重要属性与模板方法
AQS将大部分的同步逻辑均已经实现好,继承的自定义同步器只需要实现state的获取(acquire)和释放(release)的逻辑代码就可以。AQS需要子类复写的方法均没有声明为abstract,目的是避免子类需要强制性覆写多个方法,因为一般自定义同步器要么是独占方法,要么是共享方法,只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;//状态int变量
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
//模板方法
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
独占模式的独占与释放
//1.获取资源
public final void acquire(int arg) {
//获取资源成功即结束,失败则封装节点为EXCLUSIVE添加线程到等待队列,然后中断当先线程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//2.(接1)获取资源需要具体子类去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//3.(接1)封装节点为EXCLUSIVE后,添加线程到等待队列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//简单测试添加到队列
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//结尾节点为null或添加失败,使用CAS自旋添加节点
enq(node);
return node;
}
//4.(接3)节点入队 头节点为null则设置为头节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//5.(接1)中断当前线程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
//1.释放资源,并释放等待队列阻塞的一个线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//2.释放资源具体实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//3.释放一个合适的被阻塞挂起的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
独占锁 ReentrantLock

Sysc类直接继承自AQS,它的子类NonfairSync 和 FairSysn 分别实现的获取锁的非公平与公平策略。
void lock();获取锁
void interruptibly();对中断进行响应
boolean tryLock();尝试获取锁,不会引起当前线程阻塞
boolean tryLock(long timeout, TimeUnit unit);与tryLock不同,设置了超时时间,时间超时未获取锁则返回false;
void unlock();释放锁
具体实现
//非公平锁的实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//公平锁的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//当前节点有先驱节点则获取失败
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
读写锁 ReentrantReadWriteLock

通过state高16位表示读状态,低16位表示写状态。写锁可重入,是独占锁。读锁为共享锁
读锁实现
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible有资格 or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//高十六位
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
写锁实现
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
//低十六位
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
StampedLock(JDK8)
内部有不可重入锁,可相互转换
- 写锁 writeLock 独占锁
- 悲观读锁 readLock 共享锁
- 乐观读锁 tryOptimisticRead 利用版本号,在本地存快照,不使用锁