一. 基础
锁是用来控制多个线程访问共享资源的方式。
- Java SE 5 之前,Java程序靠synchronized关键字实现锁功能(隐式获取锁)
- Java SE 5 之后,并发包中新增了Lock接口及相关实现类来实现锁功能(显示获取锁)
1. Lock接口的API(对比synchronized)
方法名称 | 描述 | 备注 |
---|---|---|
void lock() | 获取锁,当获取锁后,从该方法处返回 | 等价于synchronized的加锁 |
void lockInterruptibly() throws InterruptedException | 该方法获取锁时会响应中断,当获取锁的线程被中断时,会抛出InterruptedException | Lock特有 |
boolean tryLock() | 尝试非阻塞的获取锁,调用该方法后立即返回,如果成功获取锁则返回true,否则返回false | Lock特有 |
boolean tryLock(long time, TimeUnit unit) throws InterruptedException | 超时获取锁,当前线程立刻返回的情形:(1)当前线程在超时时间内获得锁(2)当前线程在超时时间内被中断(3)超时时间结束 | Lock特有 |
void unlock() | 释放锁 | 等价于synchronized的解锁 |
Condition newCondition() | 等待通知组件,该组件和当前的锁绑定,只有获得了锁,才能调用该组件的wait()方法,调用wait()方法后,当前线程进入WAITING状态,释放锁 | 等价于Object.wait()/notify() |
lockInterruptibly示例:
public class Test {
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
Thread thread1 = new Thread(new Runner(lock));
Thread thread2 = new Thread(new Runner(lock));
thread1.start();
thread2.start();
TimeUnit.SECONDS.sleep(2);
thread2.interrupt();
}
}
class Runner implements Runnable {
private Lock lock;
public Runner() {
}
public Runner(Lock lock) {
this.lock = lock;
}
@Override
public void run() {
try {
lock.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + " 获得锁");
int i = 0;
while (i < 10) {
i++;
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 被中断");
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + " 释放锁");
}
}
}
运行结果:
Thread-0 获得锁
Thread-1 被中断
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at test.test2.Runner.run(Test.java:36)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
at test.test2.Runner.run(Test.java:47)
at java.lang.Thread.run(Thread.java:745)
Thread-0 释放锁
public class Test3 {
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
Thread thread1 = new Thread(new Runner(lock));
Thread thread2 = new Thread(new Runner(lock));
thread1.start();
thread2.start();
TimeUnit.SECONDS.sleep(2);
thread2.interrupt();
}
}
class Runner implements Runnable {
private Lock lock;
public Runner() {
}
public Runner(Lock lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + " 获得锁");
int i = 0;
while (i < 10) {
try {
i++;
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
运行结果:
Thread-0 获得锁
Thread-1 获得锁
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at test.test2.Runner.run(Test3.java:57)
at java.lang.Thread.run(Thread.java:745)
两者对比:
- 利用
lockInterruptibly()
加锁时,如果线程A在获取锁的时候被中断,则会响应此中断,抛出InterruptedException,其他线程执行完成释放锁后,线程A也不会再去竞争锁。 - 利用
synchronized
加锁时,如果线程A在获取锁的时候被中断,不会响应此中断,其他线程执行完成释放锁后,线程A仍然会去竞争锁。
tryLock示例:
public class Test {
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
Thread thread0 = new Thread(new TryLockRunner(lock));
Thread thread1 = new Thread(new TryLockRunner(lock));
thread0.start();
TimeUnit.SECONDS.sleep(1);
thread1.start();
TimeUnit.SECONDS.sleep(2);
thread1.interrupt();
}
}
class TryLockRunner implements Runnable {
private Lock lock;
public TryLockRunner() {
}
public TryLockRunner(Lock lock) {
this.lock = lock;
}
@Override
public void run() {
try {
boolean isLocked = lock.tryLock(10, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + " 是否成功加锁: " + isLocked);
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " 释放锁");
lock.unlock();
}
}
}
运行结果:
Thread-0 是否成功加锁: true
Thread-1 释放锁
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireNanos(AbstractQueuedSynchronizer.java:936)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1247)
at java.util.concurrent.locks.ReentrantLock.tryLock(ReentrantLock.java:442)
at test.test2.TryLockRunner.run(Test.java:39)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
at test.test2.TryLockRunner.run(Test.java:46)
at java.lang.Thread.run(Thread.java:745)
Thread-0 释放锁
2. 队列同步器
队列同步器(AbstractQueuedSynchronizer)是用来构建锁或者其他同步组件的基础框架。
锁和队列同步器的关系:
- 锁的实现中聚合了队列同步器,利用同步器实现锁的语义。
- 锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节。
- 同步器面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。
(1) 队列同步器的接口
- 队列同步器中可重写的方法:
方法名 | 描述 |
---|---|
boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值表示获取成功,反之表示获取失败 |
boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
- 队列同步器中不可重写的模板方法:
方法名 | 描述 |
---|---|
int getState() | 获取同步状态 |
void setState(int newState) | 设置同步状态 |
boolean compareAndSetState(int expect, int update) | CAS设置同步状态 |
void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) | 与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态会进入同步队列,如果此时中断当前线程,该方法将抛出InterruptedException并返回 |
boolean tryAcquireNanos(int arg,long nanos) | 在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内获取到同步状态,则返回true,否则返回false |
boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态后,将同步队列中的第一个节点包含的线程唤醒 |
void acquireShared(int arg) | 共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别在于同一时刻可以有多个线程获取到同步状态 |
void acquireSharedInterruptibly(int arg) | 与acquireShared(int arg)相同,该方法会响应中断 |
boolean tryAcquireSharedNanos(int arg,long nanos) | 在acquireSharedInterruptibly(int arg)的基础上增加了超时限制 |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
Collection<Thread> getQueuedThreads() | 获取等待在同步队列上的线程集合 |
(2) 队列同步器分析
- 同步队列
队列同步器使用同步队列(一个FIFO的双向队列)来完成同步状态的管理。
当前线程获取同步状态失败,同步器将当前线程和等待状态等信息构造成一个节点,并将其加入同步队列,同时阻塞当前线程:
首节点是获取同步状态成功的节点,当首节点的线程释放同步状态时,会唤醒后继节点,后继节点获取同步状态成功后将自己设置为首节点:
- 独占式获取 / 释放同步状态
独占式获取同步状态:
- 调用自定义方法tryAcquire(int arg)获取同步状态;
- 如果获取失败,则构造同步节点并加入到同步队列的尾部;
- 同步节点在同步队列中自旋等待获取同步状态。
终止自旋:
前驱节点是头节点,且当前节点获取到同步状态,则设置当前节点为头节点,终止自旋。
// 独占式获取同步状态
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 1. 自定义方法,获取同步状态
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 2. 构造节点,CAS将该节点加入同步队列的尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 3. 自旋获取同步状态,获取成功设置为首节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
独占式释放同步状态:
- 释放同步状态;
- 唤醒后继节点,使后继节点重新尝试获取同步状态。
public final boolean release(int arg) {
// 1. 释放同步状态
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 2. 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 共享式获取 / 释放同步状态
共享式获取同步状态:
- 调用自定义方法tryAcquireShared(arg)获取同步状态,只要返回值大于等于0就表示获取同步状态成功;
- 如果当前线程获取同步状态失败,则构造同步节点加入到同步队列尾部;
- 自旋等待获取同步状态。
终止自旋:
前驱节点是头节点,当前节点尝试获取同步状态,如果返回值大于等于0,则设置当前节点为头节点,终止自旋。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享式释放同步状态:
- 释放同步状态;
- 唤醒后继节点。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
- 独占式超时获取同步状态
超时获取同步状态是在支持中断响应的基础上,增加了超时获取的特性。
终止自旋:
- 前驱节点是头节点,当前节点获取同步状态,则设置当前节点为头节点,终止自旋;
- 超时时间到,终止自旋。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
(3) 队列同步器示例
public class LockDemo implements Lock {
// 自定义队列同步器
private static final class Synchronizer extends AbstractQueuedSynchronizer {
/**
* 判断是否占用 0-未占用 1-占用
*
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
/**
* 独占式获取同步状态
*
* @param acquires
* @return
*/
@Override
protected boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 独占式释放同步状态
*
* @param releases
* @return
*/
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
protected Condition newCondition() {
return new ConditionObject();
}
}
// 使用自定义队列同步器
private Synchronizer synchronizer = new Synchronizer();
@Override
public void lock() {
synchronizer.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
synchronizer.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return synchronizer.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return synchronizer.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
synchronizer.release(1);
}
@Override
public Condition newCondition() {
return synchronizer.newCondition();
}
public boolean isLocked() {
return synchronizer.isHeldExclusively();
}
}
(4) 总结
- Java中的锁一般都利用自定义的队列同步器来控制线程访问。
- 队列同步器中利用同步状态state来控制锁的获取和释放。
- 队列同步器中利用一个FIFO的队列来保存等待锁的线程节点。
二. 重入锁
重入锁(ReentrantLock),即支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。
synchronized关键字隐式支持重进入。
ReentrantLock支持获取锁时的公平和非公平性选择。
公平锁机制能够减少“饥饿”发生的概率。
非公平锁机制一般效率更高。
1. 实现重进入
线程获取锁:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
由源码可知:
- 如果state=0,表示还没有线程获取同步状态(即没有线程加锁),此时当前线程利用CAS加锁;
- 如果当前线程和已获取锁的线程是同一线程,则增加同步状态值,即表示当前线程可以继续加锁。
线程释放锁:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
由源码可知:
- 每次执行都会减少相应的同步状态值;
- 当同步状态值减少到0时,表示重复多次加的锁已经全部释放,此时表示锁最终释放成功。
2. 公平与非公平获取锁的区别
公平获取锁:锁的获取顺序严格按照时间顺序,即FIFO。
非公平获取锁:锁的获取不用严格按照时间顺序,线程能利用CAS设置同步状态成功,即加锁成功。
实现上在非公平获取锁的基础上加上对前驱节点的判断,即可实现公平锁。
性能上,公平锁需要进行大量的线程切换,性能损耗大;
非公平锁线程切换极少,吞吐量大,但可能导致“饥饿”。
三. 读写锁
读写锁(ReentrantReadWriteLock)由读锁和写锁组成。
- 读锁:同一时刻可以允许多个读线程访问;
- 写锁:写线程访问时,所有的读线程和其他写线程均被阻塞。
ReentrantReadWriteLock的特性:
- 公平性选择:支持非公平(默认)和公平的锁获取方式;
- 重进入:读锁和写锁都支持重进入;
- 锁降级:获取写锁,再获取读锁,然后才释放之前的写锁,使写锁降级为读锁。
1. 读写锁的实现分析
(1) 读写状态的设计
将同步状态变量分为两部分,高16位表示读,低16位表示写。
假设当前同步状态变量值为S,则:
写状态等于 S & 0x0000FFFF
;读状态等于 S >>> 16
(无符号补零右移16位)。
写状态加1等于 S + 1
;读状态加1等于 S + (1 << 16)
(即S + 0x00010000
)
(2) 写锁的获取与释放
写锁的获取:
- 当前线程获取写锁时,如果读锁已被获取 or 当前线程不是已经获取了写锁的线程,则阻塞当前线程等待;
- 否则表示当前线程可重入,增加写状态。
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
写锁的释放:
- 每次释放减少写状态;
- 当写状态等于0时表示写锁已经被释放。
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
(3) 读锁的获取与释放
读锁的获取:
- 如果其他线程获取了写锁,则当前线程阻塞等待;
- 否则无论当前线程是否是已获取读锁的线程,都增加读状态。
- 读状态是所有线程获取读锁次数的总和,每个线程各自获取读锁的次数保存在ThreadLocal中。
读锁的释放:
- 读锁的每次释放都减少读状态;
- 当读状态等于0时表示所有线程的读锁已释放。
(4) 锁降级
锁降级指写锁降级为读锁。
获取写锁 --> 释放写锁 --> 获取读锁:不是锁降级。
获取写锁 --> 获取读锁 --> 释放写锁:是锁降级。
注意:
在一次处理过程中既有写操作,又有读操作,必须使用锁降级,否则处理过程中的数据会被修改。
原因:
- 如果当前线程A不获取读锁,而直接释放写锁的话,此时如果另一个线程B获取了写锁并修改了数据,然后线程B释放写锁,那么当线程A继续执行,加读锁并读取数据时,所读取的数据已经被线程B修改。
- 如果当前线程A使用锁降级,先获取读锁,那么线程B的写操作会被阻塞直到线程A释放读锁。
示例:
- 使用锁降级:
public class ReadWriteLockDemo {
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();
private Map<String, String> cache = new HashMap<>();
public void readWriteTest(String key, String value) {
// 写缓存
writeLock.lock();
try {
cache.put(key, value);
System.out.println(Thread.currentThread().getName() + " PUT key: " + key + " , value: " + value);
// 锁降级
readLock.lock();
} finally {
writeLock.unlock();
}
// Sleep
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 读缓存
try {
System.out.println(Thread.currentThread().getName() + " GET key: " + key + " , value: " + cache.get(key));
} finally {
readLock.unlock();
}
}
public static void main(String[] args) {
ReadWriteLockDemo demo = new ReadWriteLockDemo();
Thread thread0 = new Thread(new Runnable() {
@Override
public void run() {
demo.readWriteTest("name", "zhangsan");
}
});
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
demo.readWriteTest("name", "lisi");
}
});
thread0.start();
thread1.start();
}
}
执行结果:
Thread-0 PUT key: name , value: zhangsan
Thread-0 GET key: name , value: zhangsan
Thread-1 PUT key: name , value: lisi
Thread-1 GET key: name , value: lisi
- 不使用锁降级:
public class ReadWriteLockDemo {
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();
private Map<String, String> cache = new HashMap<>();
public void readWriteTest(String key, String value) {
// 写缓存
writeLock.lock();
try {
cache.put(key, value);
System.out.println(Thread.currentThread().getName() + " PUT key: " + key + " , value: " + value);
} finally {
writeLock.unlock();
}
// Sleep
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 读缓存
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " GET key: " + key + " , value: " + cache.get(key));
} finally {
readLock.unlock();
}
}
public static void main(String[] args) {
ReadWriteLockDemo demo = new ReadWriteLockDemo();
Thread thread0 = new Thread(new Runnable() {
@Override
public void run() {
demo.readWriteTest("name", "zhangsan");
}
});
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
demo.readWriteTest("name", "lisi");
}
});
thread0.start();
thread1.start();
}
}
执行结果:
Thread-0 PUT key: name , value: zhangsan
Thread-1 PUT key: name , value: lisi
Thread-0 GET key: name , value: lisi
Thread-1 GET key: name , value: lisi
四. LockSupport工具
同步组件使用LockSupport实现阻塞和唤醒一个线程的工作。
以park开头的一组方法:用来阻塞当前线程;
unpark(Thread thread)方法:用来唤醒一个被阻塞的线程。
五. Condition接口
每个Java对象(定义在java.lang.Object上)都拥有一组监视器方法(wait / notify),配合synchronized关键字,实现通知 / 等待。
每个Condition接口都提供了一组类似监视器的方法(await / signal),配合Lock,实现通知 / 等待。
对比项 | Object Monitor Methods | Condition |
---|---|---|
前置条件 | 获取对象的锁 | Lock.lock()获取锁;Lock.newCondition()获取Condition对象 |
调用方式 | 直接调用object.wait() | 直接调用condition.await() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
1. Condition 接口
方法名 | 描述 |
---|---|
void await() throws InterruptedException | 当前线程进入等待状态直到被通知或中断 |
void awaitUninterruptibly() | 当前线程进入等待状态直到被通知,对中断不敏感 |
long awaitNanos(long nanosTimeout) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或超时 |
boolean awaitUntil(Date deadline) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或到某个时间 |
void signal() | 唤醒一个等待在Condition上的线程 |
void singalAll() | 唤醒所有等待在Condition上的线程 |
2. Condition 实现分析
ConditionObject是队列同步器 AbstractQueuedSynchronizer的内部类。
(1) 等待队列
等待队列是一个FIFO的队列。
在Object的监视器模型上,一个对象拥有一个同步队列和一个等待队列;
而并发包中的Lock(队列同步器)拥有一个同步队列和多个等待队列。
(2) 等待
调用Condition的await()方法(或者以await开头的方法),会使当前线程释放锁,并进入等待队列,同时线程状态变为等待状态。
(3) 通知
- 调用Condition的signal()方法,将会唤醒在等待队列中的首节点,并将该节点移到同步队列中;
- 在同步队列中,该节点竞争获取锁,如果成功获取锁,被唤醒的线程将从先前调用的await()方法处返回。