同步器剖析
即使许多同步器(锁,信号量,阻塞队列等)的功能不同,它们的内部设计通常却差不多。换句话说,它们在内部由相同(或类似)的基本部分组成。在设计同步器时,了解这些基本部件可能会有很大帮助。
注意:本文的内容是Jakob Jenkov, Toke Johansen and Lars Bjørn于2004年春季在IT University of Copenhagen开展的学生项目M.Sc.的一部分结果。在这个项目中,我们询问了Doug Lea是否知道类似的工作。有趣的是,他在Java 5并发组件的开发过程中也得出了类似的结论。我相信,Doug Lea的工作在“Java Concurrency in Practice”一书中有所描述 ,这本书还包含一个标题为"Anatomy of a Synchronizer"的章节,内容与本文类似,但不完全相同。
大多数(可能不是全部)同步器的目的是保护代码的某些区域(临界区)不受线程的并发访问的影响。为此,在同步器中一般需要以下部件:
并非所有的同步器都具有所有这些部件,并且有的部件可能和这里描述的部件会略有不同。通常,你可以找到这些部分中的一个或多个。
状态 State
访问条件使用同步器的状态来确定是否可以授予线程访问权限。在锁中,状态保存在boolean
中表示是否Lock
被锁定。在有界信号量中,内部状态保存在一个计数器(int)和一个上限(int)中,表示当前“take”的数量和“take”的最大数量。在阻塞队列中,状态保存在List
中的元素和最大队列大小(int)成员(如果有的话)中。
这里有Lock
和BoundedSemaphore
的两个代码片段,state代码上加入了注释:
public class Lock{
//state is kept here
private boolean isLocked = false;
public synchronized void lock()
throws InterruptedException{
while(isLocked){
wait();
}
isLocked = true;
}
...
}
public class BoundedSemaphore {
//state is kept here
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
while(this.signals == bound) wait();
this.signal++;
this.notify();
}
...
}
访问条件 Access Condition
访问条件决定了是否允许调用了test-and-set-state方法的线程设置状态。访问条件通常基于同步器的state,通常在while循环中检查访问条件以防止 虚假唤醒。当检查访问条件的值时,它要么是true要么是false。
在锁中,访问条件只检查isLocked
成员变量的值 。在有界信号量 中,实际上有两种访问条件,这取决于你是否在尝试“获取”或“释放”信号量。如果线程尝试获取信号量,signals
变量则会针对上限值检查。如果线程试图释放信号量,signals
变量则针对0检查。
以下是Lock
和BoundedSemaphore
的两个代码片段,其中访问条件加入了注释。注意总是在while循环中检查条件:
public class Lock{
private boolean isLocked = false;
public synchronized void lock() throws InterruptedException{
//access condition
while(isLocked){
wait();
}
isLocked = true;
}
...
}
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
//access condition
while(this.signals == bound) wait();
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
//access condition
while(this.signals == 0) wait();
this.signals--;
this.notify();
}
}
状态改变
一旦线程获得对临界区的访问权限,它就必须改变同步器的状态以(可能)阻止其他线程进入它。换句话说,状态需要反映一个线程正在临界区内执行的事实,这应该能影响其他尝试访问临界区的线程的访问条件。
在锁中,状态更改是代码设置isLocked = true
。在信号量中,它是代码signals--
或signals++
;
以下是两个代码片段,状态更改代码加入了注释:
public class Lock{
private boolean isLocked = false;
public synchronized void lock() throws InterruptedException{
while(isLocked){
wait();
}
//state change
isLocked = true;
}
public synchronized void unlock(){
//state change
isLocked = false;
notify();
}
}
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
while(this.signals == bound) wait();
//state change
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
//state change
this.signals--;
this.notify();
}
}
通知策略
一旦线程改变了同步器的状态,它有时可能需要通知其他等待线程状态发生了改变。也许这种状态改变可能会使其他线程的访问条件的值为true。
通知策略通常分为三类。
- 通知所有等待的线程。
- 随机通知N个等待线程中的一个。
- 通知N个等待线程中指定的一个。
通知所有等待的线程非常简单。所有等待的线程都在同一个对象调用wait()
。一旦线程想要通知所有等待的线程,它只需调用这个对象的notifyAll()
方法。
通知单个随机等待线程也很容易,只需在同一个对象上调用notify()
即可通知等待线程。调用notify()
不保证将通知哪个等待线程,因此,这个术语叫做“随机等待线程”。
有时你可能需要通知特定而不是随机的等待线程。例如,如果你需要保证以特定顺序通知等待线程,这个顺序称为同步器中的顺序,或者某些优先顺序。为了实现这一点,每个等待的线程必须在它自己的独立对象上调用wait()
。当通知线程想要通知特定的等待线程时,它可以调用指定线程自己的独立对象的notify()
。这方面的一个例子可以在“ 饥饿与公平 ”一文中找到。
下面是一个代码片段,其中包含以注释标记的通知策略(通知一个随机等待线程):
public class Lock{
private boolean isLocked = false;
public synchronized void lock() throws InterruptedException{
while(isLocked){
//wait strategy - related to notification strategy
wait();
}
isLocked = true;
}
public synchronized void unlock(){
isLocked = false;
notify(); //notification strategy
}
}
Test and Set 方法
同步器通常有两种类型的方法,test-and-set
方法是第一种类型(set
方法是另一种)。Test-and-set意味着调用此方法的线程根据访问条件测试同步器的内部状态。如果满足条件,则线程设置同步器的内部状态以反映线程已获得访问权限。
状态改变通常导致其他试图获得访问权的线程的访问条件变为false,但可能并非总是如此。例如,在读写锁中,获取读取访问权限的线程将更新读写锁定的状态以反映这一点,但只要没有线程请求写入访问权限,其他请求读取访问权限的线程也将被授予访问权限。
必须以原子方式执行test-and-set操作,这意味着在测试和设置状态之间不允许其他线程执行test-and-set方法。
test-and-set方法的程序流程通常如下:
- 必要时在测试前设置状态
- 针对访问条件测试状态
- 如果不满足访问条件,等待
- 如果满足访问条件,设置状态,并在必要时通知等待线程
下面显示的ReadWriteLock类的lockWrite()
方法是一个测试和设置方法的示例。调用lockWrite()
的线程首先在test之前设置状态(writeRequests++
),然后它根据canGrantWriteAccess()
方法中的访问条件测试内部状态。如果测试成功,则在退出方法之前再次设置内部状态。注意,此方法不会通知等待的线程。
public class ReadWriteLock{
private Map<Thread, Integer> readingThreads =
new HashMap<Thread, Integer>();
private int writeAccesses = 0;
private int writeRequests = 0;
private Thread writingThread = null;
...
public synchronized void lockWrite() throws InterruptedException{
// 1 必要时在测试前设置状态
writeRequests++;
Thread callingThread = Thread.currentThread();
// 2 针对访问条件测试状态
while(! canGrantWriteAccess(callingThread)){
// 3 如果不满足访问条件,等待
wait();
}
// 4 如果满足访问条件,设置状态
writeRequests--;
writeAccesses++;
writingThread = callingThread;
}
...
}
下面显示的BoundedSemaphore
类有两个测试和设置方法:take()
和 release()
。两种方法都测试并设置内部状态。
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
// 2
while(this.signals == bound) wait(); // 3
// 4
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
// 2
while(this.signals == 0) wait(); // 3
// 4
this.signals--;
this.notify();
}
}
Set 方法
set方法是同步器通常包含的第二种方法。set方法只是设置同步器的内部状态而不先测试它。set方法的典型示例是Lock
类的unlock()
方法。持有锁的线程可以始终解锁,而无需测试Lock
是否已经被解锁。
set方法的程序流程通常如下:
- 设置内部状态
- 通知等待线程
下面是一个unlock()
方法的示例:
public class Lock{
private boolean isLocked = false;
public synchronized void unlock(){
//set
isLocked = false;
notify();
}
}
非阻塞算法
并发上下文中的非阻塞算法是允许线程访问共享状态(或以其他方式协作或通信)而不阻塞所涉及的线程的算法。更一般地说,如果一个线程的暂停不会导致算法中涉及的其他线程暂停,则称算法是非阻塞的。
为了更好地理解阻塞和非阻塞并发算法之间的区别,我将首先解释阻塞算法,然后再解释非阻塞算法。
阻塞并发算法
阻塞并发算法是一种算法:
A:执行线程请求的操作 - 或者
B:阻塞线程直到可以安全地执行操作
许多类型的算法和并发数据结构都是阻塞的。例如,java.util.concurrent.BlockingQueue接口的不同实现都是阻塞型数据结构。如果线程尝试将元素插入到一个BlockingQueue
中并且此时队列没有空间,则阻塞(暂停)插入线程,直到BlockingQueue
有存放新元素的空间。
此图说明了保护共享数据结构的阻塞算法的行为:
非阻塞并发算法
非阻塞并发算法是一种算法:
- A:执行线程请求的操作 - 或者
- B:通知请求线程无法执行操作
Java也包含几个非阻塞数据结构。AtomicBoolean, AtomicInteger,AtomicLong 和AtomicReference是非阻塞的数据结构的例子。
此图说明了保护共享数据结构的非阻塞算法的行为:
非阻塞与阻塞算法
阻塞和非阻塞算法之间的主要区别在于它们的行为的第二步,如上面两节所述。换句话说,区别在于阻塞和非阻塞算法在无法执行请求的操作时所执行的操作:阻塞算法会阻塞线程,直到可以执行请求的操作,而非阻塞算法通知请求操作的线程无法执行操作。
使用阻塞算法,线程可能会被阻塞,直到可以执行请求的操作。通常,是另一个线程的执行操作,使得第一个线程可以执行所请求的操作。如果由于某种原因导致其他线程在应用程序中的其他位置被挂起(阻塞),从而无法执行,也使得第一个线程的请求操作不能执行,则第一个线程将被无限期地阻塞 - 或者直到另一个线程最终执行必要的行动。
例如,如果一个线程试图将一个元素插入一个满的BlockingQueue
中,那么该线程将阻塞,直到另一个线程从该BlockingQueue
中取出一个元素。如果由于某种原因,能够从BlockingQueue
中获取元素的线程在应用程序中的其他位置被阻止(暂停),则尝试插入新元素的线程将被无限期地阻塞 - 或者直到线程最终从这个BlockingQueue
获取元素。
非阻塞并发数据结构
在多线程系统中,线程通常通过某种数据结构进行通信。这样的数据结构可以是从简单变量到更高级的数据结构(如队列,映射,堆栈等)的任何一种。为了保证多个线程对数据结构的正确并发访问,数据结构必须由一些 并发算法 保护。保护算法是使数据结构成为 并发数据结构 的原因。
如果保护并发数据结构的算法是阻塞的(使用线程暂停),则称其为 阻塞算法 。因此,数据结构被称为 阻塞的并发数据结构 。
如果保护并发数据结构的算法是非阻塞的,则称其为 非阻塞算法 。因此,数据结构被称为 非阻塞的并发数据结构。
每个并发数据结构旨在支持某种通信方法。你使用哪种并发数据结构取决于你的通信需求。我将在以下部分介绍一些非阻塞并发数据结构,并解释它们可以在什么情况下使用。这些非阻塞数据结构如何工作的解释应该可以让你了解如何设计和实现非阻塞数据结构。
Volatile变量
Java volatile变量是始终直接从主内存读取的变量。将一个新值赋给给volatile变量时,该值始终立即写入主存储器。这可以保证volatile变量的最新值始终对其他CPU上运行的其他线程可见。其他线程每次都会从主内存中读取volatile的值,而不是从运行线程的CPU的CPU缓存中读取。
易失性变量是非阻塞的。将值写入volatile变量是原子操作,它不会被中断。但是,对volatile变量执行的 读取 - 更新 - 写入 序列不是原子的。因此,如果由多个线程执行此代码仍可能导致竞争条件:
volatile myVar = 0;
...
int temp = myVar;
temp++;
myVar = temp;
首先,volatile变量myVar
的值从主存储器读入到temp变量,然后temp变量增加1,然后将temp变量的值赋给volatile变量myVar
,这意味着它将被写回主存储器。
如果两个线程执行此代码并且它们都读取了myVar
的值,增加1并将新值写回主存储器,那么你可能会遇到只会将1而不是将2添加到myVar变量的风险(例如,两个线程)读取值19,增加到20,然后写回20)。
你可能认为你不会像上面那样编写代码,但在实践中,上面的代码等同于:
myVar++;
上述代码执行时,将myVar
值读入到CPU寄存器或本地CPU缓存中,添加1,然后将CPU寄存器或CPU缓存中的值写回主存储器。
一个写线程的情况
在某些情况下,你只有一个线程会写入共享变量,而有多个线程读取该变量的值。当只有一个线程更新变量时,无论有多少线程读取它,都不会出现竞争条件。因此,只要你只有一个线程写入共享变量,就可以使用volatile变量。
当多个线程对共享变量执行 读取 - 更新 - 写入 操作序列时,会发生竞争条件。如果你只有一个线程执行 读取 - 更新 - 写入操作 序列,并且所有其他线程仅执行读取操作,则不会发生竞争条件。
这是一个计数器类,它不使用同步但仍然是并发的:
public class SingleWriterCounter {
private volatile long count = 0;
/**
* Only one thread may ever call this method,
* or it will lead to race conditions.
*/
public void inc() {
this.count++;
}
/**
* Many reading threads may call this method
* @return
*/
public long count() {
return this.count;
}
}
只要只有一个线程调用inc()
,就可以有多个线程访问该计数器类的同一个实例,而且我并不是说一次一个线程。我的意思是,只允许相同的一个线程调用inc()
,而多个线程可以同时调用count()
。这不会导致任何竞争条件。
此图说明了线程访问volatile变量count
的情形:
基于Volatile变量的更高级数据结构
可以创建使用volatile变量组合形成的数据结构,其中每个volatile变量仅由一个线程写入,由多个线程读取,每个volatile变量可以由不同的线程(但一次只能有一个线程)写入。使用这样的数据结构,多个线程可以能够使用volatiel变量以非阻塞方式向彼此发送信息。
这是一个简单的双写计数器类:
public class DoubleWriterCounter {
private volatile long countA = 0;
private volatile long countB = 0;
/**
* Only one (and the same from thereon) thread may ever call this method,
* or it will lead to race conditions.
*/
public void incA() { this.countA++; }
/**
* Only one (and the same from thereon) thread may ever call this method,
* or it will lead to race conditions.
*/
public void incB() { this.countB++; }
/**
* Many reading threads may call this method
*/
public long countA() { return this.countA; }
/**
* Many reading threads may call this method
*/
public long countB() { return this.countB; }
}
如你所见,DoubleWriterCounter
现在包含两个volatile变量,以及两对递增和读取方法。一次只有一个线程可以调用incA()
,并且一次只有一个线程可以调用incB()
,不过它每次可以是不同的线程。允许许多个线程同时调用countA()
和countB()
,这不会导致竞争条件。
DoubleWriterCounter
可用于两个线程进行通信。这两个count可以是产生的任务数和执行完的任务数,下图显示了两个线程通过类似于上面的数据结构进行通信:
聪明的读者会发现可以通过使用两个SingleWriterCounter
实例来达到DoubleWriterCounter
的效果。如果需要,你甚至可以使用更多线程和SingleWriterCounter
实例。
使用CAS的乐观锁
如果你确实需要多个线程来写入相同的共享变量,那么volatile变量就无法提高足够的功能了。你将需要某种对变量的独占访问权限。使用Java中的synchronized块可以看到这种独占访问的方式:
public class SynchronizedCounter {
long count = 0;
public void inc() {
synchronized(this) {
count++;
}
}
public long count() {
synchronized(this) {
return this.count;
}
}
}
注意inc()
和count()
方法都包含同步块。但是我们想要避免使用同步块和wait() - notify()
调用等,因为这会使线程阻塞。
我们可以使用Java的原子变量代替这两个同步块。在这种情况下使用AtomicLong
。以下是使用AtomicLong
代替的相同的计数器类:
import java.util.concurrent.atomic.AtomicLong;
public class AtomicCounter {
private AtomicLong count = new AtomicLong(0);
public void inc() {
boolean updated = false;
while(!updated){
long prevCount = this.count.get();
updated = this.count.compareAndSet(prevCount, prevCount + 1);
}
}
public long count() {
return this.count.get();
}
}
此版本与先前版本一样是线程安全的,这个版本的有趣之处在于inc()
方法的实现。inc()
方法不再包含同步块。相反,它包含以下行:
boolean updated = false;
while(!updated){
long prevCount = this.count.get();
updated = this.count.compareAndSet(prevCount, prevCount + 1);
}
这些行不是原子操作。这意味着,两个不同的线程可以同时调用inc()
方法并执行long prevCount = this.count.get()
语句,从而获得计数器的先前计数。但是,上述代码不包含任何竞争条件。
秘密在于while
循环内两行代码中的第二行。compareAndSet()
方法调用是一个原子操作,它将AtomicLong
内部值与预期值进行比较,如果两个值相等,则为AtomicLong
设置新的内部值。compareAndSet()
方法通常由CPU中的compare-and-swap指令支持。因此,不需要同步,也不需要线程暂停。这节省了线程暂停的开销。
想象一下,AtomicLong
内部值为20。然后两个线程读取该值,并且两者都尝试调用compareAndSet(20, 20 + 1)
。由于compareAndSet()
是原子操作,因此线程将按顺序执行此方法(一次一个)。
第一个线程将预期值20(计数器的先前值)与AtomicLong
内部值进行比较。由于两个值相等,因此AtomicLong
将其内部值更新为21(20 + 1)。updated
变量将被设置为true然后退出while
循环。
现在第二个线程调用compareAndSet(20, 20 + 1)
。由于AtomicLong
内部值不再为20,因此该调用将失败,它的AtomicLong
内部值不会设置为21,updated
变量将被设置为false,并且线程将在while循环再旋转一次。这次它读取新值21并尝试将其更新为22。如果inc()
在此期间没有其他线程调用 ,则第二次迭代将成功更新AtomicLong
到22。
为什么称之为乐观锁?
上一节中展示的代码称为乐观锁。乐观锁不同于传统锁(有时也称为悲观锁),传统锁使用同步块或某种锁来阻止对共享内存的访问,而同步块或锁可能导致线程被挂起。
乐观锁允许所有线程创建共享内存的副本而不会阻塞。然后,线程可以对其副本进行修改,并尝试将其修改后的版本写回共享内存。如果没有其他线程已经对共享内存进行修改,则compare-and-swap
操作允许线程将其更改写入共享内存。如果另一个线程已经更改了共享内存,则线程必须获取新副本,进行更改并尝试再次将它们写入共享内存。
这被称为乐观锁的原因是线程获得了他们想要更改的数据的副本并执行他们的更改,在乐观的假设下,其他线程在此期间不会对共享内存进行更改。当这个乐观的假设成立时,线程只是设法更新共享内存而不锁定。当这个假设是错误的时,工作就被丢弃了,但仍然没有锁定。
乐观锁往往在共享内存中的低到中竞争时效果最佳。如果共享内存中的竞争非常高,那么线程将浪费大量CPU周期来复制和修改共享内存,但无法将更改写回共享内存。但是,如果你在共享内存上有很多竞争,那么你应该考虑重新设计代码以降低竞争。
乐观锁是非阻塞的
我在这里展示的乐观锁机制是非阻塞的。如果一个线程获得共享内存的副本并在尝试修改它时被阻止(无论出于何种原因),不会阻止其他线程访问共享内存。
使用传统的加锁/解锁范例,当一个线程加锁,该锁将会对所有其他线程保持锁定,直到拥有该锁的线程再次解锁。如果持有锁的线程在其他地方被阻塞,那么该锁将保持锁定很长时间 - 甚至可能无限期。
不可交换数据结构
简单的比较和交换乐观锁适用于这类共享数据结构,其中整个数据结构可以在一个CAS操作中与新数据结构交换swapped(exchanged)。但是,使用修改后的副本交换整个数据结构并不总是可能的或可行的。
想象一下,如果共享数据结构是队列。尝试从队列中插入或获取元素的每个线程都必须复制整个队列并对副本进行所需的修改。这可以通过一个AtomicReference
来完成,复制引用,复制和修改队列,并尝试将AtomicReference
中的引用指向的队列和新创建的队列进行交换。
但是,一个大的数据结构可能需要大量内存和CPU周期才能复制。这将使你的应用程序花费更多内存,并在复制上浪费大量时间。这将影响应用程序的性能,尤其是在数据结构竞争很高的情况下。此外,线程复制和修改数据结构所花费的时间越长,其他一些线程在其间修改数据结构的可能性就越大。如你所知,如果另一个线程在复制后修改了共享数据结构,则所有其他线程都必须重新启动其复制 - 修改操作。这将进一步增加对性能和内存消耗的影响。
下一节将解释实现非阻塞数据结构的方法,这些数据结构可以同步更新,而不仅仅是复制和修改。
Sharing Intended Modifications
线程可以共享其共享数据结构的intended modification,而不是复制和修改整个共享数据结构。想要对共享数据结构进行修改的线程的过程变为:
- 检查另一个线程是否已提交对数据结构的intended modification。
- 如果没有其他线程提交了intended modification,则创建intended modification(由对象表示)并将该intended modification提交给数据结构(使用比较和交换操作)。
- 执行共享数据结构的修改。
- 删除对intended modification的引用,以向其他线程发出已进行intended modification的信号
如你所见,第二步可以阻止其他线程提交intended modification。因此,第二步可以有效地用作共享数据结构的锁。如果一个线程成功提交了intended modification,则在执行完刚刚提交的intended modification之前,没有其他线程可以提交intended modification。
如果线程提交了intended modification,然后被阻塞导致无法执行其他工作,则共享数据结构将被锁定。共享数据结构不会直接阻塞其他线程使用数据结构,因为其他线程可以检测到他们无法提交intended modification然后可以决定执行其他内容。显然,我们需要解决这个问题。
Completable Intended Modifications
为了避免一个被提交的intended modification锁定共享数据结构,被提交的intended modification对象必须包含足够的信息以供另一个线程完成修改。因此,如果提交intended modification的线程一直没有完成修改,则另一个线程可以代表它完成修改,并使得共享数据结构可供其他线程使用。
这是一个说明上述非阻塞算法蓝图的图:
必须将修改作为一个或多个CAS操作来执行。因此,如果两个线程试图完成intended modification,则只有一个线程能够执行CAS操作。一旦这个CAS操作完成,另一个尝试完成的CAS操作将失败。
A-B-A 问题
上述算法可能遇到A-B-A问题。A-B-A问题指的是变量从A变为B然后再变回A的情况。因此,对于另一个线程,不可能检测到变量确实已经发生了改变。
如果线程A检查正在进行的更新,复制数据然后被线程调度程序挂起,则线程B也能够在此时访问共享数据结构。如果线程B执行了一次数据结构的完全更新,并删除其intended modification,对于线程A来说就好像自从复制数据结构后没有发生任何修改一样。但是,确实进行了修改。当线程A继续根据其现在过时的数据结构副本执行更新时,数据结构将撤消线程B的修改。
下图说明了上述情况下的ABA问题:
A-B-A 问题解决
A-B-A问题的常见解决方案是不仅将指针和预期的修改对象交换,还将指针与一个计数器组合,然后使用一个CAS操作交换指针+计数器。这在C和C++等支持指针的语言中是可行的。因此,即使当前修改指针被设置回指向“未进行修改”,指针+计数器的计数器部分也将增加,使得更新对其他线程可见。
在Java中,你无法将引用和计数器合并为单个变量。相反,Java提供了一个AtomicStampedReference
类,它可以使用比较和交换操作原子地交换引用和标记。
一个非阻塞算法的模板
下面是一个代码模板,旨在让你了解如何实现非阻塞算法。该模板基于本教程前面给出的描述。
注意:我不是非阻塞算法的专家,所以下面的模板可能有一些错误。不要在我的模板基础上建立自己的非阻塞算法实现。该模板仅用于让你了解非阻塞算法的代码的外观。如果你想实现自己的非阻塞算法,请首先研究一些真实的,有效的非阻塞算法实现,以了解有关它们在实践中如何实现的更多信息。
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicStampedReference;
public class NonblockingTemplate {
public static class IntendedModification {
public AtomicBoolean completed = new AtomicBoolean(false);
}
private AtomicStampedReference<IntendedModification> ongoingMod =
new AtomicStampedReference<IntendedModification>(null, 0);
//declare the state of the data structure here.
public void modify() {
while(!attemptModifyASR());
}
public boolean attemptModifyASR(){
boolean modified = false;
IntendedModification currentlyOngoingMod = ongoingMod.getReference();
int stamp = ongoingMod.getStamp();
if(currentlyOngoingMod == null){
//copy data structure state - for use
//in intended modification
//prepare intended modification
IntendedModification newMod = new IntendedModification();
boolean modSubmitted = ongoingMod.compareAndSet(null, newMod, stamp, stamp + 1);
if(modSubmitted){
//complete modification via a series of compare-and-swap operations.
//note: other threads may assist in completing the compare-and-swap
// operations, so some CAS may fail
modified = true;
}
} else {
//attempt to complete ongoing modification, so the data structure is freed up
//to allow access from this thread.
modified = false;
}
return modified;
}
}
非阻塞算法很难实现
非阻塞算法很难正确设计和实现。在尝试实现你自己的非阻塞算法之前,请查看是否有人已根据你的需求开发了相应的非阻塞算法。
Java已经附带了一些非阻塞实现(例如ConcurrentLinkedQueue
),并且很可能在未来的Java版本中获得更多的非阻塞算法实现。
除了Java的内置非阻塞数据结构之外,还可以使用一些开源的非阻塞数据结构。例如,LMAX Disrupter(类似队列的数据结构)和Cliff Click的非阻塞HashMap。有关更多资源的链接,请参阅我的Java concurrency references page。
非阻塞算法的好处
与阻塞算法相比,非阻塞算法有几个好处。本节将介绍这些好处。
选择
非阻塞算法的第一个好处是,线程可以在无法执行请求的操作时选择执行其他操作。请求线程可以选择执行哪些操作,而不只是被阻塞。有时线程没有其他事情可以做,在这种情况下,它可以选择阻塞或等待自己,从而释放CPU用于其他任务。但至少请求线程可以选择。
在单个CPU系统上,暂停一个无法执行所需操作的线程,让其他能够执行其工作的线程在CPU上运行也许是有意义的。但即使在单CPU系统上,阻塞算法也可能导致死锁,饥饿和其他并发问题。
没有死锁
非阻塞算法的第二个好处是,一个线程的暂停不会导致其他线程的暂停。这意味着不会发生死锁,两个线程等待彼此释放他们想要的锁时不会被阻塞。由于线程在无法执行其请求的操作时不会被阻塞,因此它们彼此等待时也不会被阻塞。非阻塞算法仍然可能导致锁定,其中两个线程一直尝试某些操作,但一直被告知无法完成(由于另一个线程的操作)。
没有线程暂停
暂停和重新激活线程的成本是昂贵的。虽然随着操作系统和线程库变得更高效,暂停和重新激活的成本随着时间的推移而下降。但是,线程暂停和重新激活仍然需要付出很高的代价。
每当线程被阻塞时,它就会被暂停,从而导致线程暂停和重新激活的开销。由于线程不会被非阻塞算法挂起,因此不会发生此开销。这意味着CPU可能会花费更多时间来执行实际业务逻辑而不是上下文切换。
在多CPU系统上,阻塞算法会对整体性能产生更大的影响。在CPU A上运行的线程可以被阻塞等待在CPU B上运行的线程,这降低了应用程序能够实现的并行度。当然,CPU A可以安排另一个线程运行,但挂起和激活线程(上下文切换)的开销是昂贵的,需要暂停的线程越少越好。
减少线程延迟
本上下文中的延迟是请求的操作变为可能和线程实际执行它之间的时间。由于线程没有在非阻塞算法中暂停,因此它们不必支付昂贵的,缓慢的重新激活开销。这意味着当请求的操作能够执行时,线程可以更快地响应,从而减少其响应延迟。
非阻塞算法通常执行忙等待直到所请求的动作能够执行来获得较低的延迟。当然,在非阻塞数据结构上具有高线程竞争的系统中,CPU可能在繁忙的等待期间最终浪费很多周期,这是要记住的事情。如果你的数据结构具有高线程竞争,则非阻塞算法可能不是最佳的。但是,通常有一些方法可以重新设计你的应用程序以减少线程争用。
相关书籍资料
图书:
Seven Concurrency Models in Seven Weeks
Concurrent Programming in Java - 1st + 2nd Ed.
文章:
关于非阻塞算法 https://www.cise.ufl.edu/tr/DOC/REP-1991-12.pdf
其他资源:
LMAX Disrupter并发数据结构 https://lmax-exchange.github.io/disruptor/