主要内容有:
Lock接口
队列同步器
重入锁
读写锁
LockSupport工具
Condition接口
1.Lock接口
锁是用来控制多个线程访问共享资源的方式。一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的方法共享资源,比如读写锁)。在 Lock 接口出现之前,Java 程序是靠 synchronized 关键字实现锁功能的,而 Java 5 之后,并发包中新增了 Lock 接口用来实现锁功能,它提供了与 synchronized 关键字类似的同步功能,只是在使用时需要显式的获取和释放锁。虽然它缺少了synchronized 隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种 synchronized 关键字所不具备的同步特性。
Lock API如下图所示:
Lock接口的实现基本都是通过聚合队列同步器AbstractQueuedSynchronizer的子类来完成线程访问控制的。
2.队列同步器AQS
队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,它使用一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的三个方法( getState、setState(int newState)、compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。同步器既可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态,这样就可以方便的实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock等等)。
同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义,锁是面向使用者的,它定义使用者与锁交互的接口,隐藏了实现细节;同步器面向的锁的实现者,它简化了锁的实现,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作,锁和同步器很好的隔离了使用者和实现者所需关注的领域。
2.1队列同步器的接口与示例
同步器的设计是基于模板方法模式的,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法最终会调用使用者重写的方法。调动路径为Lock API > 模板方法 > 重写方法。重写同步器指定方法时,需要使用同步器提供的如下三个方法来访问或者修改同步状态:
getState():获取当前同步状态。
setState(int newState):设置当前同步状态。
compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。
同步器可重写的方法如下图所示:
同步器提供的模板方法如下图所示:
下面通过一个独占锁的示例来深入了解一下同步器的工作原理。独占锁就是同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于等待队列中等待,只有获取锁的线程释放了锁,后续的线程才能够获取锁,代码如下:
//MutexLock代码
public class MutexLock implements Lock {
private static final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return null;
}
}
//测试代码
public class MutexTest {
private static final String TAG = "MutexTest";
public static void testMutexLock(){
final Lock lock = new MutexLock();
class Worker extends Thread{
@Override
public void run() {
lock.lock();
try{
Thread.sleep(1000);
Log.d(TAG, "ThreadName is: " + Thread.currentThread().getName() + " " + Thread.currentThread().getId());
Thread.sleep(1000);
}catch (Exception e){
}finally {
lock.unlock();
}
}
}
int i =10;
for (int j = 0; j < i; j++) {
Worker worker = new Worker();
worker.start();
}
}
}
//日志如下,可以发现同一时刻只有一个线程获取到锁
test 17:17:23.030 4095-4197/test D/MutexTest: ThreadName is: Thread-2 848
test 17:17:25.030 4095-4198/test D/MutexTest: ThreadName is: Thread-3 849
test 17:17:27.030 4095-4199/test D/MutexTest: ThreadName is: Thread-4 850
test 17:17:29.030 4095-4200/test D/MutexTest: ThreadName is: Thread-5 851
test 17:17:31.030 4095-4201/test D/MutexTest: ThreadName is: Thread-6 852
test 17:17:33.040 4095-4203/test D/MutexTest: ThreadName is: Thread-8 854
test 17:17:35.040 4095-4204/test D/MutexTest: ThreadName is: Thread-9 855
test 17:17:37.040 4095-4205/test D/MutexTest: ThreadName is: Thread-10 856
test 17:17:39.040 4095-4202/test D/MutexTest: ThreadName is: Thread-7 853
test 17:17:41.040 4095-4206/test D/MutexTest: ThreadName is: Thread-11 857
//Lock锁API、模板方法、重写方法调用关系如下:
W/System.err: at .....MutexLock$Sync.tryAcquire(MutexLock.java:18) //重写方法最后调用重写方法
W/System.err: at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1222) //模板方法
W/System.err: at .....MutexLock.lock(MutexLock.java:46) //MutexLock锁API lock()
2.2 队列同步器实现分析
下面从实现角度分析同步器是如何完成线程同步的,主要介绍同步队列和独占式同步状态获取与释放,其它更多内容可以直接阅读《Java并发编程的艺术》。
同步队列:同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器将当前线程以及等待状态等信息构成一个节点并加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。同步队列中的节点用来保存获取的同步状态失败的线程引用、等待状态以及前驱和后继节点。节点是构建同步队列的基础,同步器拥有首节点和尾节点,没有成功获取同步状态的线程将会成为节点加入该队列的尾部。
独占式同步状态获取与释放:同步器通过acquire(int arg)方法获取同步状态,代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代码主要完成了同步状态的获取,节点的构造、加入同步队列以及在同步队列中自旋等相关工作。主要逻辑为:首先调用自定义同步器实现的tryAcquire(arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态),并通过addWaiter(Node node)方法将该节点添加到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得节点以死循环方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。独占式同步状态获取流程如下图所示:
在释放同步状态时,同步器调用tryRelease()方法释放同步状态,然后通过LockSupport来唤醒头结点的后继节点。
自定义同步组件TwinsLock:通过编写一个自定义同步组件来加深对同步器的理解,我们来设计一个同步工具:该工具在同一时刻,只允许最多两个线程同时访问,超过两个线程的访问将被阻塞,TwinsLock同一时刻支持多个线程访问,这显然是共享式访问,这就要求必须重写tryAcquireShared和tryReleaseShared方法,这样才能保证同步器共享式同步状态的获取与释放方法得以执行。同一时刻允许最多两个线程同时访问,表明同步资源数为2,这样设置初始状态status为2,当一个线程进行获取,status减1,该线程释放,则status加1,状态的合理范围是0、1、2。0表示已经有两个线程获取了同步状态,再有线程获取时,该线程只能阻塞,在同步状态更新时,需要使用compareAndSet方法做原子性保障。TwinsLock代码如下所示:
//TwinsLock代码
public class TwinsLock implements Lock {
//同一时刻只允许两个线程访问
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer{
Sync(int arg) {
if(arg < 0){
throw new IllegalArgumentException("arg must large than zero");
}
setState(arg);
}
@Override
protected int tryAcquireShared(int arg) {
for(;;){
int count = getState();
int newCount = count - arg;
if(newCount < 0 || compareAndSetState(count,newCount)){
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for(;;){
int count = getState();
int newCount = count + arg;
if(compareAndSetState(count,newCount)){
//成功释放锁
Log.d(TwinsLockTest.TAG, "ThreadName:" + Thread.currentThread().getName() + " release lock" );
return true;
}
}
}
Condition newCondition(){
return new ConditionObject();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) > 0;
}
@Override
public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException {
return false;
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
//测试代码
public class TwinsLockTest {
public static final String TAG = "TwinsLockTest";
public static void testLock(){
final Lock lock = new TwinsLock();
class Worker extends Thread{
@Override
public void run() {
//获取锁
lock.lock();
try{
Thread.sleep(1000);
Log.d(TAG, "ThreadName:" + Thread.currentThread().getName() + " has lock");
Thread.sleep(1000);
}catch (Exception e){
}finally {
//释放锁
lock.unlock();
}
}
}
int i =10;
for (int j = 0; j < i; j++) {
Worker worker = new Worker();
worker.start();
}
}
}
//打印日志,可以看出同一时刻只有两个线程获取了锁,当持有锁的线程释放锁之后,两个新的线程才可以获取锁并运行
20:18:59.360 31360-31444/... D/TwinsLockTest: ThreadName:Thread-3 has lock
20:18:59.360 31360-31443/... D/TwinsLockTest: ThreadName:Thread-2 has lock
20:19:00.360 31360-31443/... D/TwinsLockTest: ThreadName:Thread-2 release lock
20:19:00.360 31360-31444/... D/TwinsLockTest: ThreadName:Thread-3 release lock
20:19:01.360 31360-31445/... D/TwinsLockTest: ThreadName:Thread-4 has lock
20:19:01.360 31360-31446/... D/TwinsLockTest: ThreadName:Thread-5 has lock
20:19:02.360 31360-31446/... D/TwinsLockTest: ThreadName:Thread-5 release lock
20:19:02.360 31360-31445/... D/TwinsLockTest: ThreadName:Thread-4 release lock
20:19:03.370 31360-31447/... D/TwinsLockTest: ThreadName:Thread-6 has lock
20:19:03.370 31360-31448/... D/TwinsLockTest: ThreadName:Thread-7 has lock
20:19:04.370 31360-31447/... D/TwinsLockTest: ThreadName:Thread-6 release lock
20:19:04.370 31360-31448/... D/TwinsLockTest: ThreadName:Thread-7 release lock
20:19:05.370 31360-31449/... D/TwinsLockTest: ThreadName:Thread-8 has lock
20:19:05.370 31360-31450/... D/TwinsLockTest: ThreadName:Thread-9 has lock
20:19:06.370 31360-31449/... D/TwinsLockTest: ThreadName:Thread-8 release lock
20:19:06.370 31360-31450/... D/TwinsLockTest: ThreadName:Thread-9 release lock
20:19:07.370 31360-31452/... D/TwinsLockTest: ThreadName:Thread-11 has lock
20:19:07.370 31360-31451/... D/TwinsLockTest: ThreadName:Thread-10 has lock
20:19:08.380 31360-31451/... D/TwinsLockTest: ThreadName:Thread-10 release lock
20:19:08.380 31360-31452/... D/TwinsLockTest: ThreadName:Thread-11 release lock
3.重入锁
重入锁ReetrantLock,就是支持重入的锁,它表示该锁能够支持一个线程对资源的重复加锁,除此之外还支持获取锁时的公平和非公平性选择。ReentrantLock 虽然没能像 synchronized 关键字一样支持隐式的重进入,但是在调用 lock 方法时,已经获取到锁的线程,能够再次调用 lock 方法获取锁而不被阻塞。公平的获取锁即锁获取是顺序的,ReentrantLock 提供了一个构造方法,能够控制锁获取是否是公平的。事实上,公平锁机制往往没有非公平的效率高。读写锁ReentrantReadWriteLock构造函数可以选择使用公平锁还是非公平锁,如下代码所示:
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被自己阻塞,该特性需要解决以下两个问题:
(1)线程再次获取锁:锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
(2)锁的最终释放:线程重复n次获取锁,就需要n次的解锁。
4.读写锁
4.1读写介绍及示例
Java 并发包提供读写锁的实现是 ReentrantReadWriteLock。之前提到的锁基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。读写锁使用示例如下:
public class ReentrantLockDemo {
static Map<String,Object> map=new HashMap<>();
static ReentrantReadWriteLock reentrantReadWriteLock=new ReentrantReadWriteLock();
static Lock readLock=reentrantReadWriteLock.readLock();
static Lock writeLock=reentrantReadWriteLock.writeLock();
//获取一个 key 对应的 value
public static final Object get(String key){
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
//设置 key 对应的 value,并返回旧的 value
public static final Object put(String key,Object value){
writeLock.lock();
try {
return map.put(key,value);
} finally {
writeLock.unlock();
}
}
//清空所有数据
public static final void clear(){
writeLock.lock();
try {
map.clear();
} finally {
writeLock.unlock();
}
}
}
4.2读写锁实现分析
ReentrantReadWriteLock的实现主要包括:读写状态的设计、写锁的获取与释放、读锁的获取与释放以及锁降级。本文主要介绍读写状态设计,其它内容可参考《Java并发编程的艺术》或直接阅读源码。
读写状态的设计:读写锁依赖同步器来实现同步功能,而读写状态就是其同步器的同步状态,ReentrantLock中同步状态表示锁被一个线程重复获取的次数,而读写锁自定义同步器需要再同步状态(一个整型变量)上维护多个读线程和一个写线程状态,该状态称为读写锁实现的关键。如果一个整型变量上维护多种状态,就一定需要按位切割使用这个变量,读写锁将变量切分成两个部分,高16位表示读,低16位表示写,划分方式如下图所示:
读写锁如何迅速确定读和写的状态呢?答案是通过位运算,将设当前同步状态值为S,写状态等于S&0x0000FFFF(将高16位抹去),读状态等于S>>>16(无符号右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。
在线程池中也是采用int分位表示线程池状态和线程数量,即采用ctl变量来表示保存线程池状态以及线程个数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl用来控制线程池的状态,并用来表示线程池线程数量。ctl类型为AtomicInteger,那用一个基础如何表示以上五种状态以及线程池工作线程数量呢?int型变量占用4字节,共32位,因此采用位表示,可以解决上述问题。5种状态使用5种数值进行表示,需要占用3位,余下的29位就可以用来表示线程数。因此,高三位表示进程状态,低29位为线程数量,所以线程池最大线程个数是2的29次方。
将|与&结合起来,可以使用int分位也可以存放多个boolean型变量,例如一个类,有一个属性是用boolean表示,隔了一段时间,又需要新加一个boolean表示新的属性,如果需要增加很多boolean变量存放新的属性,可以采用int分位来解决该问题,简单来说就是用一个int型的每一位表示一个boolean型的属性,通过|来设置属性,通过&来获取属性,每个属性的值都是2的n次方,如下代码所示:
public static final int IS_STUDENT = 1;
public static final int IS_ADULT = 2;
public static final int IS_SINGLE = 4;
//使用properties来存放多个boolean变量
private int properties;
//使用properties属性
public void setProperties(int properties) {
this.properties = properties;
}
//提取properties属性
public boolean isStudent() {
return (properties & IS_STUDENT) != 0;
}
public boolean isAdult() {
return (properties & IS_ADULT) != 0;
}
public boolean isSingle() {
return (properties & IS_SINGLE) != 0;
}
5.LockSupport工具
当需要阻塞或者唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作,LockSupport 定义了一组公共的静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而 LockSupport 也成为了构建同步组件的基础工具。 LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。LockSupport提供的阻塞和唤醒方法下图所示:
6.Condition接口
任意一个Java对象,都拥有一组监视器方法(定义在 java.long.Object 上),主要包括 wait()、wait(long timeout)、notify、以及 notifyAll 方法,这些方法与 synchronized 同步关键字配合,可以实现等待 / 通知模式。Condition 接口也提供了类似 Object 的监视器方法,与 Lock 配合可以实现等待/通知模式。
Object支持一个等待队列,Condition可以直接多个等待队列。
参考资料
《Java并发编程的艺术》