8. 线程池的使用

8.1 在任务与执行策略之间的隐形耦合

只有当任务都是同种类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成“拥塞”。如果提交的任务依赖于其他的任务,那么除非线程池很大,否则将可能造成死锁。

8.1.1 线程饥饿死锁

在线程池中,如果任务依赖于其他任务,那么将可能产生死锁。

如下:RenderPageTask想Executor提交了两个任务来获取网页的页眉和页脚

public class ThreadDeadLock {
    ExecutorService exec = ExecutorService.newSingleThreadExecutor();

    public class RendererPageTask implements Callable<String> {
        public String call () throws Exception {
            Futrue<String> header,footer;
            header = exec.submit (new LoadFileTask("header.html"));
            footer = exec.submit (new LoadFileTask("footer.html"));
            String page = renderBody ();
            //将发生死锁 --- 由于任务在等待子任务的结果
            return header.get() + footer.get();
        }
    } 
}
8.1.2 运行时间较长的任务

如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。为此,我们可以通过限定任务等待资源的时间(如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等),而不要无限制地等待。如果等待时间超时,那么可以把任务标识为失败,然后中止任务或者将任务重新放回队列以便随后执行。

8.2 设置线程池的大小

线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。但一定要注意避免“过大”和“过小”这两种极端情况。如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空间的处理器无法执行工作,从而降低吞吐量。

  • 如果是CPU密集型应用,则线程池大小设置为N+1
  • 如果是IO密集型应用,则线程池大小设置为2N+1

8.3 配置ThreadPoolExecutor

ThreadPoolExecutor构造函数.png
8.3.1 管理队列任务

ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方式有3种:无界队列、有界队列和同步移交(Synchronous Handoff)。

  • newFixedPoolExecutor和newSinglePoolExecutor在默认情况下使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过线程池处理它们的速度,那么队列将无限制地增加。

  • 有界队列(ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue)有助于避免资源耗尽的情况发生,但可能会带来一个问题:当队列填满后,新的任务怎么办(使用饱和策略)。
    在使用有界队列时,队列的大小与线程池的大小必须一起调节。如果线程池小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但代价是可能会限制吞吐量。

  • 对于非常大的或者无界的线程池(如newCachedThreadPool),可以通过使用SynchronousQueue来避免任务排队。SynchronousQueue并不是真正的队列,而是一种在线程之间进行移交的机制。

  • 如果想进一步控制任务执行顺序,那么可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。任务的优先级时通过自然顺序或Comparator来定义的。

建议:只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程“饥饿”死锁问题。此时应该使用无界的线程池,例如newCachedThreadPool。

8.3.2 饱和策略

当有界队列填满时,饱和策略就会开始发回作用。常见的饱和策略如下:

  • AbortPolicy(中止策略):默认策略,该策略将抛出未检查的RejectedExecution-Exception。调用者可以捕获这个异常,然后根据需求来编写自己的代码。
  • DiscardPolicy(抛弃策略):当新提交的任务无法保存到队列中等待执行时,该策略会悄悄将任务抛弃。
  • DiscardOldestPolicy(抛弃最旧的):该策略将会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。
  • CallerRunsPolicy(调用者运行):该策略实现了一种调节机制,它既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。

相关Demo

使用信号量来限制任务的到达率:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;
    
    public BoundedExecutor(Executor exec, Semaphore semaphore){
        this.exec = exec;
        this.semaphore = semaphore;
    }
    
    public void submitTask(final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try{
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e){
            semaphore.release();
        }
    }
}
8.3.4 线程工厂

每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。默认的线程工厂方法将创建一个新的,非守护的线程,并且不包含任何含特殊的配置信息。

  • 线程工厂方法:
public interface ThreadFactory {
    Thread newThread(Runnable r);
}
  • 自定义的线程工厂:
public class MyThreadFactory implements ThreadFactory {
    private final String poolName;

    public MyThreadFactory(String poolName){
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new MyAppThread(r,poolName);
    }
}
  • 定制Thread基类:为线程设置名字,并维护一些统计信息
public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifeCycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAppThread(Runnable r) { this(r,DEFAULT_NAME); }

    public MyAppThread(Runnable r, String name){
        super(r,name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }

    public void run(){
        boolean debug = debugLifeCycle;
        if (debug) log.log(Level.FINE, "Created " + getName());
        try{
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }

    public static int getThreadsCreated() { return created.get(); }
    public static int getThreadsAlive() { return  alive.get(); }
    public static boolean getDebug() { return debugLifeCycle; }
    public static void setDebug(boolean b) { debugLifeCycle = b; }
}
8.3.5 在调用构造函数后再定制ThreadPoolExecutor

在调用完ThreadPoolExecutor的构造函数后,仍然可以通过设置(setter)来修改大多数传递给它的构造函数的参数。当如果Executor时通过工厂方法创建的,那么结果的类型转换为ThreadPoolExecutor来访问设置器。

ExecutorService exec = Executors.newCachedThreadPool();
if(exec instanceof ThreadPoolExecutor)
    ((ThreadPoolExecutor) exec).setCorePoolSize(10);
else 
    throw new AssertionError("Oops,bad assumption");

8.4 扩展ThreadPoolExecutor

在执行任务的线程中将调用beforeExecute和afterExecute方法,在这些方法中可以添加日志、计时、监视或统计信息收集的功能。在线程池完成关闭操作时调用terminated方法,也就是在所有任务都已经完成并且工作者线程也已经关闭后。terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录通知或者收集finalizer统计信息等操作。

  • 示例:给线程池添加统计信息
public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private final Logger log = Logger.getLogger("TimingThreadPool");
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s",t,r));
        startTime.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
        } finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        try {
            log.info(String.format("Terminated: avg time = %dns", totalTime.get() / numTasks.get()));
        } finally {
            super.terminated();
        }
    }
}

8.5 递归算法的并行化

如果循环中的迭代操作都是独立的,并且不需要等待所有的迭代操作都完成再继续执行,那么就可以使用Executor将串行循环转换为并行循环。

  • 将串行转换为并行执行:
void processSequentially (List<Element> elements){
    for (Element e : elements)
        process(e);
}

void processInParallel (Executor exec, List<Element> elements){
    for (final Element e : elements)
        exec.execute(new Runnable(){
            public void run() { process(e); }
        });
}
  • 将串行递归转换为并行递归:
public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
    for (Node<T> n : nodes){
        results.add(n.compute());
        sequentialRecursive(n.getChildren(), results);
    }
}

public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes,
 final Collection<T> results){
    for (final Node<T> n : nodes){
        exec.execute(new Runnable(){
            public void run(){ results.add(n.compute()); }
        });
        parallelRecursive(exec, n.getChildren(), results);
    }
}

注意:这里的并行指的是compute方法的调用,而不是遍历过程,在这里遍历过程依旧是串行的。

  • 等待通过并行方式计算的结果
public<T> Collection<T> getParallelResults (List<Node<T>> nodes) 
    throws InterruptedException{
    ExecutorService exec = ExecutorService.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec,nodes,resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUtil.SECONDES);
    return resultQueue;
}
  • 谜题框架

“谜题”包含了一个初始位置,一个目标位置,以及用于判断是否是有效移动的规则集。我们所需做的是在谜题空间中查找,直到找到一个解答或者找遍了整个空间都没有发现答案。

谜题的抽象类:

public interface Puzzle<P, M> {
    P initialPosition();
    boolean isGoal(P position);
    Set<M> legalMoves (P position);
    P move(P position, M move);
}

串行的谜题解答器:

public class SequentialPuzzleSolver<P,M> {
    private final Puzzle<P,M> puzzle;
    private final Set<P> seen = new HashSet<P>();

    public SequentialPuzzleSolver(Puzzle<P,M> puzzle){
        this.puzzle = puzzle;
    }

    public List<M> solve(){
        P pos = puzzle.initialPosition();
        return search(new Node<P, M>(pos,null,null));
    }

    private List<M> search (Node<P,M> node){
        if (!seen.contains(node.pos)){
            seen.add(node.pos);
            if (puzzle.isGoal(node.pos))
                return node.asMoveList();
            for (M move : puzzle.legalMoves(node.pos)){
                P pos = puzzle.move(node.pos, move);
                Node<P,M> child = new Node<P, M>(pos, move, node);
                List<M> result = search(child);
                if (result != null)
                    return result;
            }
        }
        return null;
    }

    static class Node<P,M> {
        final P pos;
        final M move;
        final Node<P,M> prev;
        Node(P pos, M move, Node<P,M> prev){
            this.pos = pos;
            this.move = move;
            this.prev = prev;
        }

        List<M> asMoveList(){
            List<M> solution = new LinkedList<M>();
            for (Node<P,M> n = this; n.move != null; n = n.prev)
                solution.add(0,n.move);
            return solution;
        }
    }
}

由于计算某次移动的过程很大程度上与计算其他移动的过程是相互独立的,因此如果有多个处理器使用,我们可以采用并行方法来减少查找的时间。

设置一个闭锁来保证阻塞获取结果:

public class ValueLatch<T> {
    private T value = null;
    private final CountDownLatch done = new CountDownLatch(1);

    public boolean isSet(){
        return (done.getCount() == 0);
    }

    public synchronized void setValue(T newValue){
        if (!isSet()){
            value = newValue;
            done.countDown();
        }
    }

    public T getValue() throws InterruptedException{
        done.await();
        synchronized (this){
            return value;
        }
    }
}

并发的谜题解答器:

public class ConcurrentPuzzleSolver<P,M> {
    private final Puzzle<P,M> puzzle;
    private final ExecutorService exec;
    private final ConcurrentHashMap<P,Boolean> seen;
    final ValueLatch<Node<P,M>> solution = new ValueLatch<Node<P, M>>();

    public ConcurrentPuzzleSolver(Puzzle<P,M> puzzle, ExecutorService exec){
        this.puzzle = puzzle;
        this.exec = exec;
        seen = new ConcurrentHashMap<P, Boolean>();
    }

    public List<M> solve() throws InterruptedException{
        try{
            P p = puzzle.initialPosition();
            exec.execute(newTask(p,null,null));
            //阻塞直到找到解答
            Node<P,M> solveNode = solution.getValue();
            return (solveNode == null) ? null : solveNode.asMoveList();
        } finally {
            exec.shutdown();
        }
    }

    protected Runnable newTask(P p, M m, Node<P,M> n){
        return new SolverTask(p,m,n);
    }

    class SolverTask extends Node<P,M> implements Runnable{

        SolverTask(P pos, M move, Node<P, M> prev) {
            super(pos, move, prev);
        }

        @Override
        public void run() {
            //已经找到答案或者已经遍历了这个位置
            if (solution.isSet() || seen.putIfAbsent(pos,true) != null) return;
            if (puzzle.isGoal(pos))
                solution.setValue(this);
            else
                for (M m : puzzle.legalMoves(pos))
                    exec.execute(newTask(puzzle.move(pos,m),m,this));
        }
    }

    static class Node<P,M> {
        final P pos;
        final M move;
        final Node<P,M> prev;
        Node(P pos, M move, Node<P,M> prev){
            this.pos = pos;
            this.move = move;
            this.prev = prev;
        }

        List<M> asMoveList(){
            List<M> solution = new LinkedList<M>();
            for (Node<P,M> n = this; n.move != null; n = n.prev)
                solution.add(0,n.move);
            return solution;
        }
    }

}

但实际上这里存在一个问题:如果谜题空间中不存在答案呢?那么将会一直运行下去。因此,我们可以记录活动任务的数量,当该值为零时将解答设置null。

public class PuzzleSolver<P,M> extends ConcurrentPuzzleSolver<P,M> {
    private final AtomicInteger taskCount = new AtomicInteger(0);

    public PuzzleSolver(Puzzle<P, M> puzzle, ExecutorService exec) {
        super(puzzle, exec);
    }

    @Override
    protected Runnable newTask(P p, M m, Node<P, M> n) {
        return new CountingTask(p,m,n);
    }

    class CountingTask extends SolverTask{

        CountingTask(P pos, M move, Node<P, M> prev) {
            super(pos, move, prev);
            taskCount.incrementAndGet();
        }

        @Override
        public void run() {
            try{
                super.run();
            }finally {
                if (taskCount.decrementAndGet() == 0)
                    solution.setValue(null);
            }
        }
    }
}

注意:串行版本的程序执行深度优先搜索,因此搜索过程将受限于栈的大小。并发版本的程序执行广度优先搜索,因此不会受到栈大小的限制,但如果代搜索的或者已搜索的位置集合大小超过了可用的内存总量,那么仍可能耗尽内存。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容