一. 简介
1. Executor框架的两级调度模型
- 应用程序通过Executor框架控制上层的调度;
- 下层的调度由操作系统内核控制;
Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。
2. Executor框架的结构
任务。实现Runnable接口或Callable接口的任务;
任务的执行。核心接口Executor、继承Executor接口的ExecutorService接口、实现了ExecutorService接口的两个关键类ThreadPoolExecutor、ScheduledThreadPoolExecutor;
异步计算的结果。Future接口、实现Future接口的FutureTask类。
- 主线程创建任务(实现Rnnable/Callable接口)
- 把任务提交给线程池执行
- 异步计算的结果(如果有的话)保存在Future中,主线程通过get()方法等待任务执行完成并获取结果,也可以通过FutureTask.cancel()取消此任务的执行。
Executor框架的成员
- ThreadPoolExecutor
ThreadPoolExecutor通常使用工厂类Executors来创建:
- Executors.newFixedThreadPool:固定线程数
- Executors.newSingleThreadExecutor:单线程数
- Executors.newCachedThreadPool:0 - Integer.MAX_VALUE个线程数
- ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor通常使用工厂类Executors来创建:
- Executors.newScheduledThreadPool:固定线程数
- Executors.newSingleThreadScheduledExecutor:单线程数
- Future接口
Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。
- Runnable接口和Callable接口
- Runnable接口不会返回结果
- Callable接口可以返回结果
二. ThreadPoolExecutor详解
1. FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool使用LinkedBlockingQueue作为任务队列(队列容量为Integer.MAX_VALUE,可被看成是无界队列),使用无界队列对线程池的影响:
- 由于线程池中线程数达到corePoolSize后,新任务会放在无界队列中,因此maxPoolSize无效,同时keepAliveTime无效;
- 运行中的FixedThreadPool不会拒绝任务。
2. SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
类似于FixedThreadPool,但是corePoolSize和maxPoolSize均被设置为1。
3. CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被设置为0,maxPoolSize被设置为Integer.MAX_VALUE,因此线程池的线程数可看成是无界的。
CachedThreadPool的任务队列使用SynchronousQueue,该队列没有容量,不存储任何任务。
- 如果当前maxPool中有空闲的线程正在执行SynchronousQueue.poll()操作(这个poll操作会让该空闲线程保持60s,如果超时则销毁该线程),同时主线程提交了任务,即SynchronousQueue.offer(Runnable task),此时两者配对成功,任务被提交给该空闲线程执行;
- 如果maxPool为空,或者maxPool中没有空闲线程,则没有线程执行SynchronousQueue.poll()操作,此时CachedThreadPool创建一个新线程执行任务。
注:
SynchronousQueue是一个没有容量的阻塞队列,插入操作和移除操作必须配对成功才会执行,插入的内容被移除操作直接从队列中取走。
三. ScheduledThreadPoolExecutor详解
主要用来在给定的延迟之后运行任务,或者定期执行任务。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPoolExecutor使用DelayedWorkQueue作为任务队列
待调度的任务被封装成ScheduledFutureTask,主要包含如下成员变量:
- long型成员变量time,表示这个任务将要被执行的具体时间;
- long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号;
- long型成员变量period,表示任务执行的间隔周期。
time小的任务被排在前面,如果time相同,sequenceNumber小的排在前面。
向ScheduledThreadPoolExecutor提交任务时,会在DelayedWorkQueue中添加ScheduledFutureTask
线程池中的线程从DelayedWorkQueue中获取满足要求的ScheduledFutureTask,然后执行任务。
ScheduledThreadPoolExecutor中执行周期任务
- 线程从DelayedWorkQueue中获取已到期的ScheduledFutureTask(time大于等于当前时间)
- 执行这个ScheduledFutureTask
- 修改ScheduledFutureTask的time变量为下次将要被执行的时间
- 线程将这个修改time后的ScheduledFutureTask放回DelayedWorkQueue中。
四. FutureTask详解
FutureTask实现了Future接口,可以用来表示异步计算的结果。
FutureTask实现了Runnable接口,可提交给线程执行。
FutureTask的三种状态:未启动、已启动、已完成。
FutureTask的实现:
基于AbstractQueuedSynchronizer(AQS),类似于ReentrantLock,利用AQS来原子性管理同步状态、阻塞和唤醒线程、维护被阻塞线程的队列。
Future接口
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
线程实现Callable<V>接口,并使用submit()方法提交到线程池,可利用Future<V>(超时)获取执行结果。
这里的超时时间是每个任务的超时时间。
对于超时的任务,可以利用Future<V>取消该任务,而不影响其它任务的正常执行。
@SpringBootApplication
public class TestApplication {
static AtomicInteger atomicInteger = new AtomicInteger(0);
static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(TestApplication.class, args);
for (int i = 0; i < 100; i++) {
CallableTask task = new CallableTask(atomicInteger);
Future<String> future = executorService.submit(task);
try {
String result = future.get(5, TimeUnit.SECONDS);
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
// 在超时异常中判断任务是否执行完成,未完成则取消该任务
if(!future.isDone()){
future.cancel(true);
}
}
}
}
}
public class CallableTask implements Callable<String> {
AtomicInteger atomicInteger;
public CallableTask() {
super();
}
public CallableTask(AtomicInteger atomicInteger) {
super();
this.atomicInteger = atomicInteger;
}
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
int result = atomicInteger.addAndGet(1);
return "result: " + result;
}
}