8.1 在任务与执行策略之间的隐形耦合
只有当任务都是同种类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成“拥塞”。如果提交的任务依赖于其他的任务,那么除非线程池很大,否则将可能造成死锁。
8.1.1 线程饥饿死锁
在线程池中,如果任务依赖于其他任务,那么将可能产生死锁。
如下:RenderPageTask想Executor提交了两个任务来获取网页的页眉和页脚
public class ThreadDeadLock {
ExecutorService exec = ExecutorService.newSingleThreadExecutor();
public class RendererPageTask implements Callable<String> {
public String call () throws Exception {
Futrue<String> header,footer;
header = exec.submit (new LoadFileTask("header.html"));
footer = exec.submit (new LoadFileTask("footer.html"));
String page = renderBody ();
//将发生死锁 --- 由于任务在等待子任务的结果
return header.get() + footer.get();
}
}
}
8.1.2 运行时间较长的任务
如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。为此,我们可以通过限定任务等待资源的时间(如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等),而不要无限制地等待。如果等待时间超时,那么可以把任务标识为失败,然后中止任务或者将任务重新放回队列以便随后执行。
8.2 设置线程池的大小
线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。但一定要注意避免“过大”和“过小”这两种极端情况。如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空间的处理器无法执行工作,从而降低吞吐量。
- 如果是CPU密集型应用,则线程池大小设置为N+1
- 如果是IO密集型应用,则线程池大小设置为2N+1
8.3 配置ThreadPoolExecutor
8.3.1 管理队列任务
ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方式有3种:无界队列、有界队列和同步移交(Synchronous Handoff)。
newFixedPoolExecutor和newSinglePoolExecutor在默认情况下使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过线程池处理它们的速度,那么队列将无限制地增加。
有界队列(ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue)有助于避免资源耗尽的情况发生,但可能会带来一个问题:当队列填满后,新的任务怎么办(使用饱和策略)。
在使用有界队列时,队列的大小与线程池的大小必须一起调节。如果线程池小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但代价是可能会限制吞吐量。对于非常大的或者无界的线程池(如newCachedThreadPool),可以通过使用SynchronousQueue来避免任务排队。SynchronousQueue并不是真正的队列,而是一种在线程之间进行移交的机制。
如果想进一步控制任务执行顺序,那么可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。任务的优先级时通过自然顺序或Comparator来定义的。
建议:只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程“饥饿”死锁问题。此时应该使用无界的线程池,例如newCachedThreadPool。
8.3.2 饱和策略
当有界队列填满时,饱和策略就会开始发回作用。常见的饱和策略如下:
- AbortPolicy(中止策略):默认策略,该策略将抛出未检查的RejectedExecution-Exception。调用者可以捕获这个异常,然后根据需求来编写自己的代码。
- DiscardPolicy(抛弃策略):当新提交的任务无法保存到队列中等待执行时,该策略会悄悄将任务抛弃。
- DiscardOldestPolicy(抛弃最旧的):该策略将会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。
- CallerRunsPolicy(调用者运行):该策略实现了一种调节机制,它既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
使用信号量来限制任务的到达率:
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, Semaphore semaphore){
this.exec = exec;
this.semaphore = semaphore;
}
public void submitTask(final Runnable command) throws InterruptedException {
semaphore.acquire();
try{
exec.execute(new Runnable() {
@Override
public void run() {
try{
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e){
semaphore.release();
}
}
}
8.3.4 线程工厂
每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。默认的线程工厂方法将创建一个新的,非守护的线程,并且不包含任何含特殊的配置信息。
- 线程工厂方法:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
- 自定义的线程工厂:
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName){
this.poolName = poolName;
}
@Override
public Thread newThread(Runnable r) {
return new MyAppThread(r,poolName);
}
}
- 定制Thread基类:为线程设置名字,并维护一些统计信息
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifeCycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) { this(r,DEFAULT_NAME); }
public MyAppThread(Runnable r, String name){
super(r,name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){
@Override
public void uncaughtException(Thread t, Throwable e) {
log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
}
});
}
public void run(){
boolean debug = debugLifeCycle;
if (debug) log.log(Level.FINE, "Created " + getName());
try{
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) log.log(Level.FINE, "Exiting " + getName());
}
}
public static int getThreadsCreated() { return created.get(); }
public static int getThreadsAlive() { return alive.get(); }
public static boolean getDebug() { return debugLifeCycle; }
public static void setDebug(boolean b) { debugLifeCycle = b; }
}
8.3.5 在调用构造函数后再定制ThreadPoolExecutor
在调用完ThreadPoolExecutor的构造函数后,仍然可以通过设置(setter)来修改大多数传递给它的构造函数的参数。当如果Executor时通过工厂方法创建的,那么结果的类型转换为ThreadPoolExecutor来访问设置器。
ExecutorService exec = Executors.newCachedThreadPool();
if(exec instanceof ThreadPoolExecutor)
((ThreadPoolExecutor) exec).setCorePoolSize(10);
else
throw new AssertionError("Oops,bad assumption");
8.4 扩展ThreadPoolExecutor
在执行任务的线程中将调用beforeExecute和afterExecute方法,在这些方法中可以添加日志、计时、监视或统计信息收集的功能。在线程池完成关闭操作时调用terminated方法,也就是在所有任务都已经完成并且工作者线程也已经关闭后。terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录通知或者收集finalizer统计信息等操作。
- 示例:给线程池添加统计信息
public class TimingThreadPool extends ThreadPoolExecutor {
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s",t,r));
startTime.set(System.nanoTime());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
} finally {
super.afterExecute(r, t);
}
}
@Override
protected void terminated() {
try {
log.info(String.format("Terminated: avg time = %dns", totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}
8.5 递归算法的并行化
如果循环中的迭代操作都是独立的,并且不需要等待所有的迭代操作都完成再继续执行,那么就可以使用Executor将串行循环转换为并行循环。
- 将串行转换为并行执行:
void processSequentially (List<Element> elements){
for (Element e : elements)
process(e);
}
void processInParallel (Executor exec, List<Element> elements){
for (final Element e : elements)
exec.execute(new Runnable(){
public void run() { process(e); }
});
}
- 将串行递归转换为并行递归:
public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
for (Node<T> n : nodes){
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes,
final Collection<T> results){
for (final Node<T> n : nodes){
exec.execute(new Runnable(){
public void run(){ results.add(n.compute()); }
});
parallelRecursive(exec, n.getChildren(), results);
}
}
注意:这里的并行指的是compute方法的调用,而不是遍历过程,在这里遍历过程依旧是串行的。
- 等待通过并行方式计算的结果
public<T> Collection<T> getParallelResults (List<Node<T>> nodes)
throws InterruptedException{
ExecutorService exec = ExecutorService.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec,nodes,resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUtil.SECONDES);
return resultQueue;
}
- 谜题框架
“谜题”包含了一个初始位置,一个目标位置,以及用于判断是否是有效移动的规则集。我们所需做的是在谜题空间中查找,直到找到一个解答或者找遍了整个空间都没有发现答案。
谜题的抽象类:
public interface Puzzle<P, M> {
P initialPosition();
boolean isGoal(P position);
Set<M> legalMoves (P position);
P move(P position, M move);
}
串行的谜题解答器:
public class SequentialPuzzleSolver<P,M> {
private final Puzzle<P,M> puzzle;
private final Set<P> seen = new HashSet<P>();
public SequentialPuzzleSolver(Puzzle<P,M> puzzle){
this.puzzle = puzzle;
}
public List<M> solve(){
P pos = puzzle.initialPosition();
return search(new Node<P, M>(pos,null,null));
}
private List<M> search (Node<P,M> node){
if (!seen.contains(node.pos)){
seen.add(node.pos);
if (puzzle.isGoal(node.pos))
return node.asMoveList();
for (M move : puzzle.legalMoves(node.pos)){
P pos = puzzle.move(node.pos, move);
Node<P,M> child = new Node<P, M>(pos, move, node);
List<M> result = search(child);
if (result != null)
return result;
}
}
return null;
}
static class Node<P,M> {
final P pos;
final M move;
final Node<P,M> prev;
Node(P pos, M move, Node<P,M> prev){
this.pos = pos;
this.move = move;
this.prev = prev;
}
List<M> asMoveList(){
List<M> solution = new LinkedList<M>();
for (Node<P,M> n = this; n.move != null; n = n.prev)
solution.add(0,n.move);
return solution;
}
}
}
由于计算某次移动的过程很大程度上与计算其他移动的过程是相互独立的,因此如果有多个处理器使用,我们可以采用并行方法来减少查找的时间。
设置一个闭锁来保证阻塞获取结果:
public class ValueLatch<T> {
private T value = null;
private final CountDownLatch done = new CountDownLatch(1);
public boolean isSet(){
return (done.getCount() == 0);
}
public synchronized void setValue(T newValue){
if (!isSet()){
value = newValue;
done.countDown();
}
}
public T getValue() throws InterruptedException{
done.await();
synchronized (this){
return value;
}
}
}
并发的谜题解答器:
public class ConcurrentPuzzleSolver<P,M> {
private final Puzzle<P,M> puzzle;
private final ExecutorService exec;
private final ConcurrentHashMap<P,Boolean> seen;
final ValueLatch<Node<P,M>> solution = new ValueLatch<Node<P, M>>();
public ConcurrentPuzzleSolver(Puzzle<P,M> puzzle, ExecutorService exec){
this.puzzle = puzzle;
this.exec = exec;
seen = new ConcurrentHashMap<P, Boolean>();
}
public List<M> solve() throws InterruptedException{
try{
P p = puzzle.initialPosition();
exec.execute(newTask(p,null,null));
//阻塞直到找到解答
Node<P,M> solveNode = solution.getValue();
return (solveNode == null) ? null : solveNode.asMoveList();
} finally {
exec.shutdown();
}
}
protected Runnable newTask(P p, M m, Node<P,M> n){
return new SolverTask(p,m,n);
}
class SolverTask extends Node<P,M> implements Runnable{
SolverTask(P pos, M move, Node<P, M> prev) {
super(pos, move, prev);
}
@Override
public void run() {
//已经找到答案或者已经遍历了这个位置
if (solution.isSet() || seen.putIfAbsent(pos,true) != null) return;
if (puzzle.isGoal(pos))
solution.setValue(this);
else
for (M m : puzzle.legalMoves(pos))
exec.execute(newTask(puzzle.move(pos,m),m,this));
}
}
static class Node<P,M> {
final P pos;
final M move;
final Node<P,M> prev;
Node(P pos, M move, Node<P,M> prev){
this.pos = pos;
this.move = move;
this.prev = prev;
}
List<M> asMoveList(){
List<M> solution = new LinkedList<M>();
for (Node<P,M> n = this; n.move != null; n = n.prev)
solution.add(0,n.move);
return solution;
}
}
}
但实际上这里存在一个问题:如果谜题空间中不存在答案呢?那么将会一直运行下去。因此,我们可以记录活动任务的数量,当该值为零时将解答设置null。
public class PuzzleSolver<P,M> extends ConcurrentPuzzleSolver<P,M> {
private final AtomicInteger taskCount = new AtomicInteger(0);
public PuzzleSolver(Puzzle<P, M> puzzle, ExecutorService exec) {
super(puzzle, exec);
}
@Override
protected Runnable newTask(P p, M m, Node<P, M> n) {
return new CountingTask(p,m,n);
}
class CountingTask extends SolverTask{
CountingTask(P pos, M move, Node<P, M> prev) {
super(pos, move, prev);
taskCount.incrementAndGet();
}
@Override
public void run() {
try{
super.run();
}finally {
if (taskCount.decrementAndGet() == 0)
solution.setValue(null);
}
}
}
}
注意:串行版本的程序执行深度优先搜索,因此搜索过程将受限于栈的大小。并发版本的程序执行广度优先搜索,因此不会受到栈大小的限制,但如果代搜索的或者已搜索的位置集合大小超过了可用的内存总量,那么仍可能耗尽内存。