第五章笔记
5.1 同步容器类
同步容器类包括Vector和Hashtable,还有Collections.synchronizedXxx等。
5.1.1 同步容器类的问题
同步线程类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。
- 迭代
- 条件运算(检查在Map中是否存在键值K,如果没有,就加入二元组)
在并发容器中,这些复合操作也是线程安全的,但当其他线程并发修改容器时,可能会出现意料之外的行为。
程序清单5-1
public class UnsafeVectorHelpers {
public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
该程序的问题是,当A线程在包含10个元素的Vector上调用getLast,同时B线程在同一个Vector上调用deleteLast,这些操作交替执行时,getLast将抛出ArrayIndexOutOfBoundsException异常。在调用size与调用getLast这两个操作之间,Vector变小了,因此在调用size时得到的索引值将不再有效。
由于同步容器类要遵守同步策略,即支持客户端加锁,所以在list上加锁就可以将getLast和deleteLast变为原子操作。
程序清单5-2
public class SafeVectorHelpers {
public static Object getLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
public static void deleteLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
}
这种风险在迭代时依然会出现
程序清单5-3
for(int i = 0; i < vector.size(); i++)
doSomething(vector.get(i));
同之前一样,当迭代的时候,size发生变化,依然会抛出异常,可以通过在客户端加锁的方式来避免,但是要牺牲性能。
程序清单5-4
synchronized(vector){
for(int i = 0; i < vector.size(); i++)
doSomething(vector.get(i));
}
5.1.2 迭代器与ConcurrentModificationException
对容器进行迭代的时候一会选用Iterator,但是即便用迭代器如果不对容器加锁,也无法避免运行时出现ConcurrentModificationException异常。
程序清单5-5
List<Widget> widgetList
= Collections.synchronizedList(new ArrayList<Widget>);
...
// 可能抛出ConcurrentModificationException
for (Widget w : widgetList)
doSomething(w);
上面程序就是用for-each语法对List容器进行迭代,javac将生成使用Iterator的代码,反复调用hasNext和next来迭代List对象。
要想避免出现ConcurrentModificationException异常,就必须在迭代过程持有容器的锁。
但是在容器上加锁,可能会出现性能问题以及死锁。
另一种替代方法就是“克隆”容器,并在副本上进行迭代,由于副本被封闭在线程内部,所以不会出现ConcurrentModificationException异常,但是会带来显著的性能开销。
5.1.3 隐藏迭代器
虽然加锁可以防止迭代器抛出ConcurrentModificationException,但是必须在所有对共享容器做操作的地方加锁。但是实际情况中,迭代器会隐藏起来,来看下面程序
程序清单5-6
public class HiddenIterator {
@GuardedBy("this") private final Set<Integer> set = new HashSet<Integer>();
public synchronized void add(Integer i) {
set.add(i);
}
public synchronized void remove(Integer i) {
set.remove(i);
}
public void addTenThings() {
Random r = new Random();
for (int i = 0; i < 10; i++)
add(r.nextInt());
System.out.println("DEBUG: added ten elements to " + set);
}
}
System.out.println("DEBUG: added ten elements to " + set)
这行代码编译器将字符串的连接操作转换为调用StringBuilder.append(Object),而这个方法又会调用容器的toString方法,标准容器的toString方法将迭代容器,并在每个元素上调用toString来生成容器内容的格式化表示。
正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制同样有助于确保实施同步策略。
类似的操作还有容器的hashCode、equals、containsAll、removeAll和retainAll方法。
5.2 并发容器
通过并发容器来替代同步容器,可以极大地提高伸缩性并降低风险。
ConcurrentHashMap、CopyOnWriteArrayList
Queue:用来临时保存一组等待处理的元素,Queue上的操作不会阻塞,如果队列为空,那么获取元素的操作将返回空值。它提供了几种实现,
- ConcurrentLinkQueue:传统的先进先出队列
- PriorityQueue:这是一个(非并发)优先队列。
- BlockingQueue:扩展了Queue,增加了可阻塞的插入和获取等操作。如果队列为空,那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素。如果队列已满,那么插入元素的操作将一直阻塞,直到队列中出现可用的空间。在“生产者 - 消费者”这种设计模式中,阻塞队列是非常有用的。
5.2.1 ConcurrentHashMap
同步容器类在执行每个操作期间都持有一个锁。
ConcurrentHashMap是一个基于散列的Map,并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。
ConcurrentHashMap提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。
ConcurrentHashMap中没有实现对Map加锁以提供独占访问。
5.2.2 额外的原子操作
由于ConcurrentHashMap中没有实现对Map加锁以提供独占访问,因此我们无法使用客户端加锁来创建新的原子操作,但是一些常见的复合操作都已经实现为原子操作并且在ConcurrentMap的接口中声明。
程序清单5-7
public interface ConcurrentMap<K,V> extends Map<K,V> {
//仅当K没有相应的映射值时才插入
V putIfAbsent(K key, V value);
//仅当K被映射到V时才移除
boolean remove(Object key, Object value);
//仅当K被映射到oldValue时才替换为newValue
boolean replace(K key, V oldValue, V newValue);
//仅当K被映射到某个值时才替换为newValue
V replace(K key, V value);
}
5.2.3 CopyOnWriteArrayList
用于替代同步List,在迭代期间不需要对容器进行加锁或复制。(CopyOnWriteArraySet的作用是替代同步Set)。
该容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。
每次修改容器时都会复制底层数组,当容器规模较大时,需要很大的性能开销。
5.3 阻塞队列和生产者 - 消费者模式
阻塞队列提供了可阻塞的put和take方法,以及offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空那么take方法将会阻塞直到有元素可用。
常见的生产者-消费者设计模式就是线程池与工作队列的组合,在Executor任务执行框架中就体现了这种模式。
offer方法作用是,如果数据项不能被添加到队列中,那么就返回一个失败状态。
BlockingQueue简化了生产者-消费者设计的实现过程,他有多种实现:
- LinkedBlockingQueue,是FIFO队列,和LinkedList类似
- ArrayBlockingQueue,是FIFO队列,和ArrayList类似
- PriorityBlockingQueue是一个按优先级排列的队列,根据元素的自然顺序来比较元素
- SynchronousQueue(后面称SQ)内部没有容量,所以不能通过peek方法获取头部元素;也不能单独插入元素,可以简单理解为它的插入和移除是“一对”对称的操作。为了兼容 Collection 的某些操作(例如contains),SQ 扮演了一个空集合的角色。
SQ 的一个典型应用场景是在线程池中,Executors.newCachedThreadPool() 就使用了它,这个构造使线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
5.3.1 示例:桌面搜索
扫描本地驱动器上的文件建立索引以便随后进行搜索,类似于桌面搜索程序或Windows索引服务。
程序清单5-8
public class ProducerConsumer {
static class FileCrawler implements Runnable {
private final BlockingQueue<File> fileQueue;
private final FileFilter fileFilter;
private final File root;
public FileCrawler(BlockingQueue<File> fileQueue,
final FileFilter fileFilter,
File root) {
this.fileQueue = fileQueue;
this.root = root;
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
public void run() {
try {
crawl(root);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries)
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
fileQueue.put(entry);
}
}
}
static class Indexer implements Runnable {
private final BlockingQueue<File> queue;
public Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}
public void run() {
try {
while (true)
indexFile(queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void indexFile(File file) {
// Index the file...
};
}
private static final int BOUND = 10;
private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
public static void startIndexing(File[] roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
FileFilter filter = new FileFilter() {
public boolean accept(File file) {
return true;
}
};
for (File root : roots)
new Thread(new FileCrawler(queue, filter, root)).start();
for (int i = 0; i < N_CONSUMERS; i++)
new Thread(new Indexer(queue)).start();
}
}
FileCrawler给出了一个生产者任务,在某个文件层次结构中搜索符合索引标准的文件,并将它们的名称放入工作队列。而且,在Indexer中还给出了一个消费者任务,从队列中取出文件名称并对它们建立索引。
生产者-消费者模式可以提高代码的可读性和可重用性:每个操作只需完成一个任务,并且阻塞队列将负责所有的控制流,因此每个功能的代码都更加简单和清晰。
生产者 - 消费者模式带来性能优势。可以并发地执行。
程序清单5-9 启动桌面搜索
public class ProducerConsumer {
static class FileCrawler implements Runnable {
private final BlockingQueue<File> fileQueue;
private final FileFilter fileFilter;
private final File root;
public FileCrawler(BlockingQueue<File> fileQueue,
final FileFilter fileFilter,
File root) {
this.fileQueue = fileQueue;
this.root = root;
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
public void run() {
try {
crawl(root);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries)
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
fileQueue.put(entry);
}
}
}
static class Indexer implements Runnable {
private final BlockingQueue<File> queue;
public Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}
public void run() {
try {
while (true)
indexFile(queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void indexFile(File file) {
// Index the file...
};
}
private static final int BOUND = 10;
private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
public static void startIndexing(File[] roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
FileFilter filter = new FileFilter() {
public boolean accept(File file) {
return true;
}
};
for (File root : roots)
new Thread(new FileCrawler(queue, filter, root)).start();
for (int i = 0; i < N_CONSUMERS; i++)
new Thread(new Indexer(queue)).start();
}
}
5.3.2 串行线程封闭
5.3.3 双端队列与工作密取
Java6增加了两种容器类型,Deque和BlockingDeque,它们分别对Queue和BlockingQueue进行扩展。
Deque是一个双端队列,实现了在队列头和队列尾的高效插入和删除。具体实现包括ArrayDeque和LinkedBlockingDeque。
在生产者-消费者模式中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有自己的双端队列,如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。
工作密取非常适用于既是消费者也是生产者问题 - 当执行某个工作时可能导致出现更多的工作。
例如:网页爬虫时,发现有更多的页面需要处理;垃圾回收阶段对堆进行标记。
5.4 阻塞方法与中断方法
线程可能会阻塞或暂停执行,原因有多种:等待I/O操作结束,等待获得一个锁,等待从Thread.sleep方法中醒来,或是等待另一个线程的计算结果。
阻塞操作与执行时间很长的普通操作的差别在于,被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行,例如等待I/O操作完成,等待某个锁变为可用,或者等待外部计算的结束。
BlockingQueue的put和take等方法会抛出InterruptedException,当某方法抛出InterruptedException时,表示这是一个阻塞方法。如果这个方法被终端,那么它将努力提前结束阻塞状态。
中断是一种协作机制。一个线程不能强制其他线程停止正在执行的操作而去执行其他的操作。
处理对中断的响应,有两种选择:
- 传递InterruptedException:向上抛出该异常
- 恢复中断:有时不能抛出该异常,例如当代码是Runnable的一部分时,这时必须捕获该异常,并通过调用当前线程上的interrupt方法恢复中断状态,比如下面程序这样
程序清单5-10
public class TaskRunnable implements Runnable {
BlockingQueue<Task> queue;
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// restore interrupted status
Thread.currentThread().interrupt();
}
}
void processTask(Task task) {
// Handle the task
}
interface Task {
}
}
5.5 同步工具类
同步工具类包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态,同步工具类包括:阻塞队列、信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。
5.5.1 闭锁
闭锁可以延迟线程的进度知道其到达终止状态。
作用:相当于一扇门,在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。可以用来确保某些活动直到其他活动都完成后才继续执行。
CountDownLatch是一种灵活的闭锁实现。countDown方法递减计数器,await方法等待计数器达到零。
程序清单5-11
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
该程序创建一定数量的线程,利用它们并发地执行指定的任务。使用了两个闭锁startGate和endGate,分别表示起始门和结束门。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是将调用结束门的countDown方法减1,这能使主线程高效地等待直到所有工作线程都执行完成。
5.5.2 FutureTask
FutureTask也可用做闭锁。
FutureTask表示的计算是通过Callable来实现的。
Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。
FutureTask在Executor框架中表示异步任务,此外还可以用来表示一些时间较长的计算。这些计算可以在使用计算结果之前启动。
程序清单5-12
public class Preloader {
//从数据库加载产品信息
ProductInfo loadProductInfo() throws DataLoadException {
return null;
}
private final FutureTask<ProductInfo> future =
new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});
private final Thread thread = new Thread(future);
//启动线程
public void start() { thread.start(); }
//通过get方法获取结果
public ProductInfo get()
throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException)
throw (DataLoadException) cause;
else
throw LaunderThrowable.launderThrowable(cause);
}
}
interface ProductInfo {
}
}
class DataLoadException extends Exception { }
这里需要注意自定义异常ExecutionException,在调用get方法时,无论代码抛出什么异常,都会被封装到该异常中。
在Preloader中,当get方法抛出ExecutionException时,可能是以下三种情况之一:Callable抛出的受检查异常,RuntimeException,以及Error。必须对每种情况单独处理。
程序清单5-13
public class LaunderThrowable {
/**
* Coerce an unchecked Throwable to a RuntimeException
* <p/>
* If the Throwable is an Error, throw it; if it is a
* RuntimeException return it, otherwise throw IllegalStateException
*/
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
throw (Error) t;
else
throw new IllegalStateException("Not unchecked", t);
}
}
5.5.3 信号量
计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。用来实现某种资源池,或者对容器施加边界。
Semaphore管理着一组许可(permit),许可的初始数量可以通过构造函数设定,操作时首先要获取到许可,才能进行操作,操作完成后需要释放许可。如果没有获取许可,则阻塞到有许可被释放。如果初始化了一个许可为1的Semaphore,那么就相当于一个不可重入的互斥锁(Mutex)。
理论的听起来有些绕口,其实假设生活中一个常见的场景:每天早上,大家都热衷于带薪上厕所,但是公司厕所一共只有10个坑位。。那么只能同时10个人用着,后面来的人都得等着(阻塞),如果走了2个人,那么又可以进去2个人。这里面就是Semaphore的应用场景,争夺有限的资源。
程序清单5-14
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
//注意一定要在finally中释放,否则就会出现死锁
sem.release();
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
5.5.4 栅栏
栅栏可以阻塞一组线程直到某个事件发生。
栅栏与闭锁的区别:所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,栅栏用于等待其他线程。例如:几个家庭决定在某个地方集合:“所有人6:00在麦当劳碰头,到了以后要等待其他人,之后再讨论下一步要做的事情”。
CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置之后才释放,当所有线程都释放之后,栅栏将呗重置以便下次使用;如果对await的调用超时、中断,那么所有阻塞的线程都将抛出BrokenBarrierException。
程序清单5-15
public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;
public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard.commitNewValues();
}});
this.workers = new Worker[count];
for (int i = 0; i < count; i++)
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}
private class Worker implements Runnable {
private final Board board;
public Worker(Board board) { this.board = board; }
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
private int computeValue(int x, int y) {
// Compute the new value that goes in (x,y)
return 0;
}
}
public void start() {
for (int i = 0; i < workers.length; i++)
new Thread(workers[i]).start();
mainBoard.waitForConvergence();
}
interface Board {
int getMaxX();
int getMaxY();
int getValue(int x, int y);
int setNewValue(int x, int y, int value);
void commitNewValues();
boolean hasConverged();
void waitForConvergence();
Board getSubBoard(int numPartitions, int index);
}
}
另一种形式的栅栏是Exchanger,它是一种两房栅栏,各方在栅栏位置上交换数据。例如当一个线程向缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。
5.6 构建高效且可伸缩的结果缓存
本节将开发一个高效且可伸缩的缓存,用于改进一个高计算开销的函数,首先从简单的HashMap开始,然后分析它的并发性缺陷,并讨论如何修复它们。
程序清单5-16
public class Memoizer1 <A, V> implements Computable<A, V> {
@GuardedBy("this") private final Map<A, V> cache = new HashMap<A, V>();
private final Computable<A, V> c;
public Memoizer1(Computable<A, V> c) {
this.c = c;
}
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}
interface Computable <A, V> {
V compute(A arg) throws InterruptedException;
}
class ExpensiveFunction
implements Computable<String, BigInteger> {
public BigInteger compute(String arg) {
// after deep thought...
return new BigInteger(arg);
}
}
在ExpensiveFunction中实现的Computable,需要很长时间才能计算出结果,我们将创建一个Computable包装器,帮助记住之前的计算结果,并将缓存过程封装起来。
在程序清单5-16中给出了第一种尝试,用HashMap来保存,但是它不是线程安全的,所以Memoizer1采取了加锁的方式,对整个compute方法进行同步。如果另一个线程正在计算,其他调用compute的线程可能会被阻塞很长时间。这一种不是我们想要的。
程序清单5-17
public class Memoizer2 <A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
private final Computable<A, V> c;
public Memoizer2(Computable<A, V> c) {
this.c = c;
}
public V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}
程序清单5-17中的Memoizer2用ConcurrentHashMap代替HashMap,解决了线程不安全的问题,但是当两个线程同时调用compute时存在一个漏洞,可能会导致计算得到相同的值。
Memoizer2的问题在于,如果某个线程启动了一个开销很多的计算,而其他线程并不知道这个计算正在进行,那么很可能会重复这个计算,可以用FutureTask来实现。
如果有结果可用,那么FutureTask.get将立即返回结果,否则它会一直阻塞,直到结果计算出来再将其返回。
程序清单5-18
public class Memoizer3<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer3(Computable<A, V> c) { this.c = c; }
public V compute(final A arg) throws InterruptedException {
Future<V> f = cache.get(args);
if (f == null){
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(args);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = ft;
cache.put(arg, ft);
ft.run(); //在这里将调用c.compute
}
try{
return f.get();
} catch(ExecutionException e){
throw launderThrowable(e.getCause());
}
}
}
该程序的问题是if代码块中的复合操作cache.put(arg, ft)是在底层的Map对象上执行的,而这个对象无法通过加锁来确保原子性。
程序清单5-19
public class Memoizer <A, V> implements Computable<A, V> {
private final ConcurrentMap<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
}
在完成并发缓存的实现后,就可以为第2章中因式分解servlet添加缓存。
程序清单5-20
@ThreadSafe
public class Factorizer extends GenericServlet implements Servlet {
private final Computable<BigInteger, BigInteger[]> c =
new Computable<BigInteger, BigInteger[]>() {
public BigInteger[] compute(BigInteger arg) {
return factor(arg);
}
};
private final Computable<BigInteger, BigInteger[]> cache
= new Memoizer<BigInteger, BigInteger[]>(c);
public void service(ServletRequest req,
ServletResponse resp) {
try {
BigInteger i = extractFromRequest(req);
encodeIntoResponse(resp, cache.compute(i));
} catch (InterruptedException e) {
encodeError(resp, "factorization interrupted");
}
}
void encodeIntoResponse(ServletResponse resp, BigInteger[] factors) {
}
void encodeError(ServletResponse resp, String errorString) {
}
BigInteger extractFromRequest(ServletRequest req) {
return new BigInteger("7");
}
BigInteger[] factor(BigInteger i) {
// Doesn't really factor
return new BigInteger[]{i};
}
}