Item 81: Prefer concurrency utilities to wait and notify
对于wait
和notify
的使用建议,本书的第一版中做了单独的一个条目,这些建议依然有效并且列于本条目结尾。但这些建议不像曾经那么重要了。因为自Java 5以来,Java提供了高层抽象后的并发库java.util.concurrent
,这个包括三个部分的内容:执行框架、并发集合、同步器。第一部分上个条目已经结束,本条目关注剩余两个方面。
并发库实现了标准集合接口例如List
、Queue
和Map
,内部实现管理同步请求,提供了高并发支持。在并发集合中显然无法排除并发操作,所以并发集合接口提供了状态依赖的控制操作(state-dependent modify operations)。
例如:Map的putIfAbsent(key, value)
方法:如果key不存在则添加新key并绑定value,方法本身返回null值;如果key已经存在,则返回key对应的当前值。String.intern
方法就可以基于此实现——
// Concurrent canonicalizing map atop ConcurrentMap - not optimal
private static final ConcurrentMap<String, String> map =
new ConcurrentHashMap<>();
public static String intern(String s) {
String previousValue = map.putIfAbsent(s, s);
return previousValue == null ? s : previousValue;
}
ConcurrentHashMap
甚至对取数据做了优化,例如get
方法——(下面的intern方法比String.intern
方法快很多)
// Concurrent canonicalizing map atop ConcurrentMap - faster!
public static String intern(String s) {
String result = map.get(s);
if (result == null) {
result = map.putIfAbsent(s, s);
if (result == null)
result = s;
}
return result;
}
一些并发结合对象扩展了阻塞操作(blocking operation),一直等待直到任务成功执行完毕。例如BlockingQueue
扩展了Queue
的同时增加了几个方法:take
从队列头取出元素,如果队列为空则一直等待。实际上,执行器的许多实现都基于BlockingQueue实现
同步器Synchronizer
确保线程依次等待,用来协调他们之间的活动。最常用的同步器是CountDownLatch
和Semaphore
,不常用的包括CyclicBarrier
和Exchanger
;最强大的则是Phaser
“倒数锁”(Countdown latch
)允许一个或多个线程等待一个或多个线程完成事务处理。唯一的构造函数只需要一个int值表示countDown
方法要被调用多少次之后,所以等待线程才被允许继续执行。例如,假设创建一个框架计算某个action的并发执行耗时:
- 包括一个执行器执行动作
- concurrency水平表示有多大的并发量
- runnable对象
所有的工作线程做好准备等待计时器线程“发令”执行;最后一个线程执行完任务后,计时线程停止计时。使用wait
和notify
实现这个场景将十分复杂,但基于CountDownLatch
之上却十分简洁——
// Simple framework for timing concurrent execution
public static long time(Executor executor, int concurrency,
Runnable action) throws InterruptedException {
CountDownLatch ready = new CountDownLatch(concurrency);
CountDownLatch start = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(concurrency);
for (int i = 0; i < concurrency; i++) {
executor.execute(() -> {
ready.countDown(); // Tell timer we're ready
try {
start.await(); // Wait till peers are ready
action.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
done.countDown(); // Tell timer we're done
}
});
}
ready.await(); // Wait for all workers to be ready
long startNanos = System.nanoTime();
start.countDown(); // And they're off!
done.await(); // Wait for all workers to finish
return System.nanoTime() - startNanos;
}
上述方法需要注意的点:
- 需要确保可以创建足够的执行线程,否则这个方法将永远不会停止。线程不足导致的死锁被称为:“线程饥饿死锁”(
thread starvation deadlock
) - 如果某个线程被抛出了中断异常
InterruptedException
,调用了Thread.currentThread().interrupt()
方法重新声明中断,正常返回run方法结果 - 计时使用
System.nanoTime
而不是System.currentTimeMillis
。前者更精确并且不受系统时间的影响 - 只有runnable对象需要消耗一些时间时,最终的计时才有意义。实际上,精确的“微基准”是很难获取到的。最后使用专业的框架例如
JMH
(Java Microbenchmark Harness (JMH))
上述例子只是一个演示,实际上其中的三个countdownlatch
对象可以使用一个CyclicBarrier
或Phaser
对象代替。代码将更简洁但会更难理解一些。
标准的wait用法:永远只在循环中调用wait方法。同时确保wait
方法在notify
或notifyall
方法调用之前就已经被调。否则无法保证线程会被重新唤醒
// The standard idiom for using the wait method
synchronized (obj) {
while (<condition does not hold>)
obj.wait(); // (Releases lock, and reacquires on wakeup)
... // Perform action appropriate to condition
}
有了并发框架之后,最后不要再使用原生的wait
和notify
方法控制并发流程。