在java中我们想在新线程中执行一个任务很简单,有以下两种方式:
// 1. 将任务放在Runnable的run方法中
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
// 执行任务
}
});
thread.start();
// 2. 将任务放在Thread的run方法中
Thread thread = new Thread() {
@Override
public void run() {
// 执行任务
}
};
thread.start();
如果现在有多个任务需要在另一线程中执行,那么我们就要为每个任务创建一个线程,当任务执行完成后,会消耗线程,但是这样频繁创建和销毁线程非常耗费时间,浪费资源。
那么有没有更好的方式来解决这个问题呢?
- 准备一个任务队列,所有要执行的任务都添加到这个任务队列中。
- 创建几个新线程,它们不断从任务队列中获取任务,然后执行,完成任务后并不释放线程,继续从任务队列中获取任务。如果当前任务队列中没有任务,那么就线程就等待,直到任务队列中添加了任务。
这个实现方式就是线程池。对于线程池来说,它内部维持一个线程集合和任务队列。在分析线程池之前,我们先来分析线程池的顶层接口,看看它们了那些操作线程池的方法。
一. Executor接口
Executor是线程池的顶层接口,包含线程池最重要的方法,那么线程池最重要的方法是什么呢?肯定是执行任务的方法。
// Executor执行器接口。
public interface Executor {
// 执行任务
void execute(Runnable command);
}
二. ExecutorService接口
Executor接口只是定义了线程池最重要的执行任务的方法,而ExecutorService接口就定义了线程池常用方法。
package java.util.concurrent;
import java.util.Collection;
import java.util.List;
public interface ExecutorService extends Executor {
/**
* 终止线程池。
* 不能再向线程池中添加新任务了,但是已经添加到任务队列的任务还是会执行。
* 也不会对正在执行任务的线程发起中断请求
*/
void shutdown();
/**
* 终止线程池。
* 不能再向线程池中添加新任务了,也不会执行已经添加到任务队列的任务, 但是会返回未执行的任务集合。
* 而且对所有工作线程都发起中断请求, 不管这个工作线程是否正在执行任务
* @return 返回未执行的任务集合
*/
List<Runnable> shutdownNow();
/**
* 返回true,表示线程池不是处于运行状态,即不能添加新任务了。
* 但是线程池中可能还有任务还没有执行完毕。
* 表示调用了shutdown或者shutdownNow方法。
* @return
*/
boolean isShutdown();
/**
* 返回true,表示线程池已经完全终止了,
* 即线程池中所有的工作线程都释放了,任务队列也为空了。
* 注意isTerminated返回true,那么isShutdown方法一定也是返回true
* @return
*/
boolean isTerminated();
/**
* 让调用值线程等待线程池完全终止。
* 如果线程池还有线程在执行,那么调用值线程机会阻塞等待,
* 直到线程池完全终止,或者等待时间超时,就会唤醒调用值线程。
*
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一个Callable类型的任务,会返回一个Future类型的对象。
* @param task
* @param <T>
* @return
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个Runnable类型的任务,会返回一个Future类型的对象。
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一个Runnable类型的任务,会返回一个Future类型的对象。
*/
Future<?> submit(Runnable task);
/**
* 执行一系列任务tasks,等待任务都执行完毕,返回任务结果值的集合
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 执行一系列任务tasks,等待任务都执行完毕,返回任务结果值的集合。规定了超时时间
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 执行一系列任务tasks,如果有一个任务执行完毕,那么就返回这个任务的结果值,并取消其他正在执行的任务。
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 执行一系列任务tasks,如果有一个任务执行完毕,那么就返回这个任务的结果值,并取消其他正在执行的任务。规定了超时时间
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
这些方法分为3种:
- 终止线程池相关方法:shutdown()、shutdownNow()、isShutdown()、isTerminated()和awaitTermination方法。
- 执行任务,并返回结果值相关方法:submit(Callable<T> task)、submit(Runnable task, T result)和submit(Runnable task)方法。
- 执行一系列任务相关方法:invokeAll与invokeAny方法。
invokeAll与invokeAny方法的区别:都是针对一系列的任务tasks,invokeAll方法表示等待这些任务都执行完毕,返回结果值的集合。invokeAny方法表示当有一个任务执行完毕,就返回结果值,并取消其他任务。
2.1 终止线程池方法
线程池不可能立即就终止,因为涉及到线程池正在执行任务的线程和任务队列中等待执行的任务该如何处理,有两个方式:
- shutdown方法:不能再向线程池中添加新任务了,但是已经添加到任务队列的任务还是会执行,也不会对正在执行任务的线程发起中断请求。等待任务队列任务执行完成,释放线程池中所有线程,线程池进入完全终止状态。
- shutdownNow方法:不能再向线程池中添加新任务了,也不会执行已经添加到任务队列的任务,但是会返回未执行的任务集合。而且对所有工作线程都发起中断请求, 不管这个工作线程是否正在执行任务。等待线程池中所有线程释放,线程池进入完全终止状态。
2.2 返回任务结果的方法
我们知道线程池执行任务,是调用execute方法。那么ExecutorService接口中的submit系列方法或者invoke系列方法,都是通过execute方法来实现的,不过它可以返回任务的结果值。
因此在AbstractExecutorService中,就实现了submit系列方法或者invoke系列方法。
三. AbstractExecutorService抽样类
线程池执行任务是通过execute方法,而execute方法的参数是Runnable,所以正常情况下,等不到任务的结果值。
想要得到结果值,我们想到了上一章讲解的Future与Callable对象,Callable代表一个任务,当任务运行完成后返回结果值,它一般是Future对象实例中的一个成员变量,所以是通过Future对象来获取任务的结果值。
但是execute方法的参数是Runnable,我们想到了FutureTask类,它实现了Runnable和Future接口,内部有个Callable的成员变量。
3.1 submit方法
/**
* 提交一个任务,返回Future对象,通过它的get方法获取结果值
*/
public <T> Future<T> submit(Callable<T> task) {
// 如果task为null,抛出异常
if (task == null) throw new NullPointerException();
// 通过task对象创建一个RunnableFuture对象
RunnableFuture<T> ftask = newTaskFor(task);
// 执行任务
execute(ftask);
// 返回Future对象,通过它的get方法获取结果值
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// 通过callable对象,创建一个FutureTask对象。
return new FutureTask<T>(callable);
}
通过callable对象创建一个FutureTask对象,调用execute方法执行任务,然后返回FutureTask对象,通过它的get方法获取任务的结果值,如果任务没有执行完成,那么就等待。
3.2 invokeAll方法
运行一系列任务tasks,等待所有任务都执行完毕,返回结果值的集合。
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
// 创建结果值Future集合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 遍历任务集合,创建RunnableFuture对象,调用execute方法执行任务
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
// 遍历结果值Future集合
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
// f.isDone()返回false,表示任务没有完成。
// 所以当任务没有完成的时候,调用f.get()方法,等待着任务完成,之后再循环,
// 直到所有的任务都已经完成了。
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
// 返回任务都已经完成的结果值Future集合
return futures;
} finally {
// 如果出现未知异常,导致任务没有都完成,那么遍历futures集合,取消任务。
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
要想等待所有任务完成,并返回结果值的集合。
- 首先遍历任务集合,创建RunnableFuture对象,调用execute方法执行任务。
- 因为我们是并发执行每个任务的,那么所有任务都执行完成的时间,就是耗时最长的那个任务执行执行完成的时间。
那么怎么找到这个耗时最长的任务呢?Future中有两个方法,isDone()方法表示任务是否完成,get()方法当任务没有完成,会让当前线程一直等待到任务完成。所以当一个任务的get()方法返回时,比它耗时少的任务都已经完成了即isDone()方法返回true,比它耗时多的任务还没有完成,会继续调用get()方法。所以遍历集合就可以保存所有任务都完成了。- 返回结果值集合。
- 如果出现未知异常,导致任务没有都完成,那么遍历futures集合,取消任务。
3.3 invokeAny方法
运行一系列任务tasks,有一个任务执行完毕,就返回这个任务的结果值,并取消其他任务。
这个方法比invokeAll方法麻烦多了,因为get()方法会等待任务执行完毕返回,那么耗时多的任务调用get()方法等待时间就越长,所以遍历所有任务,就可以找到耗时最长的任务。但是对于寻找耗时最短的任务,这种方式就不适用了。
那么怎么寻找耗时最短的任务呢?
思路可以是这样:首先执行所有任务,然后当前线程等待,当有一个任务完成了,就唤醒当前线程,返回这个任务结果值,并取消其他任务。
这个方式我们想到了阻塞队列BlockingQueue。创建一个阻塞队列,调用它的take获取队列中的值,因为队列为空,那么线程就会等待,知道有一个任务完成,将结果值插入到阻塞队列中,那么就会唤醒当前线程,获取这个结果值。
关于阻塞队列BlockingQueue,请看这篇文章BlockingQueue详细分析
public interface CompletionService<V> {
// 提交执行一个任务,返回任务结果值Future
Future<V> submit(Callable<V> task);
// 提交执行一个任务,返回任务结果值Future
Future<V> submit(Runnable task, V result);
// 获取一个已完成的任务,如果没有已完成的任务,那么就会等待
Future<V> take() throws InterruptedException;
// 获取一个已完成的任务,如果没有已完成的任务,那么返回null
Future<V> poll();
// 获取一个已完成的任务,如果没有已完成的任务,那么就会等待。
// 设置了超时时间,如果超过时间仍没有任务完成,那么返回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
public class ExecutorCompletionService<V> implements CompletionService<V> {
// ExecutorCompletionService本身没有执行任务的方法,
// 通过executor来实现线程池的功能,执行任务。
private final Executor executor;
// 用来调用它的创建新任务的方法newTaskFor
private final AbstractExecutorService aes;
// 表示已完成任务的阻塞队列
private final BlockingQueue<Future<V>> completionQueue;
/**
* 继承FutureTask,主要复写done()方法,
* 当任务已完成时,将任务task添加到阻塞队列completionQueue中去。
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 当任务已完成时,将任务添加到阻塞队列中
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
// 创建新任务
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
// 创建新任务
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
// 设置执行器
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
// 默认使用LinkedBlockingQueue的阻塞队列
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
// 设置执行器
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
// 设置阻塞队列
this.completionQueue = completionQueue;
}
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
// 创建新任务
RunnableFuture<V> f = newTaskFor(task);
// 执行任务,通过QueueingFuture包裹一下,
// 就可以将已完成的任务添加到已完成队列中
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
// 创建新任务
RunnableFuture<V> f = newTaskFor(task, result);
// 执行任务,通过QueueingFuture包裹一下,
// 就可以将已完成的任务添加到已完成队列中
executor.execute(new QueueingFuture(f));
return f;
}
// 获取一个已完成的任务,如果没有已完成的任务,那么就会等待
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
// 获取一个已完成的任务,如果没有已完成的任务,那么返回null
public Future<V> poll() {
return completionQueue.poll();
}
// 获取一个已完成的任务,如果没有已完成的任务,那么就会等待。
// 设置了超时时间,如果超过时间仍没有任务完成,那么返回null
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}
所以在并发框架下,提供了一个ExecutorCompletionService类,它内部有个已完成任务的阻塞队列completionQueue,通过它执行的任务,任务完成后都会插入completionQueue这个阻塞队列中,还提供take和poll方法来获取已完成任务。
注意有三点:
- ExecutorCompletionService没有实现Executor接口,那么它本身没有执行任务的方法,这里是通过组合的方法,它内部有个Executor实例,通过这个Executor实例来执行任务。
- 如何知道任务已完成了呢?在上一章的FutureTask类介绍中,它有个钩子方法done(),当任务完成时,会调用这个方法,所以我们要继承FutureTask类,复写这个方法。
- 它是通过阻塞队列来实现take和poll系列方法的。
下面来看invokeAny方法具体实现:
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
// 任务的个数
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
// 创建结果值Future集合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
ExecutionException ee = null;
// 截止时间deadline
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
// 获取一个已完成的任务,如果f==null表示还没有任何任务完成
Future<T> f = ecs.poll();
if (f == null) {
// ntasks大于0表示还有任务没有执行。
if (ntasks > 0) {
// 未执行任务
--ntasks;
// 执行任务,并将它添加到futures集合中,以便取消未完成任务。
futures.add(ecs.submit(it.next()));
// 正在执行的任务
++active;
}
// 一般不可能执行到active == 0的
else if (active == 0)
break;
// 如果任务都添加添加执行后,还没有一个任务完成,那么就让当前线程等到了
else if (timed) {
// 设置超时时间来返回一个已经完成的任务
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
// f == null表示是超时返回的,那么抛出TimeoutException异常
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
// 会返回一个已经完成的任务
f = ecs.take();
}
if (f != null) {
--active;
try {
// 返回任务结果值
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 那么遍历futures集合,取消未完成的任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
借助ExecutorCompletionService来获取首个已完成的任务。
四. 重要示例
11.1 正常运行线程池
package com.zhang._22._5;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"开始运行 任务"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" 发生中断异常 exception=="+e.getMessage());
}
System.out.println("======="+Thread.currentThread().getName()+"结束 任务"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定数量的线程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 单个线程的线程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 缓存线程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
List<Future<Integer>> list = new ArrayList<>();
for (int i = 0; i < 6; i++) {
Future<Integer> future = service.submit(new Run(i), i);
list.add(future);
}
for(Future<Integer> future: list) {
try {
System.out.println("=================主线程获取任务结果值 "+future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("主线程运行结束");
}
}
运行结果:
--线程1开始运行 任务1
--线程2开始运行 任务2
--线程3开始运行 任务3
=======线程1结束 任务1
--线程1开始运行 任务4
=================主线程获取任务结果值 0
=======线程2结束 任务2
--线程2开始运行 任务5
=================主线程获取任务结果值 1
=======线程3结束 任务3
--线程3开始运行 任务6
=================主线程获取任务结果值 2
=======线程1结束 任务4
=================主线程获取任务结果值 3
=======线程2结束 任务5
=================主线程获取任务结果值 4
=======线程3结束 任务6
=================主线程获取任务结果值 5
主线程运行结束
这里使用的是固定数量的线程池,所以只有三个线程来执行任务,未执行到的任务只能等待。
future.get()方法只有当对应的任务完成之后,才会返回。否则它会一直阻塞着线程。
运行完任务后,你会发现程序没有结束,那是因为线程池没有被终止。
11.2 终止线程池
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"开始运行 任务"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" 发生中断异常 exception=="+e.getMessage());
}
System.out.println("======="+Thread.currentThread().getName()+"结束 任务"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定数量的线程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 单个线程的线程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 缓存线程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
// 还是会执行完已经添加的任务
service.shutdown();
}
}
运行结果:
--线程1开始运行 任务1
--线程3开始运行 任务3
--线程2开始运行 任务2
=======线程1结束 任务1
--线程1开始运行 任务4
=======线程2结束 任务2
--线程2开始运行 任务5
=======线程3结束 任务3
--线程3开始运行 任务6
=======线程1结束 任务4
=======线程2结束 任务5
=======线程3结束 任务6
Process finished with exit code 0
使用shutdown方法,还是会执行完已经添加的任务。最后程序退出。
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"开始运行 任务"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" 发生中断异常 exception=="+e.getMessage());
}
System.out.println("======="+Thread.currentThread().getName()+"结束 任务"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定数量的线程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 单个线程的线程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 缓存线程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
service.shutdownNow();
}
}
运行结果:
--线程1开始运行 任务1
--线程2开始运行 任务2
--线程3开始运行 任务3
线程2 发生中断异常 exception==sleep interrupted
线程1 发生中断异常 exception==sleep interrupted
=======线程1结束 任务1
=======线程2结束 任务2
线程3 发生中断异常 exception==sleep interrupted
=======线程3结束 任务3
Process finished with exit code 0
使用shutdownNow方法,在任务队列中等待的任务是不会执行的,而且立即发起线程中断请求。