一、 Concurrent包的实现原理:
由于Java的CAS
同时具有volatile
读和volatile
写的内存语义,因此Java
线程之间的通信现在有了下面4种方式。
1)A线程写volatile
变量,随后B线程读这个volatile
变量。
2)A线程写volatile
变量,随后B线程用CAS更新这个volatile
变量。
3)A线程用CAS
更新一个volatile
变量,随后B线程用CAS
更新这个volatile变量。
4)A线程用CAS
更新一个volatile
变量,随后B线程读这个volatile
变量。
Java的CAS
会使用现代处理器上提供的高效机器级别的原子指令,这些原子指令以原子方式对内存执行读-改-写操作,这是在多处理器中实现同步的关键(从本质上来说,能够支持原子性读-改-写指令的计算机,是顺序计算图灵机的异步等价机器,因此任何现代的多处理器都会去支持某种能对内存执行原子性读-改-写操作的原子指令)。同时,volatile
变量的读/写和CAS
可以实现线程之间的通信。把这些特性整合在一起,就形成了整个concurrent
包得以实现的基石。如果我们仔细分析concurrent
包的源代码实现,会发现一个通用化的实现模式。
首先,声明共享变量为volatile
。
然后,使用CAS
的原子条件更新来实现线程之间的同步。
同时,配合以volatile
的读/写和CAS
所具有的volatile
读和写的内存语义来实现线程之间的通信。
AQS
,非阻塞数据结构和原子变量类(java.util.concurrent.atomic
包中的类),这些concurrent
包中的基础类都是使用这种模式来实现的,而concurrent
包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent
包的实现示意图如下所示。
二、Lock简介:
我们下来看concurent
包下的lock
子包。锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源。在Lock
接口出现之前,java
程序主要是靠synchronized
关键字实现锁功能的,而java5之后,并发包中增加了Lock
接口,它提供了与synchronized一样的锁功能。虽然它失去了像synchronize
关键字隐式加锁解锁的便捷性,但是却拥有了锁获取和释放的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。通常使用显示使用lock的形式如下:
Lock lock = new ReentrantLock();
lock.lock();
try {
.......
} finally {
lock.unlock();
}
需要注意的是synchronized
同步块执行完成或者遇到异常是锁会自动释放,而Lock
必须调用unlock()
方法释放锁,因此在finally块中释放锁。
2.1 Lock接口API
我们现在就来看看Lock
接口定义了哪些方法:
void lock(); //获取锁
void lockInterruptibly() throws InterruptedException;//获取锁的过程能够响应中断
boolean tryLock();//非阻塞式响应中断能立即返回,获取锁放回true反之返回fasle
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//超时获取锁,在超时内或者未中断的情况下能够获取锁
Condition newCondition();//获取与lock绑定的等待通知组件,当前线程必须获得了锁才能进行等待,进行等待时会先释放锁,当再次获取锁时才能从等待中返回
上面是Lock
接口下的五个方法,也只是从源码中英译中翻译了一遍,感兴趣的可以自己的去看看。那么在locks
包下有哪些类实现了该接口了?先从最熟悉的ReentrantLock
说起。
public class ReentrantLock implements Lock, java.io.Serializable
很显然ReentrantLock
实现了Lock
接口,接下来我们来仔细研究一下它是怎样实现的。当你查看源码时你会惊讶的发现ReentrantLock
并没有多少代码,另外有一个很明显的特点是:基本上所有的方法的实现实际上都是调用了其静态内存类Sync
中的方法,而Sync
类继承了AbstractQueuedSynchronizer(AQS)
。可以看出要想理解ReentrantLock
关键核心在于对队列同步器AbstractQueuedSynchronizer(简称同步器)
的理解。
2.2 AQS浅析
关于AQS在源码中有十分具体的解释:
/**
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.
*
* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class. Class
* {@code AbstractQueuedSynchronizer} does not implement any
* synchronization interface. Instead it defines methods such as
* {@link #acquireInterruptibly} that can be invoked as
* appropriate by concrete locks and related synchronizers to
* implement their public methods.
*
* <p>This class supports either or both a default <em>exclusive</em>
* mode and a <em>shared</em> mode. When acquired in exclusive mode,
* attempted acquires by other threads cannot succeed. Shared mode
* acquires by multiple threads may (but need not) succeed. This class
* does not "understand" these differences except in the
* mechanical sense that when a shared mode acquire succeeds, the next
* waiting thread (if one exists) must also determine whether it can
* acquire as well. Threads waiting in the different modes share the
* same FIFO queue. Usually, implementation subclasses support only
* one of these modes, but both can come into play for example in a
* {@link ReadWriteLock}. Subclasses that support only exclusive or
* only shared modes need not define the methods supporting the unused mode.
*
* <p>This class defines a nested {@link ConditionObject} class that
* can be used as a {@link Condition} implementation by subclasses
* supporting exclusive mode for which method {@link
* #isHeldExclusively} reports whether synchronization is exclusively
* held with respect to the current thread, method {@link #release}
* invoked with the current {@link #getState} value fully releases
* this object, and {@link #acquire}, given this saved state value,
* eventually restores this object to its previous acquired state. No
* {@code AbstractQueuedSynchronizer} method otherwise creates such a
* condition, so if this constraint cannot be met, do not use it. The
* behavior of {@link ConditionObject} depends of course on the
* semantics of its synchronizer implementation.
*
* <p>This class provides inspection, instrumentation, and monitoring
* methods for the internal queue, as well as similar methods for
* condition objects. These can be exported as desired into classes
* using an {@code AbstractQueuedSynchronizer} for their
* synchronization mechanics.
*
* <p>Serialization of this class stores only the underlying atomic
* integer maintaining state, so deserialized objects have empty
* thread queues. Typical subclasses requiring serializability will
* define a {@code readObject} method that restores this to a known
* initial state upon deserialization.
*
* <h3>Usage</h3>
*
* <p>To use this class as the basis of a synchronizer, redefine the
* following methods, as applicable, by inspecting and/or modifying
* the synchronization state using {@link #getState}, {@link
* #setState} and/or {@link #compareAndSetState}:
*
* <ul>
* <li> {@link #tryAcquire}
* <li> {@link #tryRelease}
* <li> {@link #tryAcquireShared}
* <li> {@link #tryReleaseShared}
* <li> {@link #isHeldExclusively}
* </ul>
*
* Each of these methods by default throws {@link
* UnsupportedOperationException}. Implementations of these methods
* must be internally thread-safe, and should in general be short and
* not block. Defining these methods is the <em>only</em> supported
* means of using this class. All other methods are declared
* {@code final} because they cannot be independently varied.
*
* <p>You may also find the inherited methods from {@link
* AbstractOwnableSynchronizer} useful to keep track of the thread
* owning an exclusive synchronizer. You are encouraged to use them
* -- this enables monitoring and diagnostic tools to assist users in
* determining which threads hold locks.
*
* <p>Even though this class is based on an internal FIFO queue, it
* does not automatically enforce FIFO acquisition policies. The core
* of exclusive synchronization takes the form:
*
* <pre>
* Acquire:
* while (!tryAcquire(arg)) {
* <em>enqueue thread if it is not already queued</em>;
* <em>possibly block current thread</em>;
* }
*
* Release:
* if (tryRelease(arg))
* <em>unblock the first queued thread</em>;
* </pre>
*
* (Shared mode is similar but may involve cascading signals.)
*
* <p id="barging">Because checks in acquire are invoked before
* enqueuing, a newly acquiring thread may <em>barge</em> ahead of
* others that are blocked and queued. However, you can, if desired,
* define {@code tryAcquire} and/or {@code tryAcquireShared} to
* disable barging by internally invoking one or more of the inspection
* methods, thereby providing a <em>fair</em> FIFO acquisition order.
* In particular, most fair synchronizers can define {@code tryAcquire}
* to return {@code false} if {@link #hasQueuedPredecessors} (a method
* specifically designed to be used by fair synchronizers) returns
* {@code true}. Other variations are possible.
*
* <p>Throughput and scalability are generally highest for the
* default barging (also known as <em>greedy</em>,
* <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
* While this is not guaranteed to be fair or starvation-free, earlier
* queued threads are allowed to recontend before later queued
* threads, and each recontention has an unbiased chance to succeed
* against incoming threads. Also, while acquires do not
* "spin" in the usual sense, they may perform multiple
* invocations of {@code tryAcquire} interspersed with other
* computations before blocking. This gives most of the benefits of
* spins when exclusive synchronization is only briefly held, without
* most of the liabilities when it isn't. If so desired, you can
* augment this by preceding calls to acquire methods with
* "fast-path" checks, possibly prechecking {@link #hasContended}
* and/or {@link #hasQueuedThreads} to only do so if the synchronizer
* is likely not to be contended.
*
* <p>This class provides an efficient and scalable basis for
* synchronization in part by specializing its range of use to
* synchronizers that can rely on {@code int} state, acquire, and
* release parameters, and an internal FIFO wait queue. When this does
* not suffice, you can build synchronizers from a lower level using
* {@link java.util.concurrent.atomic atomic} classes, your own custom
* {@link java.util.Queue} classes, and {@link LockSupport} blocking
* support.
*
* <h3>Usage Examples</h3>
*
* <p>Here is a non-reentrant mutual exclusion lock class that uses
* the value zero to represent the unlocked state, and one to
* represent the locked state. While a non-reentrant lock
* does not strictly require recording of the current owner
* thread, this class does so anyway to make usage easier to monitor.
* It also supports conditions and exposes
* one of the instrumentation methods:
*
* <pre> {@code
* class Mutex implements Lock, java.io.Serializable {
*
* // Our internal helper class
* private static class Sync extends AbstractQueuedSynchronizer {
* // Reports whether in locked state
* protected boolean isHeldExclusively() {
* return getState() == 1;
* }
*
* // Acquires the lock if state is zero
* public boolean tryAcquire(int acquires) {
* assert acquires == 1; // Otherwise unused
* if (compareAndSetState(0, 1)) {
* setExclusiveOwnerThread(Thread.currentThread());
* return true;
* }
* return false;
* }
*
* // Releases the lock by setting state to zero
* protected boolean tryRelease(int releases) {
* assert releases == 1; // Otherwise unused
* if (getState() == 0) throw new IllegalMonitorStateException();
* setExclusiveOwnerThread(null);
* setState(0);
* return true;
* }
*
* // Provides a Condition
* Condition newCondition() { return new ConditionObject(); }
*
* // Deserializes properly
* private void readObject(ObjectInputStream s)
* throws IOException, ClassNotFoundException {
* s.defaultReadObject();
* setState(0); // reset to unlocked state
* }
* }
*
* // The sync object does all the hard work. We just forward to it.
* private final Sync sync = new Sync();
*
* public void lock() { sync.acquire(1); }
* public boolean tryLock() { return sync.tryAcquire(1); }
* public void unlock() { sync.release(1); }
* public Condition newCondition() { return sync.newCondition(); }
* public boolean isLocked() { return sync.isHeldExclusively(); }
* public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
* public void lockInterruptibly() throws InterruptedException {
* sync.acquireInterruptibly(1);
* }
* public boolean tryLock(long timeout, TimeUnit unit)
* throws InterruptedException {
* return sync.tryAcquireNanos(1, unit.toNanos(timeout));
* }
* }}</pre>
*
* <p>Here is a latch class that is like a
* {@link java.util.concurrent.CountDownLatch CountDownLatch}
* except that it only requires a single {@code signal} to
* fire. Because a latch is non-exclusive, it uses the {@code shared}
* acquire and release methods.
*
* <pre> {@code
* class BooleanLatch {
*
* private static class Sync extends AbstractQueuedSynchronizer {
* boolean isSignalled() { return getState() != 0; }
*
* protected int tryAcquireShared(int ignore) {
* return isSignalled() ? 1 : -1;
* }
*
* protected boolean tryReleaseShared(int ignore) {
* setState(1);
* return true;
* }
* }
*
* private final Sync sync = new Sync();
* public boolean isSignalled() { return sync.isSignalled(); }
* public void signal() { sync.releaseShared(1); }
* public void await() throws InterruptedException {
* sync.acquireSharedInterruptibly(1);
* }
* }}</pre>
*
* @since 1.5
* @author Doug Lea
*/
同步器是用来构建锁和其他同步组件的基础框架,它的实现主要依赖一个int成员变量来表示同步状态以及通过一个FIFO
队列构成等待队列。它的子类必须重写AQS
的几个protected
修饰的用来改变同步状态的方法,其他方法主要是实现了排队和阻塞机制。状态的更新使用getState,setState
以及compareAndSetState
这三个方法。
子类被推荐定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态的获取和释放方法来供自定义同步组件的使用,同步器既支持独占式获取同步状态,也可以支持共享式获取同步状态,这样就可以方便的实现不同类型的同步组件。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者的关系:锁是面向使用者,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器是面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态的管理,线程的排队,等待和唤醒等底层操作。锁和同步器很好的隔离了使用者和实现者所需关注的领域。
2.3 AQS的模板方法设计模式
AQS
的设计是使用模板方法设计模式,它将一些方法开放给子类进行重写,而同步器给同步组件所提供模板方法又会重新调用被子类所重写的方法。举个例子,AQS
中需要重写的方法tryAcquire
:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
ReentrantLock
中NonfairSync
(继承AQS
)会重写该方法为:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
而AQS
中的模板方法acquire()
:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
会调用tryAcquire
方法,而此时当继承AQS
的NonfairSync
调用模板方法acquire
时就会调用已经被NonfairSync
重写的tryAcquire
方法。这就是使用AQS
的方式,在弄懂这点后会lock
的实现理解有很大的提升。可以归纳总结为这么几点:
- 同步组件(这里不仅仅值锁,还包括
CountDownLatch
等)的实现依赖于同步器AQS
,在同步组件实现中,使用AQS
的方式被推荐定义继承AQS的静态内存类; -
AQS
采用模板方法进行设计,AQS
的protected
修饰的方法需要由继承AQS
的子类进行重写实现,当调用AQS
的子类的方法时就会调用被重写的方法; -
AQS
负责同步状态的管理,线程的排队,等待和唤醒这些底层操作,而Lock
等同步组件主要专注于实现同步语义; - 在重写
AQS
的方式时,使用AQS
提供的getState(),setState(),compareAndSetState()
方法进行修改同步状态
AQS可重写的方法如下图(摘自《java并发编程的艺术》一书):
在实现同步组件时AQS提供的模板方法如下图:
AQS
提供的模板方法可以分为3类:
- 独占式获取与释放同步状态;
- 共享式获取与释放同步状态;
- 查询同步队列中等待线程情况;
同步组件通过AQS
提供的模板方法实现自己的同步语义。
三、一个例子
下面使用一个例子来进一步理解下AQS
的使用。这个例子也是来源于AQS
源码中的example。
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
// 继承AQS的静态内存类
// 重写方法
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
//使用同步器的模板方法实现自己的同步语义
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
MutexDemo:
public class MutextDemo {
private static Mutex mutex = new Mutex();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
mutex.lock();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.unlock();
}
});
thread.start();
}
}
}
执行情况:
上面的这个例子实现了独占锁的语义,在同一个时刻只允许一个线程占有锁。 MutexDemo
新建了10个线程,分别睡眠3s。从执行情况也可以看出来当前Thread-0
正在执行占有锁而其他Thread-1
,Thread-2
等线程处于WAIT
状态。按照推荐的方式,Mutex
定义了一个继承AQS
的静态内部类Sync,并且重写了AQS
的tryAcquire
等等方法,而对state
的更新也是利用了setState()
,getState()
,compareAndSetState()
这三个方法。在实现实现Lock
接口中的方法也只是调用了AQS
提供的模板方法(因为Sync
继承AQS
)。从这个例子就可以很清楚的看出来,在同步组件的实现上主要是利用了AQS
,而AQS
“屏蔽”了同步状态的修改,线程排队等底层实现,通过AQS
的模板方法可以很方便的给同步组件的实现者进行调用。而针对用户来说,只需要调用同步组件提供的方法来实现并发编程即可。同时在新建一个同步组件时需要把握的两个关键点是:
- 实现同步组件时推荐定义继承
AQS
的静态内部类,并重写需要的protected
修饰的方法; - 同步组件语义的实现依赖于
AQS
的模板方法,而AQS
模板方法又依赖于被AQS
的子类所重写的方法。
通俗点说,因为AQS
整体设计思路采用模板方法设计模式,同步组件以及AQS
的功能实际上别切分成各自的两部分:
同步组件实现者的角度:
通过可重写的方法:独占式:tryAcquire()
(独占式获取同步状态),tryRelease()
(独占式释放同步状态);共享式 :tryAcquireShared()(共享式获取同步状态),tryReleaseShared()
(共享式释放同步状态);告诉AQS
怎样判断当前同步状态是否成功获取或者是否成功释放。同步组件专注于对当前同步状态的逻辑判断,从而实现自己的同步语义。这句话比较抽象,举例来说,上面的Mutex
例子中通过tryAcquire
方法实现自己的同步语义,在该方法中如果当前同步状态为0(即该同步组件没被任何线程获取),当前线程可以获取同时将状态更改为1返回true,否则,该组件已经被线程占用返回false。很显然,该同步组件只能在同一时刻被线程占用,Mutex
专注于获取释放的逻辑来实现自己想要表达的同步语义。
AQS的角度
而对AQS
来说,只需要同步组件返回的true和false即可,因为AQS
会对true和false会有不同的操作,true会认为当前线程获取同步组件成功直接返回,而false的话就AQS
也会将当前线程插入同步队列等一系列的方法。
总的来说,同步组件通过重写AQS
的方法实现自己想要表达的同步语义,而AQS
只需要同步组件表达的true和false即可,AQS
会针对true和false不同的情况做不同的处理,至于底层实现,可以看下一篇文章。
参考文献
《java并发编程的艺术》