并发编程实战二之线程池和CompletionService

线程池

线程饥饿死锁

任务依赖于其他任务,线程池不够大
单线程,一个任务将另一个任务提交到同一个Executor。

设置线程池的大小

int N_CPUS = Runtime.getRuntime().availableProcessors();
计算密集型  thread = N_CPUS+1
包含I/O或其他阻塞操作的任务  thread = N_CPUS*U_CPUS(1 + W/C)
    U_CPUS  ——  基准负载
    W/C  ——  等待时间与计算时间的比值
内存、文件句柄、套接字句柄、数据库连接 —— 资源可用总量/每个任务的需求量

线程池的创建

public ThreadPoolExecutor(int corePoolSize,          //基本大小
                              int maximumPoolSize,        //最大
                              long keepAliveTime,        //线程存活时间
                              TimeUnit unit,            
                              BlockingQueue<Runnable> workQueue,    //线程队列
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Executors.newSingleThreadExecutor();

Executors.newFixedThreadPool(100)基本大小和最大大小设置为参数指定值
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Executors.newCachedThreadPool()线程池最大大小设置为Integer.MAX_VALUE,队列为SynchronousQueue
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

串行递归转并行递归

public class SeToParallel {
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
        for(Node<T> n:nodes){
            results.add(n.compute());
            sequentialRecursive(n.getChildren(),results);
        }
    }

    public <T> void parallelRecursive(final Executor exec,List<Node<T>> nodes,final Collection<T> results){
        for(final Node<T> n : nodes){
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    results.add(n.compute());
                }
            });
            parallelRecursive(exec,n.getChildren(),results);
        }
    }

    public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<>();
        parallelRecursive(exec,nodes,resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }

        class Node<T>{

            private T t;
            private List<Node<T>> children;

            public List<Node<T>> getChildren() {
                return children;
            }

            public T compute(){
                return t;
            }
        }
}

CompletionService:Executor与BlockingQueue

ExecutorCompletionService实现了CompletionService,用BlockingQueue保存计算完成的结果。提交任务是,任务被包装成QueueingFuture.
页面逐渐渲染:

public class Renderer {
    private final ExecutorService executorService;

    public Renderer(ExecutorService executorService) {
        this.executorService = executorService;
    }
    
    void renderPage(CharSequence source){
        List<ImageInfo> info = scanForImageInfo(source);
            CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executorService);
        for(final ImageInfo imageInfo:info){
            completionService.submit(new Callable<ImageData>() {
                @Override
                public ImageData call() throws Exception {
                    return imageInfo.downloadImage();
                }
            });
        }
        
        renderText(source);
        try {
            for(int t = 0,n = info.size();t < n;t++){
                Future<ImageData> f = completionService.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 一.线程安全性 线程安全是建立在对于对象状态访问操作进行管理,特别是对共享的与可变的状态的访问 解释下上面的话: ...
    黄大大吃不胖阅读 4,324评论 0 3
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 11,121评论 1 19
  • 原文链接:http://blog.csdn.net/u010687392/article/details/4985...
    xpengb阅读 5,142评论 0 1
  • 前段时间遇到这样一个问题,有人问微信朋友圈的上传图片的功能怎么做才能让用户的等待时间较短,比如说一下上传9张图片,...
    加油码农阅读 4,933评论 0 2
  • 如果伤害了谁 就鼓起勇气 由衷地说声对不起 给受伤者些许安慰 也让自己的良心 少受点谴责和压力 微笑能化解矛盾 真...
    韬声依旧0622阅读 3,395评论 1 8