Java 线程池
Java 线程池模型的关键几个类和接口包括:Executor,Executors,ExecutorService,ThreadPoolExecutor,Future,Callable
ExecutorService 接口
所有的线程池实现类都继承于ExecutorService 接口,包括ThreadPoolExecutor。在使用线程池的时候,其实也是通过这个接口动态绑定各个线程池实例进行调用,常用的接口方法包括:
void execute(Runnable command);
<T> Future<T> submit(Callable<T> task);
void shutdown();
boolean awaitTermination(long timeout, TimeUnit unit)
ThreadPoolExecutor
这个类是各种线程池类的爸爸或干爹,所有的常见的线程池类都是从他派生出来。怎么派生的呢?我们来看下他的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
这里我们只看这个五个参数的构造方法(还有个6参数方法):
corePoolSize 核心线程数量,包含空闲线程
maximumPoolSize 最大线程数量
keepAliveTime 空闲的线程存活时间
unit 时间单位
workQueue 线程缓存队列,仅保存 execute方法提交的线程
详细的含义直接摘抄别人的总结:根据 ThreadPoolExecutor 源码前面大段的注释,我们可以看出,当试图通过 excute 方法讲一个 Runnable 任务添加到线程池中时,按照如下顺序来处理:
如果线程池中的线程数量少于 corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;
如果线程池中的线程数量大于等于 corePoolSize,但缓冲队列 workQueue 未满,则将新添加的任务放到 workQueue 中,按照 FIFO 的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);
如果线程池中的线程数量大于等于 corePoolSize,且缓冲队列 workQueue 已满,但线程池中的线程数量小于 maximumPoolSize,则会创建新的线程来处理被添加的任务;
如果线程池中的线程数量等于了 maximumPoolSize,有 4 种才处理方式(该构造方法调用了含有 6 个参数的构造方法,并将最后一个构造方法为 RejectedExecutionHandler 类型,它在处理线程溢出时有 4 种方式,这里不再细说,要了解的,自己可以阅读下源码)。
总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于 corePoolSize,再看缓冲队列 workQueue 是否满,最后看线程池中的线程数量是否大于 maximumPoolSize。
Executors
Executors是一个工厂类,里边给我们提供了几个线程池的工厂方法,例如:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
.........以下略掉N个方法.........
Executors.newCachedThreadPool
立马看下这个方法的调用的是哪个构造函数:
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
可以看到,核心线程数为0,空闲线程存活时长是60s,就是说一旦某个线程空闲下来超过60秒,他就会从池中拿掉。另外SynchronousQueue 是一个不缓存元素的队列,一旦有Runnable 通过execute提交进来,他立即传递给线程池去寻找空闲线程去执行,如果没有空闲线程,则新建一个。
像下边这段测试代码,因为每隔21秒才提交一个sleep20 秒的runnable,所以,线程池中的唯一个线程不断地被得到复用。
public class ExecutorTest {
private static ExecutorService service;
static class SleepTask extends Thread {
@Override
public void run() {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + " sleep 20 s ");
}
}
public static void main(String[] args) throws InterruptedException {
service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
service.execute(new SleepTask());
Thread.sleep(21000);
}
// 这里要使得主线程等待线程池结束
service.shutdown();
while (!service.awaitTermination(10, TimeUnit.SECONDS)) ;
}
}
输出:
pool-1-thread-1 sleep 20 s
pool-1-thread-1 sleep 20 s
pool-1-thread-1 sleep 20 s
pool-1-thread-1 sleep 20 s
pool-1-thread-1 sleep 20 s
pool-1-thread-1 sleep 20 s
(以下略)
Executors.newFixedThreadPool
先看调用的构造方法:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
构造方法参数是,corePoolSize == maximumPoolSize == nThreads,线程空闲时长为0s,队列为无界(Integer.MAX)的先进先出阻塞队列。
这个意思就是,线程不会被复用,execute提交一个runnable,就新建一个线程去执行他,线程用完就销毁。当线程数量大于 corePoolSize ,就放到队列里缓存起来,知道有空闲的位置让出来。
Executors.newSingleThreadExecutor
线程池空间为1,没有空闲时间。这个很简单,略过。跟newFixedThreadPool(1)类似。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Executors.newScheduledThreadPool
这个线程池的使用姿势跟上边略有不同,先看构造方法调用:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//最终调用了这个方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
可以看到,corePoolSize可以指定,maximumPoolSize 无穷大,空闲时间为0,队列使用的支持延时获取元素的无界阻塞队列。在调用的时候,需要使用ScheduledExecutorService 这个接口来动态绑定,才能发挥真正的作用。
比如 :
scheduleAtFixedRate 这个方法,允许command 第一次延迟 initialDelay个时间单位才开始执行,第二次延迟initialDelay+period 个时间单位才开始,第三次延迟到initialDelay+period*2 个时间单位才开始。每次增加period 个时间单位。但不允许并发,前一个执行没结束,后继的执行会被推迟。见范例。测试结果表明,每隔5s 左右才会执行一次。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
scheduleWithFixedDelay 这个方法,允许command第一次延迟 initialDelay个时间单位才开始执行,后继的每一次都要在前一次结束后,延迟period个时间单位才开始执行。将范例里的注释行去掉,测试结果符合该描述。
范例:
public class ExecutorTest {
private static ScheduledExecutorService service;
static class SleepTask extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " start ");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + " finished ");
}
}
public static void main(String[] args) throws InterruptedException {
service = Executors.newScheduledThreadPool(5);
service.scheduleAtFixedRate(new SleepTask(), 1, 1, TimeUnit.SECONDS);
// service.scheduleWithFixedDelay(new SleepTask(),1,1,TimeUnit.SECONDS);
}
}
}
Callable && Future
上面都是在讨论用线程池执行Runnable对象,还有另外一个实现了 Callable 接口的对象。实现了 Callable 接口的对象会有返回值,但是Runnable对象返回值为null。并且 Callable 的 call()方法只能通过 ExecutorService 的 submit(Callable task) 方法来执行,并且返回一个 Future,是表示任务等待完成的 Future。
当将一个 Callable 的对象传递给 ExecutorService 的 submit 方法,则该 call 方法自动在一个线程上执行,并且会返回执行结果 Future 对象。同样,将 Runnable 的对象传递给 ExecutorService 的 submit 方法,则该 run 方法自动在一个线程上执行,并且会返回执行结果 Future 对象,但是在该 Future 对象上调用 get 方法,将返回 null。
上范例:
*/
public class ExecutorTest {
private static ExecutorService service;
static class SleepTask implements Callable<String> {
@Override
public String call() throws Exception {
return Thread.currentThread().getName()+" is finished.";
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
service = Executors.newCachedThreadPool();
Future<String> future = service.submit(new SleepTask());
System.out.println(future.get());
}
}