工作中或多或少都会遇到并发问题,解决此类问题最好的解决方法就是通过锁,本文通过分析最常见的类ReentrantLock的源码来了解jdk大神的实现细节。以便于在合适的时候使用锁。
public class Demo implements Runnable {
private static int count = 0;
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Demo());
thread.start();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("result: " + count);
}
@Override
public void run() {
//unsafe
for (int i = 0; i < 10000; i++) {
count++;
}
}
常见的例子起10个线程去分别累计10000次,因为访问了同样的变量。最终的结果不会是100000。就需要给计算方法加同步。使用ReentrantLock如下:
public class LockDemo implements Runnable {
private static int count = 0;
private Lock lock = new ReentrantLock();
public LockDemo(Lock lock){
this.lock= lock;
}
public static void main(String[] args) {
Lock lock = new ReentrantLock();
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new LockDemo(lock));
thread.start();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("result: " + count);
}
@Override
public void run() {
try {
this.lock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
}
finally {
this.lock.unlock();
}
}
}
ReentrantLock方法如下表:
方法名 | 作用 |
---|---|
ReentrantLock() | 构造函数,默认非公平锁方式构建 |
ReentrantLock(boolean fair) | 构造函数,指定公平锁或非公平锁方式构建 |
int getHoldCount() | 由AbstractQueuedSynchronizer子类实现,获取当前线程持有锁的次数,如果非当前线程返回0 |
Thread getOwner() | 由AbstractQueuedSynchronizer子类实现,获取持有锁的线程 |
Collection<Thread> getQueuedThreads() | 由AbstractQueuedSynchronizer子类实现,获取正在排队等待获取锁的线程集合 |
int getQueueLength() | 由AbstractQueuedSynchronizer子类实现,获取正在排队等待获取锁的线程数量 |
Collection<Thread> getWaitingThreads(Condition condition) | 由AbstractQueuedSynchronizer子类实现,获取等待唤醒的线程集合 |
int getWaitQueueLength(Condition condition) | 由AbstractQueuedSynchronizer子类实现,获取等待唤醒的线程数量 |
boolean hasQueuedThread(Thread thread) | 由AbstractQueuedSynchronizer子类实现,该线程是否正在排队获取锁 |
boolean hasQueuedThreads() | 由AbstractQueuedSynchronizer子类实现,是否有线程正在排队获取锁 |
boolean hasWaiters(Condition condition) | 由AbstractQueuedSynchronizer子类实现,是否有线程等待被唤醒 |
boolean isFair() | 判断当前锁是否是公平锁 |
boolean isHeldByCurrentThread() | 由AbstractQueuedSynchronizer子类实现,判断锁是否被当前线程持有 |
boolean isLocked() | 由AbstractQueuedSynchronizer子类实现,当前锁是否被持有 |
int getQueueLength() | 由AbstractQueuedSynchronizer子类实现,获取正在排队等待获取锁的线程数量 |
void lock() | 由AbstractQueuedSynchronizer子类实现,获取锁的过程 |
void lockInterruptibly() | 由AbstractQueuedSynchronizer子类实现,获取不到锁则抛出异常 |
Condition newCondition() | 由AbstractQueuedSynchronizer子类实现,构造一个条件对象 |
boolean tryLock() | 由AbstractQueuedSynchronizer子类实现,尝试获取锁,获取不到立即返回 |
boolean tryLock(long timeout, TimeUnit unit) | 由AbstractQueuedSynchronizer子类实现,在指定时间内尝试获取锁,获取不到抛出异常 |
void unlock() | 由AbstractQueuedSynchronizer子类实现,释放锁的过程 |
从方法列表可以看出来,ReentrantLock的大部分实现都是由AbstractQueuedSynchronizer提供,ReentrantLock中有3个内部类,Sync直接继承了AbstractQueuedSynchronizer。NonfairSync非公平模式和FairSync公平模式又继承了Sync。具体实现细节先不讨论。至少我们知道了ReentrantLock和AbstractQueuedSynchronizer是息息相关的。那么接下来我们先看下AbstractQueuedSynchronizer是什么东西。
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer抽象队列同步器,听名字就是一个线程安全的队列,实际上维护了两个队列,一个是等待获取锁的队列(AbstractQueuedSynchronizer自身维护),一个是等待被唤醒的队列(内部类ConditionObject维护)。
两个队列差别在于:
AbstractQueuedSynchronizer构建出来的队列,头节点是一个没有Thread的空节点,其标识作用,而Condition构建出来的队列,头节点就是真正等待的节点
AbstractQueuedSynchronizer构建出来的队列,节点之间有next与pred相互标识该节点的前一个节点与后一个节点的地址,而Condition构建出来的队列,只使用了nextWaiter标识下一个等待节点的地址
Condition 队列中的节点肯定是获取过锁,只是不满足人为指定的条件而主动阻塞释放锁,并且在被唤醒时会插入到AQS队列中,并不是立即执行的。
看下AbstractQueuedSynchronizer.Node持有的属性:
字段名 | 类型 | 默认 | 作用 |
---|---|---|---|
SHARED | Node | new Node | 标志当前节点在共享模式等待 |
EXCLUSIVE | Node | null | 标志当前节点在独占模式等待 |
CANCELLED | int | 1 | 等待状态值,说明该节点的线程已经被取消或者中断 |
SIGNAL | int | -1 | 等待状态值,说明该节点的后置节点需要取消阻塞 |
CONDITION | int | -2 | 等待状态值,说明该节点当前处于Condition队列中 |
PROPAGATE | int | -3 | 等待状态值,说明该节点处于共享模式中,它被唤醒后,需要继续唤醒后置节点 |
waitStatus | int | 0 | 等待状态值,上述字段为该状态的具体情况的值 |
prev | Node | null | Sync队列中,当前队列的前置节点 |
next | Node | null | Sync队列中,当前队列的后置节点 |
thread | Thread | null | 当前节点需要执行的线程 |
nextWaiter | Node | null | condition队列中,当前队列的后置节点(Sync队列中标记独占还是共享) |
下图是运行中生成的两种队列结构。
看完了Node,下面看一下AQS中变量和方法。
方法名 | 作用 |
---|---|
void acquire(int arg) | 获取锁 |
void acquireInterruptibly(int arg) | 获取锁,获取不到抛出异常 |
boolean acquireQueued(final Node node, int arg) | 队列中的线程尝试获取锁 |
void acquireShared(int arg) | 共享模式获取锁 |
void acquireSharedInterruptibly(int arg) | 共享模式获取锁,获取不到抛出异常 |
Node addWaiter(Node mode) | 增加Node,mode为Node的模式(公平,非公平) |
boolean apparentlyFirstQueuedIsExclusive() | 返回Sync队列中除头结点第一个不是共享模式的节点 |
void cancelAcquire(Node node) | 取消获取锁 |
boolean compareAndSetHead(Node update) | CAS方式设置头节点 |
boolean compareAndSetNext(Node node,Node expect,Node update) | CAS方式设置后置节点 |
boolean compareAndSetState(int expect, int update) | CAS方式设置锁的状态 |
boolean compareAndSetTail(Node expect, Node update) | CAS方式设置尾节点 |
boolean compareAndSetWaitStatus(Node node,int expect, int update) | CAS方式设置节点状态 |
void doAcquireInterruptibly(int arg) | 增加节点,且在队列中尝试获取锁,获取不到则抛出异常 |
boolean doAcquireNanos(int arg, long nanosTimeout) | 增加节点,且在队列中在固定时间内尝试获取锁 |
void doAcquireShared(int arg) | 共享模式下,增加节点,且在队列中尝试获取锁 |
void doAcquireSharedInterruptibly(int arg) | 共享模式下,增加节点,且在队列中尝试获取锁,获取不到则抛出异常 |
boolean doAcquireSharedNanos(int arg, long nanosTimeout) | 共享模式下,增加节点,且在队列中在固定时间内尝试获取锁,获取不到则抛出异常 |
void doReleaseShared() | 释放锁 |
Node enq(final Node node) | 插入到Sync队列中 |
boolean findNodeFromTail(Node node) | 从尾部开始查询节点是否在Sync队列中 |
Thread fullGetFirstQueuedThread() | 获取队列中第一个等待的线程 |
int fullyRelease(Node node) | 完全释放锁 |
Collection<Thread> getExclusiveQueuedThreads() | 获取队列中是独占模式的节点线程集合 |
Thread getFirstQueuedThread() | 获取队列中第一个等待的线程 |
Collection<Thread> getQueuedThreads() | 获取队列中线程集合 |
int getQueueLength() | 获取队列中的集合大小 |
Collection<Thread> getSharedQueuedThreads() | 获取队列中是共享模式的节点线程集合 |
int getState() | 获取锁的状态 |
Collection<Thread> getWaitingThreads(ConditionObject condition) | 获取等待队列中的线程集合 |
int getWaitQueueLength(ConditionObject condition) | 获取等待队列中的线程大小 |
int getWaitQueueLength(ConditionObject condition) | 获取等待队列中的线程大小 |
boolean hasContended() | 是否构建了Sync队列 |
boolean hasQueuedPredecessors() | 判断是否有等待线程 |
boolean hasQueuedThreads() | Sync队列中是否存在排队的节点 |
boolean hasWaiters(ConditionObject condition) | 等待唤醒的队列是否有节点 |
boolean isHeldExclusively() | 由AQS子类实现,是否被当前线程持有 |
boolean isOnSyncQueue(Node node) | 该节点是否在Sync队列中 |
boolean isQueued(Thread thread) | 该线程是否在Sync队列中 |
boolean owns(ConditionObject condition) | 判断该condition是否使用锁 |
boolean parkAndCheckInterrupt() | 阻塞当前线程并获取中断状态 |
boolean release(int arg) | 释放锁 |
boolean releaseShared(int arg) | 释放共享锁 |
void selfInterrupt() | 中断当前线程 |
void setHead(Node node) | 设置头节点 |
void setState(int newState) | 设置锁状态 |
boolean shouldParkAfterFailedAcquire(Node pred, Node node) | 是否应该阻塞当前节点如果没有获取锁 |
boolean transferAfterCancelledWait(Node node) | 在取消阻塞时将condition队列的节点转移到Sync队列中 |
boolean transferForSignal(Node node) | 在唤醒condition节点时将condition队列的节点转移到Sync队列中 |
boolean tryAcquire(int arg) | 由AQS子类实现,尝试获取锁 |
int tryAcquireShared(int arg) | 由AQS子类实现,共享模式下尝试获取锁 |
boolean tryRelease(int arg) | 由AQS子类实现,尝试释放锁 |
boolean tryReleaseShared(int arg) | 由AQS子类实现,共享模式下尝试释放锁 |
void unparkSuccessor(Node node) | 取消阻塞该节点 |
看一下ConditionObject中的方法和属性
方法名 | 作用 |
---|---|
Node addConditionWaiter() | 增加Condition队列节点 |
void await() | 阻塞当前线程 |
boolean await(long time, TimeUnit unit) | 在一定时间内,阻塞当前线程 |
void awaitUninterruptibly() | 阻塞当前线程,不抛出中断异常 |
boolean awaitUntil(Date deadline) | 阻塞当前线程直到某个时间 |
int checkInterruptWhileWaiting(Node node) | 检查当前节点在阻塞过程中是否被中断过 |
void doSignal(Node first) | 唤醒第一个等待的节点 |
void doSignalAll(Node first) | 唤醒所有等待的节点 |
Collection<Thread> getWaitingThreads() | 获取正在等待的线程集合 |
int getWaitQueueLength() | 获取正在等待的线程集合大小 |
boolean hasWaiters() | 判断condition中是否存在等待的节点 |
isOwnedBy(AbstractQueuedSynchronizer sync) | 判断是当前AQS创建的condition |
reportInterruptAfterWait(int interruptMode) | 在等待中如果发生了中断,根据场景是抛出异常还是继续中断 |
reportInterruptAfterWait(int interruptMode) | 在等待中如果发生了中断,根据场景是抛出异常还是继续中断 |
void unlinkCancelledWaiters() | 排除掉队列中已经取消的节点 |
void unlinkCancelledWaiters() | 排除掉队列中已经取消的节点 |
firstWaiter | 队列中的首节点 |
lastWaiter | 队列中的尾节点 |