0. 为什么需要线程池
构造一个新的线程开销有点大,虽然线程与进程相比,是比较轻量的,但是线程的创建和关闭仍然需要花费时间,因为这涉及与操作系统的交互。而且,当线程数量过大时,会耗尽CPU和内存资源。
1. 什么是线程池
线程池是为了避免频繁地创建和销毁线程,对线程进行复用的。
就像数据库连接池维护数据库连接一样。
因此, 在使用线程池以后, 创建线程变成了从线程池获得空闲线程,关闭线程变成了向线程池归还线程。
2. 线程池的核心类
在java.util.concurrent包中, 有线程池管理的核心类。
ThreadPoolExecutor
表示一个线程池。Executors(执行器)
Executors是创建线程池的工厂类,扮演着线程池工厂的角色。
通过Executors可以取得一个拥有特定功能的线程池。
Executors的方法:
方法 | 描述 |
---|---|
newCachedThreadPool() | 返回一个可根据实际情况调整线程数量的线程池(无界线程池) |
newFixedThreadPool(int nThreads) | 返回一个固定线程数量的线程池(有界线程池) |
newSingleThreadExecutor() | 返回一个只有一个线程的线程池 |
newScheduledThreadPool() | 用于调度执行的固定线程池 |
newWorkStealingPool() | 一种适合"fork-join"任务的线程池,其中复杂的任务会分解成更简单的任务,空闲线程会"密取"较简单的任务 |
- 无界线程池
示例代码:
public class CachedThreadPoolDemo1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
System.out.println("Runnable1 begin "
+ System.currentTimeMillis());
Thread.sleep(1000);
System.out.println("A");
System.out.println("Runnable1 end "
+ System.currentTimeMillis());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
executorService.execute(() -> {
try {
System.out.println("Runnable2 begin "
+ System.currentTimeMillis());
Thread.sleep(1000);
System.out.println("B");
System.out.println("Runnable2 end "
+ System.currentTimeMillis());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
}
运行结果:
Runnable1 begin 1594986808102
Runnable2 begin 1594986808103
A
B
Runnable2 end 1594986809122
Runnable1 end 1594986809122
- 有界线程池
示例代码:
public class NewFixedThreadPoolDemo {
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MyTask task = new MyTask();
int threadNum = 5;
int actualThreadNum = 10;
ExecutorService es = Executors.newFixedThreadPool(threadNum);
for (int i = 0; i < actualThreadNum; i++) {
es.submit(task);
}
}
}
运行结果:
1591878949865:Thread ID:16
1591878949865:Thread ID:15
1591878949865:Thread ID:13
1591878949865:Thread ID:17
1591878949865:Thread ID:14
1591878950869:Thread ID:15
1591878950869:Thread ID:14
1591878950869:Thread ID:13
1591878950869:Thread ID:17
1591878950869:Thread ID:16
从上面结果可以看出,线程的ID分了两批,而且前后时间也是分两批。
这正好就证明了,我们创建了一个5个线程的线程池,而总共10个线程,除以2就是两批。
3. 计划任务
如果我们需要使用线程池来周期性地执行某个任务,我们需要使用newSchedulesThreadPool()方法。
它返回一个ScheduledExecutorService,这个service并不会立刻安排执行任务,实际上它是计划任务,会在指定的时间,对任务进行调度。
下面的代码每隔2秒打印当前时间
ScheduledExecutorServiceDemo
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
int threadSize = 10;
ScheduledExecutorService ses = Executors.newScheduledThreadPool(threadSize);
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis() / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 0, 2, TimeUnit.SECONDS);
}
}
运行结果:
1591882377
1591882379
1591882381
1591882383
1591882385
1591882387
每隔2秒执行
3. 核心线程池的内部实现
事实上,newCachedThreadPool, newFixedThreadPool,newSingleThreadExecutor这3个方法都是调用的ThreadPoolExecutor的构造函数。
ThreadPoolExecutor构造函数:
public ThreadPoolExecutor(
int corePoolSize, // 线程池中线程数量
int maxiumPoolSize, // 线程池中最大线程数量
keepAliveTime, // 当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间
unit, // keepAliveTime的单位
workQueue, // 任务队列,被提交但尚未被执行的任务
threadFactory, // 线程工厂,用于创建线程,一般用默认的即可
handler // 拒绝策略,当任务太多来不及处理,如何拒绝任务
);
三种线程池的实现方式:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);
}
public static ExecutorService newSingleThreadExecutor(int nThreads) {
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
)
);
}
public static ExecutorService newCachedThreadPool(int nThreads) {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
);
}
newCachedThreadPool方法返回corePoolSize为0,maximumPoolSize无穷大的线程池。
这意味着在没有任务时,该线程池内无线程;而当任务被提交时,该线程池会使用空闲的线程执行任务;
若无空闲线程,则将任务加入SynchronousQueue队列。
4. ThreadPoolExecutor参数解释
corePoolSize
指定了线程池中的线程数量maximumPoolSize
指定了线程池中的最大线程数量keepAliveTime
当线程池线程数量超过corePoolSize时, 多余的空闲线程的存活时间unit
keepAliveTime的单位(eg: TimeUnit.SECONDS)workQueue
任务队列,被提交但尚未被执行的任务threadFactory
线程工厂,用于创建线程,一般用默认的即可handler
拒绝策略,当任务太多来不及处理,如何拒绝任务