Executor框架简介
Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。
Executor:一个接口,其定义了一个接收Runnable对象的方法executor(Runnable command))。
public interface Executor {
void execute(Runnable command);
}
ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法。
public interface ExecutorService extends Executor {
/**
* 有序的关闭先前提交的执行任务,若已经关闭,则直接返回
*/
void shutdown();
/**
* 试图停止已经执行的任务,中止等待执行的任务,并返回等待执行的任务列表
*/
List<Runnable> shutdownNow();
/**
* 是否已经停止
*/
boolean isShutdown();
/**
* 是否所有任务都已经完成后关闭
* 注:除非shutdownNow()或者shutdown()先被调用,否则isTerminated()不可能返回true
*/
boolean isTerminated();
/**
* 阻塞,一直到所有的task在shutdown之后都已经执行完毕 或者 发生超时 或者 当前线程被中断
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 提交一个有返回值的执行任务,返回值是一个表示任务执行结果的Future对象,这个Future对象的get方法将在任务执行成功之后返回任务结果
* 如果你想立即阻塞等待任务,可以使用构造器方式:result =exec.submit(aCallable).get();
* 注: Executors 类包括一组可以转换其他常见的闭包类对象方法,比如:java.security.PrivilegedAction到Callable的形式,以便它们可以提交
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个有返回值的执行任务,返回值是一个表示任务执行结果的Future对象,这个Future对象的get方法将在任务执行成功之后返回任务结果且返回到result入参上。
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一个有返回值的执行任务,返回值是一个表示任务执行结果的Future对象,这个Future对象的get方法将在任务执行成功之后返回任务结果且返回null。
*/
Future<?> submit(Runnable task);
/**
* 执行入参中给的任务,当所有任务执行完毕之后,返回持有这些任务状态和结果的Future集合
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
/**
* 执行入参中给的任务,当所有任务执行完毕之后或者超时时,返回持有这些任务状态和结果的Future集合
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
/**
* 执行入参中给的任务,返回成功执行任务的结果集中的一个
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
/**
* 执行入参中给的任务,返回成功执行任务的结果集中的一个,有执行的超时时间
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ThreadPoolExecutor:其构造函数的各个参数说明如下:
/**
* @corePoolSize:核心线程数,如果运行的线程少于corePoolSize,则创建新线程来执行新任务,即使线程池中的其他线程是空闲的
* @maximumPoolSize:最大线程数,可允许创建的线程数
* 注:corePoolSize和maximumPoolSize设置的边界自动调整池大小:corePoolSize <运行的线程数< maximumPoolSize:仅当队列满时才创建新线程
corePoolSize=运行的线程数= maximumPoolSize:创建固定大小的线程池
* @keepAliveTime:如果线程数多于corePoolSize,则这些多余的线程的空闲时间超过keepAliveTime时将被终止
* @unit:keepAliveTime参数的时间单位
* @workQueue:保存线程任务的阻塞队列,与线程池的大小有关
* 当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列
* 当运行的线程数等于或多于corePoolSize,在有新任务添加时则先加入队列,不直接创建线程
* 当workQueue满时,再有新任务时就创建新线程
* @threadFactory(非必须) :使用ThreadFactory创建新线程,默认使用defaultThreadFactory创建线程
* @handler(非必须):定义处理被拒绝任务的策略,默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 属性赋值。。。
}
/**
* ThreadPoolExecutor的核心构造器的参数详解:
* corePoolSize 核心线程池大小
* maximumPoolSize 最大线程池大小
* keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
* TimeUnit keepAliveTime时间单位
* workQueue 阻塞任务队列
* threadFactory 新建线程工厂
* RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
*/
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
return thread;
}
});
Executors:提供了一系列静态工厂方法用于创建各种线程池
/**
* 创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;
* 如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
// 使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
new LinkedBlockingQueue<Runnable>());
}
/**
* 创建一个单线程的Executor,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
// corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* 创建可缓存的线程池,如果线程池中的线程在60秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程,就重用可用线程,否则就新建一条线程
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
// 使用同步队列,将任务直接提交给线程
new SynchronousQueue<Runnable>());
}
Executor的生命周期
ExecutorService提供了管理Eecutor生命周期的方法,ExecutorService的生命周期包括了:运行、 关闭和终止三种状态。
- ExecutorService在初始化创建时处于运行状态。
- shutdown()方法等待提交的任务执行完成并不再接受新任务,在完成全部提交的任务后关闭
- shutdownNow()方法将强制终止所有运行中的任务并不再允许提交新任务