ReentrantLock + Condition
// condition 使用
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
condition.await();
condition.signal();
lock.unlock();
}
翻阅源码过程中发现,CAS使用的技巧。
在操作Node节点,线程状态(waitStatus)时无论是否已加锁成功 都会使用CAS。思考原因!!
new ReentrantLock()
public ReentrantLock() {
// 默认使用非公平锁
sync = new NonfairSync();
}
// 实现类
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
// lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 尝试获取锁的方法 调用父类Sync的实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
lock.newCondition()
// ReentrantLock
public Condition newCondition() {
return sync.newCondition();
}
// Sync
final ConditionObject newCondition() {
return new ConditionObject();
}
// AbstractQueuedSynchronizer
// AQS中定义了 ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
...
}
lock.lock()
// ReentrantLock
public void lock() {
sync.lock();
}
// NonfairSync
final void lock() {
// CAS 尝试修改 threadState
// unsafe.compareAndSwapInt(this, stateOffset, expect, update);
// 1 表示加锁状态 0 表示无锁状态 大于1 表示锁的重入次数 注意此时设置值使用了CAS操作
if (compareAndSetState(0, 1)){
// 加锁成功 设置独占线程值(将当前线程设置为锁的独占线程)
setExclusiveOwnerThread(Thread.currentThread());
}
else{
// CAS加锁失败 重试
acquire(1);
}
}
// AQS arg = 1
public final void acquire(int arg) {
// 尝试获取锁 正常情况会失败(这里对于抢占锁来讲只是 又走了一下lock方法的CAS操作)
// 抢一次不行 立马再抢一下,存个侥幸心理
if (!tryAcquire(arg) &&
// 抢不到只好 先添加阻塞队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// NonfairSync acquires = 1
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// Sync acquires = 1
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取线程状态 由于未加锁成功 默认是 0
int c = getState();
if (c == 0) {
// cas 再次尝试加锁
if (compareAndSetState(0, acquires)) {
// 成功设置当前线程独占
setExclusiveOwnerThread(current);
return true;
}
}
// 状态不为0 表示其他线程持有锁 判断持有锁的线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
// 如果是 设置重入次数
int nextc = c + acquires;
if (nextc < 0) // overflow 超出最大值变负数
throw new Error("Maximum lock count exceeded");
// 设置当前状态 unsafe
// 思考:AQS 中的state 是共享变量 为什么这里读取和设置不需要 CAS
setState(nextc);
return true;
}
return false;
}
// AQS
// 此时mode = null
private Node addWaiter(Node mode) {
// 新建Node节点 节点的nextWaiter 为null
// netWaiter Condition 等待队列下个节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 先快速尝试入队,失败再执行完整的入队逻辑
Node pred = tail;
// tail 为阻塞队列尾结点,如果尾结点不为空表示队列不为null
if (pred != null) {
node.prev = pred;
// 这里使用尾插法 思考 为什么要使用尾插法 而不是头插法
// 尝试将当前节点修改为尾节点
if (compareAndSetTail(pred, node)) {
// 修改成将 之前尾节点的 next节点指向当前节点
pred.next = node;
return node;
}
}
// 如果尾结点为null 或者 CAS 尾结点操作失败(其他线程并发 导致尾结点变更)
// 调用enq执行真正的入队操作
enq(node);
return node;
}
// AQS
private Node enq(final Node node) {
// 自旋直到入队成功
for (;;) {
Node t = tail;
// 再次判断尾结点是否为null 此段代码允许并发 有可能已有新的变化
if (t == null) { // Must initialize
// 头尾节点为null 表示队列为null 需要初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 将node节点设置为 尾结点
if (compareAndSetTail(t, node)) {
t.next = node;
return t; // 思考此处返回 t 的原因是什么
}
}
}
}
// AQS node 为当前节点 arg = 1
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋 等待 被唤醒后 找头节点并尝试获取锁,失败后继续等待
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 成功后设置当前节点为 头节点 将原头结点置位Null
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 自旋 现将节点状态 设置为 -1,然后park阻塞线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// AQS
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 等待状态 int 类型初始化未赋值 所以首次 应该为 0
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// AQS
private final boolean parkAndCheckInterrupt() {
// 这个位置阻塞
LockSupport.park(this);
return Thread.interrupted();
}
// LockSupport
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
Lock.unlock()
// ReentrantLock
public void unlock() {
// 同样调用AQS 释放锁 每次state -1 与重入次数匹配
sync.release(1);
}
// AQS
public final boolean release(int arg) {
// 释放锁 tryRelease 由具体的子类 来实现如何释放锁 与acquire类似
if (tryRelease(arg)) {
// 释放成功后 获取头结点
Node h = head;
// 头结点不为null 表示阻塞队列不为空 需要执行唤醒
// waitStatus != 0 表示阻塞线程 非初始状态
// 已知 waitStatus 有5种 1 取消状态 0 初始状态(int默认为0) -1 阻塞等待被唤醒状态 -2 condition条件 -3 传播?暂时不懂
// 个人理解此处为什么不使用 h.waitStatus == -1
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// ReentrantLock->Sync
protected final boolean tryRelease(int releases) {
// 获取当前 锁的状态
int c = getState() - releases;
// 如果当前线程非持有锁的线程 直接抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// c == 0 表示完全释放锁,否则表示减少重入次数
if (c == 0) {
free = true;
// 设置当前锁的独占线程为Null 允许其他线程抢占锁
setExclusiveOwnerThread(null);
}
// 设置锁的状态值 思考:为什么此处不使用CAS操作,而lock方法acquire时需要使用CAS
setState(c);
return free;
}
// AQS 唤醒 Node:head节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 正常情况 此处为 -1 阻塞等待唤醒状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 此处为什么又使用CAS
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取头节点的下个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 如果next节点为null 从尾部向前遍历,找到最前的 匹配节点 作为本次唤醒节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 为什么不唤醒头节点?
// 头结点在创建的时候是一个 new Node() 操作 可以理解为是一个Null节点,阻塞队列添加元素的时候是尾插法 也就是说第一个 等待线程会作为 tail 节点存在,所以找到头结点后要next下一个节点 作为需要唤醒的元素
}
// AQS 加锁阻塞在这个位置,唤醒后继续从这里开始
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// Thread.interrupted() 会返回线程中断状态,如果是中断触发的唤醒操作 会返回true
// unpark唤醒操作返回false
return Thread.interrupted();
}
// AQS
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 重新开始自旋操作 此时获取的pred节点为head节点 (唤醒的是head节点的next节点)
final Node p = node.predecessor();
// 重新尝试抢占锁
if (p == head && tryAcquire(arg)) {
// 抢占成功 将当前节点设置为Head节点 思考 此处为什么不使用CAS
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果是中断唤醒 将interrupted变量置为 true 后续返回true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//AQS 设置头节点
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
condition.await();
// AQS
public final void await() throws InterruptedException {
// 判断是否被中断
if (Thread.interrupted())
throw new InterruptedException();
// 添加到等待队列
Node node = addConditionWaiter();
// 释放获得的锁 saveState 为锁state值
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断是否满足阻塞条件
while (!isOnSyncQueue(node)) {
// 阻塞等待唤醒
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// AQS -> ConditionObject
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除非 Condition状态节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 使用当前线程,初始化Node节点 并将头节点、尾结点指向他
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node; // 此处为什么不先设置尾结点?
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// AQS
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取当前锁的状态 有可能是重入n次
int savedState = getState();
// 释放锁 与lock.unlock()逻辑一致
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
Condition.signal()
// AQS -> ConditionObject (Condition接口)
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 从头开始准备唤醒
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// AQS
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 尝试修改当前Node节点 状态 修改失败 尝试唤醒下一个节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 状态修改成功 先重新加入阻塞队列
// 注意这里p为node的前一个节点
Node p = enq(node);
int ws = p.waitStatus;
// 为什么要修改p的状态 唤醒node节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒
return true;
}
问题
1. 很多地方用了CAS操作,比如修改锁的状态值AQS的state、Thread的waitStatus。在修改state时会考虑如果加锁成功状态直接修改无需CAS,但是在修改waitStatus时,及时已经获取到锁依然会使用CAS。
2. Condition等待队列为什么在signal时修改pred节点状态。
3. 为什么使用尾插法而不是头插发
ThreadPoolExecutor
new ThreadPoolExecutor()
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime, // 存活时间
TimeUnit unit, // 存活时间单位
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, // 线程工厂 自定义线程属性、名称
RejectedExecutionHandler handler // 拒绝策略
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
executor()
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}