参考链接:
java并发编程-Executor框架+Future
1. 概述
Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。他们的关系为:
- Excutor 执行器接口, Executor.execute(Runnalbe) 。Executor在执行时使用内部的线程池完成操作
- Future<T> 接口,用于获取异步任务的执行结果 。核心方法
V get() throws InterruptedException, ExecutionException; 获取类型为V的执行结果,如果任务还未产生结果,阻塞。 - Callable<T> 接口,提供一个返回类型T结果的 call()的方法
- public interface RunnableFuture<V> extends Runnable, Future<V>{} 一个继承了Runnable和Future接口的接口(接口可以继承其他接口,继承接口的时候可以继承多个),相当于带有Future功能的线程。
- public class FutureTask<V> implements RunnableFuture<V> 具有一个Callable的成员变量。
2.创建线程池
Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int nThreads)
创建固定数目线程的线程池。
public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute
将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor()
创建一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
3. ExcutorService接口
ExecutorService接口继承了Executor接口,添加了一些生命周期管理和任务提交机制的方法。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止 。ExecutorService创建时处于运行状态。当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再向ExecutorService中添加任务,所有已添加的任务执行完毕后,ExecutorService处于终止状态,isTerminated()返回true。
如果Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。
- 提交任务: <T> Future<T> submit(Callable<T> task);
- 执行所有任务:<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
4 一个并发计算数组和的实例
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class ConcurrentCalculator {
private ExecutorService exec;
private int cpuCoreNumber;
private List<Future<Long>> tasks = new ArrayList<Future<Long>>();
class SumCalculator implements Callable<Long> {
private int[] nums;
private int start, end;
public SumCalculator(final int[] nums, int start, int end) {
this.nums = nums;
this.start = start;
this.end = end;
}
@Override
public Long call() throws Exception {
long sum = 0l;
for (int i = start; i < end; i++) {
sum += nums[i];
}
return sum;
}
}
public ConcurrentCalculator() {
cpuCoreNumber = Runtime.getRuntime().availableProcessors();
exec = Executors.newFixedThreadPool(cpuCoreNumber);
}
public Long sum(final int[] nums) {
for (int i = 0; i < cpuCoreNumber; i++) {
int increment = nums.length / cpuCoreNumber + 1;
int start = increment * i;
int end = increment * i + increment;
if (end > nums.length) {
end = nums.length;
}
SumCalculator sumCalc = new SumCalculator(nums, start, end);
FutureTask<Long> task = new FutureTask<Long>(sumCalc);
tasks.add(task);
}
return getResult();
}
private Long getResult() {
Long result = 0l;
for (Future<Long> task : tasks) {
try {
Long subSum = task.get();
result += subSum;
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
} catch (ExecutionException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
return result;
}
public void close() {
exec.shutdown();
}
}