并发处理利器-CompletionService

版权声明:本文为博主原创文章,未经博主允许不得转载。

摘要

考虑这样一个需求,并发处理一批任务,每个任务都完成之后,对结果做一些后续处理,最后汇总结果。第一个方案:启动多个线程并发处理任务,并循环监控每一个线程的处理结果Futrue,直到所有Future返回为止。这个方案可行,但还需要自己监控所有的结果完成情况,是不是很乏味。来试试CompletionService吧。

CompletionService

先看看这个接口定义了哪些方法:

  • Future<V> submit(Callable<V> task); 提交Callable任务,并返回Future结果。
  • Future<V> submit(Runnable task, V result); 与上一个方法类似,当任务完成时返回指定的result对象。
  • Future<V> take() throws InterruptedException; 获取并移除最新完成的任务结果,该过程是阻塞的。
  • Future<V> poll(); 获取并移除最新完成的任务结果,如果没有结果则返回null。
  • Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; 指定超时时间等待获取并移除最新的任务结果。

从获取结果的几个接口可以看到,返回的都是最新完成的结果。这就是重点所在,我们可以不需要去监控等待每一个结果(如果等待的第一个Future是最慢的,岂不是会妨碍其他先完成的任务吗),而是按结果完成顺序得到了每一个返回结果,先完成的结果可以先继续执行后续处理,这不是挺好嘛。

ExecutorCompletionService是该接口的实现类,内部有一个线程池和BlockingQueue队列。它的实现原理其实挺简单:每个提交给ExecutorCompletionService的任务,都会被封装成一个QueueingFuture(FutureTask的子类),它重写了done()方法(该方法会在任务执行完成之后回调),将执行完成的FutureTask加入到内部队列,take()等方法其实是到内部队列中获取得到最新完成的结果FutrueTask。

对比

从代码层面来看看两种方案的差异:

public void test1() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    List<Future<String>> results = new ArrayList<Future<String>>(10);
    for(int i=0; i<10; i++) {
        Future<String> result = executorService.submit(new MyRunnable());
        results.add(result);
    }
    for(Future result : results) {
        String str = result.get();//遍历等待每一个Future, 如果第一个任务是最慢的,那么整个进度就会被拖慢
        //do something
    }
    //汇总操作
    executorService.shutdown();
}
public void test2() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletionService<String> completionService = new executorCompletionService<String>(executorService);
    for(int i=0; i<10; i++) {
        completionService.submit(new MyRunnable());
    }
    for(int i=0; i<10; i++) {
        String str = completionService.take().get();//先完成的结果先执行后续处理
        //do something
    }
    //汇总操作
    executorService.shutdown();
}

从代码量上看差异比较少,但是方案2不用单独维护一个List来保存所有的处理结果Future。重要的是,completionService因为任务结果按完成顺序陆续到来,每个任务的进度不会相互干扰,那么后续操作也不会相互影响,而第一种方案中如果第一个任务很慢,那么其他任务都要空闲等待第一个任务完成,才能继续后面的操作,这一点就明显影响到了性能。

小心踩坑

  • 关闭线程池

在测试方法中,为了演示而创建了线程池,方法结束时也关闭了线程池。如果你的代码与示例代码类似,那么请记住关闭线程池,否则即使方法退出之后,创建的线程也得不到回收和关闭,迟早将耗尽资源或撑爆内存。如果你的线程池是全局共享的,那么不存在这个问题,JVM关闭时会关闭线程池。

  • 错误的使用方式
public void test3() {
    Future<String> future = completionService.submit(new MyRunnable());//这里的线程池是共享的
    String str = future.get();
    //do something
}

该场景也许不太合适使用completeService,但是这里要说明的是另一个问题,直接使用completionService.submit的返回结果Future会造成内存泄漏,因为该方式只关心获取当前返回的结果,而忽略了BlockingQueue中保存的Future对象,BlockingQueue队列会不断变大(默认实现是LinkedBlockingQueue,无界队列),迟早将内存撑爆。正确的使用方式还是通过completionService.take()来获取Future对象。

如有什么地方描述不对,欢迎指出。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 一.线程安全性 线程安全是建立在对于对象状态访问操作进行管理,特别是对共享的与可变的状态的访问 解释下上面的话: ...
    黄大大吃不胖阅读 880评论 0 3
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,905评论 1 19
  • 一、并发 进程:每个进程都拥有自己的一套变量 线程:线程之间共享数据 1.线程 Java中为多线程任务提供了很多的...
    SeanMa阅读 2,573评论 0 11
  • 第三章 Java内存模型 3.1 Java内存模型的基础 通信在共享内存的模型里,通过写-读内存中的公共状态进行隐...
    泽毛阅读 4,400评论 2 22
  • 作者:李相文 小时候过年,我听得是多的一句话是父亲说的,三十夜的火,正月半的灯。 三十夜的“火”,是指每年的除夕,...
    相子阅读 634评论 1 0