ITEM 81: PREFER CONCURRENCY UTILITIES TO WAIT AND NOTIFY
这本书的第一版专门介绍了如何正确使用 wait 和 notify [Bloch01,item 50]。它的建议仍然有效,并在本项目结束时加以总结,但这个建议远没有以前那么重要。这是因为使用等待和通知的理由要少得多。自 Java 5 以来,该平台提供了更高级别的并发实用程序,可以完成以前必须在 wait 和 notify 之上手工编写的那些事情。考虑到正确使用等待和通知的困难,您应该使用更高级的并发实用程序。
java.util.concurrent 中的高级实用工具分为三类:Executor,item 80已简要介绍;concurrent collections ;和 synchronizers。本项目将简要介绍 concurrent collections 和 synchronizers 。
Concurrent collections 是标准集合接口(如列表、队列和映射) 的高性能并发实现。为了提供高并发性,这些实现在内部管理它们自己的同步(item 79)。因此,不可能从并发集合中排除并发活动;锁定它只会降低程序的速度。
因为不能在并发集合上排除并发活动,所以也不能在它们上原子地组合方法调用。因此,并发集合接口配备了依赖于状态的修改操作,这些操作将几个原语组合成单个原子操作。这些操作被证明对并发集合非常有用,因此在 Java 8 中使用默认方法将它们添加到相应的集合接口中(item 21)。
例如,Map 的 putIfAbsent(key, value) 方法在键不存在的情况下为键插入映射,并返回与键关联的前一个值,如果键不存在,则返回 null。这使得实现线程安全的规范化映射变得很容易。这个方法模拟了 String.intern 的行为。
// Concurrent canonicalizing map atop ConcurrentMap - not optimal
private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
public static String intern(String s) {
String previousValue = map.putIfAbsent(s, s);
return previousValue == null ? s : previousValue;
}
事实上,你可以做得更好。ConcurrentHashMap 针对检索操作进行了优化,比如 get。因此,首先调用 get 是值得的,只有在 get 指出有必要时才调用 putIfAbsent:
// Concurrent canonicalizing map atop ConcurrentMap - faster!
public static String intern(String s) {
String result = map.get(s);
if (result == null) {
result = map.putIfAbsent(s, s);
if (result == null)
result = s;
}
return result;
}
除了提供优秀的并发性,ConcurrentHashMap 非常快。在我的机器上,上面的 intern 方法比 String.intern 快6倍以上(但是请记住,String.intern 必须使用一些策略来防止在长期生存的应用程序中发生内存泄漏)。并发集合使得同步集合在很大程度上过时。例如,优先使用ConcurrentHashMap 而不是 Collections.synchronizedMap。简单地用并发映射替换同步映射可以显著提高并发应用程序的性能。
一些集合接口通过阻塞操作进行了扩展,阻塞操作等待(或阻塞)直到成功执行它们。例如,BlockingQueue 扩展了 Queue 并添加了几个方法,包括 take,它从队列中删除并返回头元素,如果队列为空则等待。这允许阻塞队列用于工作队列(也称为生产者-消费者队列),一个或多个生产者线程对工作项进行排队,以及一个或多个消费者线程在项目可用时从工作项取出队列并处理它们。如您所料,大多数ExecutorService 实现,包括 ThreadPoolExecutor,都使用 BlockingQueue (item 80)。
同步器是允许线程彼此等待的对象,允许它们协调自己的活动。最常用的同步器是 CountDownLatch 和 Semaphore。比较不常用的是 CyclicBarrier 和 Exchanger。最强大的同步器是 Phaser 。
CountDownLatch 是一次性使用的栅栏,允许一个或多个线程等待一个或多个其他线程做某事。CountDownLatch 的唯一构造函数接受一个int 类型,它表示在允许所有等待线程继续执行之前,必须在 countDown 上调用倒计时方法的次数。
在这个简单的原语之上构建有用的东西非常容易。例如,假设您想要构建一个简单的框架来为操作的并发执行计时。这个框架由一个单独的方法组成,该方法采用一个执行器来执行操作,一个表示要并发执行的操作数量的并发级别,以及一个表示操作的 runnable。在计时器线程启动时钟之前,所有的工作线程都已准备好运行操作。当最后一个工作线程准备运行动作时,定时器线程“启动发令枪”,允许工作线程执行动作。当最后一个工作线程完成操作时,计时器线程停止时钟。实现这个逻辑直接在等待和通知上面至少可以说是混乱的,但在 CountDownLatch 上它是惊人的简单:
// Simple framework for timing concurrent execution
public static long time(Executor executor, int concurrency, Runnable action) throws InterruptedException {
CountDownLatch ready = new CountDownLatch(concurrency);
CountDownLatch start = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(concurrency);
for (int i = 0; i < concurrency; i++) {
executor.execute(() -> {
ready.countDown(); // Tell timer we're ready
try {
start.await(); // Wait till peers are ready
action.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
done.countDown(); // Tell timer we're done
}
});
}
ready.await(); // Wait for all workers to be ready
long startNanos = System.nanoTime();
start.countDown(); // And they're off!
done.await(); // Wait for all workers to finish
return System.nanoTime() - startNanos;
}
注意,该方法使用了三个倒计时锁存。第一个是 ready 它由工作线程用来告诉计时器线程何时准备就绪。然后工作线程等待第二个锁存,即start。当最后一个工作线程调用 ready.countDown, 定时器线程记录启动时间并调用 start.countDown, 允许所有工作线程继续进行。然后定时器线程等待第三个锁存,done,直到最后一个工作线程完成操作并调用 done.countDown。一旦发生这种情况,计时器线程就会唤醒并记录结束时间。
还有一些细节值得注意。传递给 time 方法的执行程序必须允许创建至少与给定并发级别相同的线程,否则测试将永远不会完成。这就是所谓的线程饥饿死锁[Goetz06, 8.1.1]。如果一个工作线程捕获了一个 InterruptedException,它使用 Thread.currentThread().interrupt() 重新指定中断,并从它的 run 方法返回。这允许执行程序按照它认为合适的方式处理中断。请注意 System.nanoTime 用于为活动计时。对于间隔计时,始终使用 System.nanoTime 而不是 System.currentTimeMillis。 System.nanoTime 时间更加精确,而且不受系统实时时钟调整的影响。最后,请注意,这个示例中的代码不会产生准确的计时,除非 action 执行了相当多的工作,比如一秒或更长时间。精确的微基准测试是出了名的困难,最好借助诸如 jmh [jmh] 这样的专门框架来完成。
这一项只涉及到并发实用程序可以做的事情的皮毛。例如,前面例子中的三个倒计时锁存可以用一个 cyclicbarrier 或 Phaser 实例代替。得到的代码会更简洁一些,但可能更难以理解。
虽然您应该优先使用并发实用程序来等待和通知,但您可能必须维护使用等待和通知的遗留代码。wait 方法用于让线程等待某个条件。它必须在同步区域内调用,该同步区域锁定调用它的对象。下面是使用 wait 方法的标准习语:
// The standard idiom for using the wait method
synchronized (obj) {
while (<condition does not hold>)
obj.wait(); // (Releases lock, and reacquires on wakeup)
... // Perform action appropriate to condition
}
始终使用等待循环习语来调用等待方法;永远不要在循环之外调用它。循环用于在等待之前和之后测试条件。
在等待之前测试条件,如果条件已经保持,则跳过等待以确保活动。如果条件已经存在,并且在线程等待之前已经调用了notify(或notifyAll)方法,则不能保证线程将从等待中醒来。
等待后测试条件,如果条件不存在,则再次等待是确保安全的必要条件。如果线程在条件不存在时继续操作,它可以销毁锁保护的不变式。当条件不存在时,线程可能被唤醒有几个原因:
• 另一个线程可能已经获得了锁,并改变了保护状态之间的时间线程调用通知和等待线程被唤醒。
• 当条件不存在时,另一个线程可能意外或恶意地调用了notify。类通过等待公共可访问的对象将自己暴露给这种恶作剧。公共可访问对象的同步方法中的任何等待都容易受到这个问题的影响。
• 通知线程在唤醒等待线程时可能过于“慷慨”。例如,通知线程可能会调用 notifyAll,即使只有一些等待线程满足了它们的条件。
•在没有通知的情况下,等待的线程(很少)会被唤醒。这被称为虚假唤醒 [POSIX, 11.4.3.6.1;Java9-api]。
一个相关的问题是使用 notify 还是 notifyAll 来唤醒等待的线程。(回想一下,notify 会唤醒一个等待线程(假设存在这样一个线程),而 notifyAll会唤醒所有等待线程。) 有时有人说,应该始终使用 notifyAll。这是一个合理而保守的建议。它总是会产生正确的结果,因为它确保您将唤醒需要被唤醒的线程。您也可以唤醒其他一些线程,但这不会影响程序的正确性。这些线程将检查它们正在等待的条件,如果发现该条件为假,将继续等待。
作为一种优化,如果等待集中的所有线程都在等待相同的条件,并且每次只有一个线程可以从条件变为真中获益,那么可以选择调用 notify 而不是 notifyAll。
即使满足了这些前提条件,也可能有理由使用 notifyAll 代替 notify。就像在循环中放置等待调用可以防止公共可访问对象上的意外或恶意通知一样,使用 notifyAll 代替 notify 可以防止不相关线程的意外或恶意等待。否则,这样的等待可能会“吞下”一个重要通知,让预定收件人无限期地等待。
总之,与 java.util.concurrent 提供的高级语言相比,直接使用 wait 和 notify 就像是在“并发汇编语言”中编程。在新代码中很少有理由使用wait 和 notify 。如果您维护使用 wait 和 notify 的代码,请确保它始终使用标准习语从 while 循环中调用 wait。通常应该优先使用 notifyAll 方法,而不是 notify。如果使用 notify,则必须非常小心,以确保活性。