5.1 同步容器类
同步容器类包括Vector和HashTable以及Collection.synchronizedXxx等工厂方法。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。
5.1.1 同步容器类的问题
同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。容器中常见的复合操作包括:迭代,跳转,条件运算(比如“若没有则添加”)等。
- 迭代问题:如下为一个简单的Vector的遍历,看起来这段遍历代码并没有什么问题,但实际上在多线程情况下,假设线程A在迭代遍历,同时线程B在修改Vector,比如删除操作,那么就可能会抛出异常。
for (int i = 0; i < vector.size(); i ++)
dosomething(vector.get(i));
为了解决这个问题,我们可以对其进行客户端加锁操作:
synchronized(vector){
for (int i = 0; i < vector.size(); i ++)
dosomething(vector.get(i));
}
- 条件运算问题
假设Vector定义两个方法:getLast和deleteLast,它们都会进行“先检查后运行”操作。
public static Object getLast(Vector list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list){
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
假设线程A在包含10个元素的Vector上调用getLast,同时线程B在同一个Vector上调用deleteLast,这些操作交替执行如下图:
那么当执行到get操作时,会报出异常。
为了解决这个问题,我们可以采用客户端加锁的方法来保证复合操作的线程安全性:
public static Object getLast(Vector list){
synchronized(list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
public static void deleteLast(Vector list){
synchronized(list){
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
如此,便可以保证线程的安全性
5.1.2 迭代器与ConcurrentModificationException
在设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为是“及时失败(fail-fast)”的。这意味着,当它们发现容器在迭代过程中被修改时,就会抛出一个ConcurrentModificationException异常。
注意:这种“及时失败”的迭代器并不是一种完备的处理机制,而只是“善意地”捕获并发错误,因此,只能作为并发问题的预警指示器。
解决方法:
a. 在迭代期间对容器加锁,某些线程必须在等待迭代过程结束才可以进行访问或修改,如果容器的规模很大,或者每个元素执行操作的时间很长,那么这些线程将长时间等待。我们知道持有锁的时间越长,那么在锁上的竞争就可能越激烈,如果许多线程都在等待锁被释放,那么将极大地降低吞吐量和CPU的利用率。
b. ”克隆“容器,并在副本上进行迭代。由于副本被封闭在线程内,因此其他线程无法在迭代期间对其进行修改。但克隆容器时存在显著的性能开销。
5.1.3 隐藏迭代器
public class HiddenIterator{
private final Set<Integer> set = new Hashset<Integer>();
public synchronized void add(Integer i){ set.add(i); }
public synchronized void remove(Integer i){ set.remove(i); }
public void addTenThings(){
Random r = new Random();
for (itn i = 0; i < 10; i ++)
add(r.nextInt());
//进行了隐式地迭代
System.out.println("DEBUG: added ten elements to " + set);
}
}
如上,addTenThings方法可能会抛出ConcurrentModificationException异常,因为在生成调试信息时,toString会对容器set进行迭代。
除此之外,常见的隐式迭代器还有hashCode和equals,containsAll,removeAll等方法。
5.2 并发容器
- 同步容器将所有对容器状态的访问都进行了串行化,以实现它们的线程安全性。这种方法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重减低。
- 为此我们通过并发容器来代替同步容器,从而提高伸缩性并降低风险。
常见的并发容器:
- ConcurrentHashMap:用于代替同步且基于散列的Map。
- 与HashMap一样,ConcurrentHashMap也是基于散列的Map,但它使用了一种完全不同的加锁策略来提高并发性和伸缩性。ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得每次只能一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。在这种机制下,任意数量的读取线程可以并发地访问Map,并且一定数量的写线程可以并发地修改Map。
- ConcurrentHashMap提供的迭代器不会抛出ConcurrentModificationException异常,因此不需要在迭代过程对容器进行加锁。因为ConcurrentHashMap返回的迭代器具有弱一致性,而并非”及时失败“。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以在迭代器被构造后将修改操作反映给容器。
注意:
a. 在多线程环境下,我们通常选取ConcurrentHashMap,只有当应用程序需要加锁Map以进行独占访问时,才应该放弃ConcurrentHashMap。
b. 由于ConcurrentHashMap不能被加锁来执行独占访问,因此我们无法使用客户端加锁的方式来创建新的原子操作。但ConcurrentHashMap提供了一些”额外的原子Map操作“。
public interface ConcurrentHashMap<k,V> extends Map<K,V>{
//仅当K没有相应的映射值时才插入
V putIfAbsent(K key, V value);
//仅当K被映射到V时才移除
boolean remove(K key, V value);
//仅当K被映射到oldValue时才被替换
boolean replace(K key, V oldValue, V newValue);
//仅当K被映射到某个值时,才替换为newValue
boolean replace(K key, V newValue);
}
- CopyOnWriteArrayList
a. CopyOnWriteArrayList保证正确地发布一个事实不可变的对象,那么在访问该对象是就不需要进一步的同步。
b. CopyOnWriteArrayList是通过在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。
c. CopyOnWriteArrayList的迭代器保留一个指向底层的基础数组引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰。
d. CopyOnWriteArrayList的迭代器不会抛出ConcurrentModificationException异常。
e. CopyOnWriteArrayList的最大缺点是每当修改容器时都需要复制底层数组,这需要一定的开销。因此,仅当迭代器操作远远大于修改操作时,才应该用CopyOnWriteArrayList容器。例如,我们常见的通知系统:在分发通知时需要迭代已注册监听器链表,并调用每个监听器。
5.3 阻塞队列和生产者-消费者模式
阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法会阻塞直到有空间可用;如果队列为空,那么take方法会阻塞直到有元素可用。
阻塞队列支持生产者-消费者模式,当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中取出。
两种特殊情况:
a.如果生产者不能尽快地产生工作使消费者保持忙碌,那么消费者就只能一直等待,直到有工作可做。在某些情况下,这种方式是合适的,比如在服务器应用程序中,没有任何客户请求服务。但在其他另一些情况下,就不适用,比如“网页爬虫”中,有无穷的工作需要完成。
解决方法:
调整生产者线程数量和消费者线程数量之间的比率,从而实现更高的资源利用率。
b.如果生产者生成的速率比消费者处理工作的速率快,那么工作项会在队列中累积起来,最终耗尽内存。
解决方法:
使用有界队列,当队列充满时,生产者将被阻塞并且不能工作,而消费者就有时间来赶上工作处理进度。阻塞队列提供了offer方法,如果数据项不能被添加到队列中,那么会返回一个失败状态。这样我们就可以创建更多灵活的策略来处理负荷过载的情况。比如将数据项写入磁盘,或者减少生产者线程的数量。
在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。
- 常见的阻塞队列(BlockingQueue)的实现
a. LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和 ArrayList类似,但比同步list有更好的并发性。
b. PriorityBlockingQueue是一个按优先级排序的队列。它既可以根据元素的自然顺序来比较元素,也可以使用Comparator方法来比较。
c. SynchronousQueue并不是一个真正的队列,因为它不会为队列中元素维护存储空间,它维护的是一组线程,这些线程在等待把元素加入或移除。因为SynchronousQueue没有存储功能,因此put和take方法会被阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并总有一个消费者是准备好交付的工作时,才适合使用同步队列。
5.3.2 串行线程封闭
通过将多个并发的任务存入队列实现任务的串行化,并未这些串行化的任务创建唯一的一个工作进程处理。
本质:使用一个开销更小的锁(队列锁)去代替另一个可能开销更大的锁(非线程安全对象引用的锁)。
- 适用场景:
a. 需要使用非线程安全对象,但又不希望引入锁。
b. 任务的执行涉及I/O操作,不希望过多的I/O线程增加上下文切换。
5.3.3 双端队列与工作密取
Deque是一个双端队列,实现了队列在队列头和队列尾的高效插入和移除。具体实现包括ArrayQueue和LinkedBlockingQueue。
在工作密取中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。
工作密取模式比传统的生产者-消费者模式具有更高的可伸缩性。因为在大多数情况下,它们都只是访问自己的双端队列,从而极大地减少了竞争,并且当它需要访问另一个队列时,它会从尾部获取工作。
5.5 同步工具类
阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore),栅栏(Barrier)以及闭锁(Latch)。
5.5.1 闭锁
闭锁是一种同步工具类,可以延迟进程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁达到结束状态之前,这扇门一直是关闭状态的,并且没有任何线程能通过,当达到结束状态时,这扇门会打开并允许所有的线程通过。
闭锁可以用来确保某些活动直到其他活动都完成后才继续执行
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行
- 确保某个服务在其依赖的所有其他服务都已经启动之后才启动
- 等待直到某个操作的所有参与者(例如,在多玩游戏中的所有玩家)都就绪再继续执行
CountDownLatch是一种灵活的闭锁实现
- 闭锁包含一个计数器,该计数器初始化为一个正数,表示需要等待的事件数量。
- countDown方法递减计数器,表示有一个事件已经发生。
- await方法等待计数器达到零。如果计数器的值为非零,那么await会一直阻塞直到计数器为零。
如下例子:
我们需要保证所有线程就绪后才开始计时,并保证所有线程完成任务后停止计时,最终返回时间。
public class TestHarness{
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException{
//作为保证同时开始执行任务的闭锁
final CountDownLatch startGate = new CountDownLatch(1);
//作为保证全部执行完任务的闭锁
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i ++) {
Thread t = new Thread(){
public void run(){
try{
//在此处,线程会被阻塞,直到startGate执行了await方法
startGate.await();
try{
task.run();
} finally{
endGate.countDown();
}
} catch (InterruptedException ignored) { }
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
//在此处当前线程会被阻塞,直到任务线程全部完成任务,endGate值为0,才释放
endGate.await();
long end = system.nanoTime();
return end - start;
}
}
我们在这里设置了两个闭锁,其中startGate闭锁保证了所有线程就绪后才开始计时,endGate闭锁保证了所有线程完成任务才结束计时。
FutureTask
FutureTask也可以用作闭锁,是一种抽象的可生成结果的Runnable,并且处于以下三种状态:
等待运行,正在运行和运行完成。
FutrueTask的闭锁体现在它的get方法,如果任务完成,那么get会立即返回结果,否则get会阻塞直到任务完成进入完成状态。
public class Preloader{
//通过call方法返回执行的结果
private final FutrueTask<ProductInfo> futrue =
new FutrueTask<ProductInfo>(new Callable<ProductInfo>(){
public ProductInfo call() throws DataLoadException{
return loadProductInfo();
}
});
private final Thread thread = new Thread(futrue);
public void start(){ thread.start(); }
public ProductInfo get()
throws DataLoadException,InterruptedException{
try{
return futrue.get();
} catch (ExecutionException ex){
Throwable cause = e.getCause();
if (cause instanceof DataLoadException) {
throw (DataLoadException) cause;
} else {
throw launderThrowable(cause);
}
}
}
}
如上,我们使用FutrueTask来提前加载稍后需要的数据。Preloader创建了一个FutureTask,其中包含从数据库加载产品信息,以及一个执行运算的线程。当程序稍后需要ProductInfo时,可以调用get方法,如果数据已经加载,那么返回这些数据,否则将等待加载完成后返回。
注意:在Preloader中,当get方法抛出ExecutionException时,可能是这三种情况:Callable抛出的异常,RuntimeExcetpion,以及Error。Preloader会首先检查已知的受检查类型异常,并重新抛出给它们,剩下的就是未检查异常交给了launderThrowable来处理。
public static RuntimeException launderThrowable(Throwable t){
if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
return (Error) t;
else
throw new IllegalStateException("Not unchecked",t);
}
5.5.3 信号量
- 计数信号量(Counting Semaphore)用来同时访问某个特定资源的操作数量,或者执行某个指定操作的数量。
- 它主要包含两个操作acquire和release。acquire将阻塞直到有许可,release方法将释放一个许可。
- 使用Semaphore将任何一种容器变为有界阻塞容器。
public class BoundedHashSet<T>{
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound){
this.set = Collections.synchronizedSet(new Hashset<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException{
//获取许可,若容器数量已经达到bound,则被阻塞直到有许可被释放
sem.acquire();
boolean wasAdded = false;
try{
wasAdded = set.add(o);
return wasAdded;
} finally{
//若添加失败,则释放许可
if (!wasAdded)
sem.release();
}
}
public boolean remove(Object o){
boolean wasRemoved = set.remove(o);
//若删除失败,则释放许可
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
如上,我们通过Semaphore实现了一个可阻塞的有界容器。
5.5.4 栅栏
- 闭锁是一次性对象,一旦进入终止状态,就不能被重置。
- 栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。
- 栅栏与闭锁的关键区别在于:所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
- 当线程达到栅栏时将调用await方法,这个方法将阻塞直到所有的线程都达到这个栅栏位置。
举个例子:有三个工人合作建桥,有三个桩,每人打一个,同时打完之后才能一起搭桥(搭桥需要三个人一起合作)。也就是三个人都打完桩之后才能继续工作。
public class CyWork implements Runnable {
private CyclicBarrier cyclicBarrier;
private String name;
public CyWork(CyclicBarrier cyclicBarrier, String name){
this.cyclicBarrier = cyclicBarrier;
this.name = name;
}
@Override
public void run() {
System.out.println(name + "正在打桩....");
try {
Thread.sleep(5000);
System.out.println(name + "完成打桩。");
//阻塞线程,直到所有线程都到达栅栏
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + ": 其他人都打完桩了,开始搭桥了。");
}
public static void main(String[] args){
ExecutorService executorService = Executors.newFixedThreadPool(3);
//定义栅栏
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
CyWork work1 = new CyWork(cyclicBarrier, "张三");
CyWork work2 = new CyWork(cyclicBarrier, "李四");
CyWork work3 = new CyWork(cyclicBarrier, "王五");
executorService.execute(work1);
executorService.execute(work2);
executorService.execute(work3);
executorService.shutdown();
}
}
输出结果:
张三正在打桩....
李四正在打桩....
王五正在打桩....
李四完成打桩。
张三完成打桩。
王五完成打桩。
王五: 其他人都打完桩了,开始搭桥了。
李四: 其他人都打完桩了,开始搭桥了。
张三: 其他人都打完桩了,开始搭桥了。
如上,我们使用栅栏的方式保证三个工人都完成打桩工作后,才进行搭桥工作。
Exchanger是一种双方栅栏,各方在栅栏位置上交换数据。
Exchanger可以在两个线程之间交换数据,只能是2个线程,不支持更多的线程之间交换数据。
当线程A调用Exchange对象的exchange方法之后,它会陷入阻塞状态,直到线程B也调用exchange方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。
举个例子:我们模拟将钱和商品进行交换,线程A持有钱,线程B持有商品,两者之间进行交换。
public class ExchangeTest {
public static void main(String[] args){
ExchangeTest test = new ExchangeTest();
//定义Exchanger
Exchanger<String> exchanger = new Exchanger<>();
test.new Money(exchanger).start();
test.new Product(exchanger).start();
}
class Money extends Thread{
private String data;
private Exchanger<String> exchanger = null;
Money(Exchanger<String> exchanger){
this.exchanger = exchanger;
data = "钱";
}
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在把数据<" + data +">交换出去");
try {
Thread.sleep(3000);
//进行交换数据
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() + "换回来的数据为<" + data +">");
}
}
class Product extends Thread{
private String data;
private Exchanger<String> exchanger = null;
Product(Exchanger<String> exchanger){
this.exchanger = exchanger;
data = "商品";
}
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在把数据<" + data +">交换出去");
try {
Thread.sleep(3000);
//进行交换数据
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() + "换回来的数据为<" + data+">");
}
}
}
输出结果:
线程Thread-0正在把数据<钱>交换出去
线程Thread-1正在把数据<商品>交换出去
线程Thread-0换回来的数据为<商品>
线程Thread-1换回来的数据为<钱>
如上,我们可以看到两个线程之间成功地进行了交换数据。
- 适用场景:当两个线程执行不对称的操作时,Exchanger会非常有用。
比如当一个线程向缓冲区写入数据,而另一个线程从缓存区读取数据。这些线程可以通过Exchanger来进行汇合,并将满的缓冲区与空的缓冲区进行交换。
5.6 构建高效可伸缩的结果缓存
重用之前的计算结果能降低延迟,提高吞吐量,但却需要消耗更多的内存。
- 使用HashMap和同步机制来初始化缓存
public interface Computable<A,V>{
V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunction implements Computable<String,BigInteger>{
public BigInteger compute(String arg){
//经过长时间的计算后
return new BigInteger(arg);
}
}
public class Memoizer<A,V> implements Computable<A,V>{
private final Map<A,V> cache = new HashMap<A,V>();
private final Computable<A,V> c;
public Memoizer(Computable<A,V> c){
this.c = c;
}
public synchronized V compute(A arg) throws InterruptedException{
V result = cache.get(arg);
if (result == null){
result = c.compute(arg);
cache.put(arg,result);
}
return result;
}
}
如上,因为HashMap是线程不安全的,因此我们使用了synchronized关键字来保证线程安全性。虽然这样的操作能够保证线程安全,但每次只能有一个线程能够执行compute方法。如果一个线程正在计算结果,那么其他调用compute方法的线程将长时间被阻塞。
- 使用ConcurrentHashMap代替HashMap
public class Memoizer<A,V> implements Computable<A,V>{
private final Map<A,V> cache = new ConcurrentHashMap<A,V>();
private final Computable<A,V> c;
public Memoizer(Computable<A,V> c){
this.c = c;
}
public V compute(A arg) throws InterruptedException{
V result = cache.get(arg);
if (result == null){
result = c.compute(arg);
cache.put(arg,result);
}
return result;
}
}
这个方法并第一个方法有更好的并发性,多线程可以并发地访问。但这里却存在一个不足之处:当两个线程同时调用compute时存在一个漏洞,可能会导致计算得到相同的结果。比如线程A和线程B都携带数据arg="test"
,若此时缓存中不存在这样的缓存,那么线程A先判断为null,那么进行计算,计算时,线程B尽量进行判断为null,同样B也会进行计算,那么这样就完全失去了缓存的意义了,特别是当多个线程在进行取相同的值时。
实际上,上面的问题在于:如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在进行,那么很可能重复这个计算。
- 基于FutrueTask的Memoizing封装器
public class Memoizer<A,V> implements Computable<A,V>{
private final Map<A,Futrue<V>> cache = new ConcurrentHashMap<A,Futrue<V>>();
private final Computable<A,V> c;
public Memoizer(Computable<A,V> c){
this.c = c;
}
public V compute(A arg) throws InterruptedException{
Futrue<V> f = cache.get(arg);
if (f == null) {
Callable<V> cal = new Callable<V>(){
public V call() throws InterruptedException{
return c.compute(arg);
}
};
FutrueTask<V> ft = new FutrueTask<V>(cal);
f = ft;
cache.put(arg,ft);
//在这里调用c.compute方法
ft.run();
}
try{
return f.get();
} catch (ExecutionException e){
throw launderThrowable(e.getCause());
}
}
}
在这里,使用FutrueTask来表示返回的结果,我们知道FutrueTask的特点是:若结果已经计算出来,则立即返回结果。如果其他线程正在计算该结果,那么新到的线程会一直等待这个结果计算出来。
但这里依旧存在一个缺陷:仍然存在两个线程计算出相同的结果,但这个漏洞发生的几率远小于上面的方法。造成这个缺陷的原因是compute方法中的if代码块是非原子的”先检查后执行“操作。因此,存在两个线程同一时间内调用compute方法判断到结果为null,然后进行计算。
- 最终版本:
public class Memoizer<A,V> implements Computable<A,V>{
private final Map<A,Futrue<V>> cache = new ConcurrentHashMap<A,Futrue<V>>();
private final Computable<A,V> c;
public Memoizer(Computable<A,V> c){
this.c = c;
}
public V compute(A arg) throws InterruptedException{
Futrue<V> f = cache.get(arg);
if (f == null) {
Callable<V> cal = new Callable<V>(){
public V call() throws InterruptedException{
return c.compute(arg);
}
};
FutrueTask<V> ft = new FutrueTask<V>(cal);
f = cache.putIfAbsent(arg,ft);
if (f == null) {f = ft; ft.run(); }
}
try{
return f.get();
} catch (ExecutionException e){
throw launderThrowable(e.getCause());
}
}
}
如上,我们采用ConcurrentHashMap中的putIfAbsent方法来解决这个非原子操作问题,先判断是否存在这个计算,若存在,则不会添加。
在因式分解servlet中使用Memoizer来缓存结果
public class Factorizer implements Servlet{
//调用计算方法,并返回结果
private final Computable<BigInteger,BigInteger[]> c =
new Computable<BigInteger,BigInteger[]>(){
public BigInteger[] compute(BigInteger arg){
return factor(arg);
}
}
//设置缓存
private final Computable<BigInteger,BigInteger[]> cache =
new Memoizer<BigInteger,BigInteger[]>(c);
public void service(ServletRequest req, ServletResponse resp){
try{
BigInteger i = extractFromRequest(req);
encodeIntoResponse(resp, cache.get(i));
} catch (InterruptedException e){
encodeIntoResponse(resp, "factorization interrupted");
}
}
}