Java异步编程 4 CompletionService

CompletionService实际上可以看做是Executor和BlockingQueue的结合体。

CompletionService 依赖于一个单独的 Executor 来实际运行任务。
内部管理了一个堵塞队列来,在调用submit方法时,会向创建一个新的RunnableFuture,然后异步运行该RunnableFuture。当其状态变为done后,加入CompletionService的堵塞队列中,外部通过调用take()(堵塞)或者poll()(非堵塞,为空返回null)方法获取运行结果。

CompletionService接口,唯一实现类ExecutorCompletionService。

按照执行结果完成的先后顺序take。

CompletionService实现原理

ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。

当计算完成时,调用FutureTask的done方法。

当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。

QueueingFuture的源码如下:
private class QueueingFuture extends FutureTask<Void> {
       QueueingFuture(RunnableFuture<V> task) {
           super(task, null);
           this.task = task;
       }
       protected void done() { completionQueue.add(task); }
       private final Future<V> task;
   }
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
   
    ...
}

ExecutorCompletionService主要是增强executor线程池的。
Task包装后被塞入completionQueue,当Task结束,其Future就可以从completionQueue中获取到。

ExecutorCompletionService原理图

CompletionService接口源码:

public interface CompletionService<V> {
    // 提交
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    // 获取
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

例:向CompletionService中提交10个Task,当Task有任务返回则会优先从CompletionService内部的队列中获取到Task的Future。

package async;

import java.util.Random;
import java.util.concurrent.*;

/**
 * CompletionService
 * 应用举例1
 */
public class Main5 {
    public static void main(String[] args) {
        final ExecutorService pool = Executors.newFixedThreadPool(5);
        CompletionService<Integer> completionService
                = new ExecutorCompletionService<Integer>(pool);

        for (int i=0;i<10;i++) {
            completionService.submit(new Task(i));
        }

        System.out.println("begin get value....");
        // 异步非阻塞队列获取结果
        for (int i=0;i<10;i++) {
            try {
                Integer res = completionService.take().get();
                System.out.println(res);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("end get value....");
        pool.shutdown();
    }

    static class Task implements Callable<Integer> {
        Integer value = 0;

        public Task(int i) {
            this.value = i;
        }
        @Override
        public Integer call() throws Exception {
            TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 1);
            System.out.println(Thread.currentThread().getName() + " finished..."
                    + new Random().nextInt(5) + 1);
            return new Random().nextInt(5) + 1;
        }
    }
}

public interface Executor {
  void execute(Runnable command);
}

public interface ExecutorService extends Executor {
}

public interface CompletionService<V> {

}

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。