其实手动创建是更好的。因为这样一来可以让我们非常明确线程池的运行规则,避免资源耗尽的风险。
我们就来看一看,如果我们自动创建的话会有哪些风险?这同时也是让我们熟悉一下 JDK 提供给我们的那些非常典型的线程池。
FixedThreadPool
首先来用代码展示一下 newFixedThreadPool。在这里我们新建一个 FixedThreadPoolDemo
类。在该程序中演示如何利用 FixedThreadPool 线程池执行任务。具体代码如下:
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService=Executors.newFixedThreadPool(5);
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}
class Task implements Runnable{
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
在 main 函数中,我们直接新建 executors 这个辅助工具类去新建线程池,这里面会传入一个参数,这个参数是我们想让存在的线程数。也就是说我们用了这种线程池的话,也就是固定的线程池。固定的值为几都可以,这里我们将值设置为 5,这样子我们的线程池就创建出来了。在这种情况下,往线程池里面提交任务。
接着我们来新建一个任务类,任务类里面很简单,我们先休眠 500 毫秒,然后去打印出我们当前现成的名字。
在 main 函数中我们就来用 for 循环去执行任务。假设执行 1000 次,在里面传入我们的任务。
然后来执行我们的程序。执行的结果如下:
可以看出,执行出来的线程名字始终都是 1、2、3、4、5不会超过,这是因为执行这 1000 个任务的线程,它们是被规定好的,就是 5。所以即便任务来得特别多,也不会创建新的线程去执行他。
那这是为什么呢?我们直接看到源码,就可以看出这个原因了。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool 内部实际还是调用了 ThreadPoolExecutor 构造函数。可以看出它本质还是 ThreadPoolExecutor。但是里面的这些参数3现在来分析一下,第一个参数是 nThread,这个 n 就是我们传进来的参数,所以实际上这个参数原本是 corePoolSize,它的核心数量就被设置成为 n。假设我们上面的程序中的参数是设置为 5,那么传进来的参数就是 5。第二个参数是最大的线程数量,在源码中可以看到设置的和核心数是一样的,所以永远不可能膨胀超过 5。
第三个参数是存活时间,默认设置为 0。这是因为既然不可能膨胀到超过核心线程数量,所以这个参数实际上是没有意义的,因为也不会有线程被回收。第四个参数是时间的一个单位,这里是毫秒,那这个和刚才的这个参数是绑定的,都是用来表示超时时间的。还有一个参数是队列,源码中使用的的队列是非常典型的 LinkedBlockingQueue 队列,而这个 LinkedBlockingQueue先前面的课时中介绍过,是一个无界队列,所以有再多的任务进来,都可以放到LinkedBlockingQueue 中去执行。
我们在来简单的总结,通过往构造函数中传参,创建了一个核心线程数和最大线程数相等的线程池,它们的数量也就是我们传入的参数,这里的重点是使用的队列是容量没有上限的 LinkedBlockingQueue,如果我们对任务的处理速度比较慢,那么随着请求的增多,队列中堆积的任务也会越来越多,最终大量堆积的任务会占用大量内存,并发生 OOM 异常,也就是 OutOfMemoryError,这几乎会影响到整个程序,会造成很严重的后果。
所以现在我们就来演示一下 FixedThreadPool 错误的实际发生的情况。具体代码如下:
public class FixedThreadPoolOOMDemo {
private static ExecutorService executorService= Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executorService.execute(new TaskFixedThreadPoolOOM());
}
}
}
class TaskFixedThreadPoolOOM implements Runnable{
@Override
public void run() {
try {
Thread.sleep(1000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
那我们新建一个 FixedThreadPoolOOMDemo 类,该类主要去演示内存不够用了。虽然看上去设计的其实蛮好的,就这么多线程,然后,进来任务放到队列中,可是如果队列里面的任务越来越多,确实也是一个很大的问题。
在这里写一个线程池,该线程池固定线程的数量,假设就一个线程。希望处理的越慢越好,因为我们希望队列里面任务越来越多来达到变成内存过多的这种效果。
然后在主函数中,是要往里面放特别多的任务,到整型的最大值,即为 Integer.MAX_VALUE,这样足够多了,然后去执行任务。
执行哪一个任务呢?这个任务也是需要有一定特点的。不是什么任务提交过来都能顺利的触发这个内存过多,要根据我们的目的去设计一下任务。这个任务特点其实是非常明显的,就是只让我们的任务线程休眠的一段时间,并且这个休眠时间设置特别长,这个任务的特点就是一直在处于休眠状态。这样的任务为什么会设计成这样的,就是因为我们不想让这个任务被执行完毕,因为被执行完毕了,那我们自然这个任务就结束了,进入到下一个任务。
我们的目标就把这个队列给塞满,所以希望这里面的任务一个都不能结束,自然随着时间的增加,队列里面的任务就越来越多。所以在这里,就用 for 循环去不停的往队列里面塞任务,同时我们在执行的时候,还要稍微做一下调整。
要想让这样默认的配置去做,如果不做调整,要想达到这个溢出需要的时间太长了。我们是演示,所以把我们的内存设置的稍微小一点。在 IDEA 中设置我们 JVM 内存的大小,通过 -Xmx8m 和 -Xmx8m 这两个参数设置。这个也不影响,只要说我们能看到他有内存溢出的情况发生,就说明实际上是很危险的,现在就可以执行我们的程序。运行的结果如下图所示:
执行的结果报出的错误是 OutOfMemoryError。这是因为不停的往队列中添加任务,最后导致占用大量的内存,发生了 OutOfMemoryError 异常。所以这就演示了我们用这种 newFixedThreadPool 所带来的一种可能出错的情况,这就意味着我们用这种固定的数量的线程池,还是需要额外注意的。
SingleThreadExecutor
下面我们探讨第二种线程池是 SingleThreadExecutor。
首先我们通过代码演示,在这里呢我们还是新建一个 SingleThreadExecutorDome 类。具体代码如下所示:
public class SingleTreadExecutorDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 1000; i++) {
executorService.execute(new TaskSingleTreadExecutor());
}
}
}
class TaskSingleTreadExecutor implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
这种线程池是什么特点呢?从名字就可以看出来了,相对而言是比较简单的,就是一个单独的线程,也就是这个线程池里面只有一个线程。通过 newSingleThreadExecutor() 方法创建线程池,在 newSingleThreadExecutor 方法中都不需要传参数,因为它的线程数就是 1,所以也不需要做什么指定。同时使用 for 循环去执行一些任务。程序运行的结果如下图所示:
控制台输出的结果都是线程的名字,可以看出非常明显了,每一个执行出来都是我们的线程池里面的第一个线程,没有其他的线程执行了,所以执行的速度也不会特别快。这种线程池就是这样的特点,它只有一个线程。
为什么会这样呢?实际上我们来分析一下它创建的源码。具体代码如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
通过看源码我们就一目了然了。你可以看出,newSingleThreadExecutor 和 newFixedThreadPool 的原理是一样的,传入的参数核心数量是 1,最大线程数量也是 1。
既然核心数量和最大数量一样,所以时间也可以忽略不计,最后也传入了一个无线容量的队列,即 LinkedBlockingQueue。所以可以看出,这个线程池和我们前面探讨固定数量的线程池是基本上很类似的。区别在于传入的核心线程数和最大线程数不同。
这种线程池它的原理和我们刚才是很类似的,只不过把线程数量设置为 1,但是这也会导致同样的问题,也就是当请求堆积的时候可能会占用大量的内存,并且发生 OOM 异常。
CachedThreadPool
接着我们来探讨第三种线程池,即 CachedThreadPool,这种线程池的特点是可以缓存,看上去挺厉害的。什么叫可以缓存呢?它是一个无界的线程池,而且还可以自动回收多余的线程。
我们用图来对这种线程池进行表述。如下图所示:
我们来看一下这张图就说明了它的原理。首先用的是 Synchronous queue,这个 queue 前面介绍过,我们看一下它有什么特点,它是直接交换的一个队列。直接交换的队列内部的容量是 0,所以不能把任务放在队列中,所以它就没有一个队列用来存储。任务提交进来之后,直接就提交给线程去执行了,并且这个线程池所设定的最大的线程数量是整型的最大值,可以说是没有上限。
所以无论多少个任务提交进来,都会创建多少线程来帮助执行,并且前面介绍可缓存的,这就是在一定时间之后,会把多余的那些线程给回收回来。因为这个特点就是有任务来了,就创建线程,过一会儿任务结束之后,肯定会有多余的线程,所以在多余的时间过后,会把它给回收回来,默认的时间呢是 60 秒。
我们就来看一下这种线程如何实现的。具体实现方式如下代码所示:
public class CachedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i <1000 ; i++) {
executorService.execute(new TaskCachedThreadPool());
}
}
}
class TaskCachedThreadPool implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
执行程序的结果如下:
主要看的是线程编号,可以看出和前面介绍情况是大大不同。刚才那个固定数量的或者说是一个线程的线程池,他们就是 1 或者是 5,但是这里直接创建了好几百个线程来执行任务,过一会儿,这些线程发现没有任务可以执行了,也会把它回收掉,这就是这种线程池的特点。
这种线程池也是有问题的,问题在于哪里呢?我们来看一下它的源码。具体代码如下所示:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以发现创建线程的时候,核心线程数量为 0,最大线程数量为 Integer.MAX_VALUE,也就意味着几乎没有一个最大的数量。同样道理,既然线程几乎是无限创建的,如果创建的线程数量特别多,我任务数量特别多的话,最终超过了操作系统的上限而无法创建新线程,或者导致内存不足,也有可能就会导致 OOM 异常。
ScheduleThreadPool
我们来看一下这种线程池是什么特点。这种线程池是支持定时以及周期性的去执行任务。具体实现代码如下:
public class ScheduleThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService= Executors.newScheduledThreadPool(10);
scheduledExecutorService.schedule(new TaskScheduleThreadPool(), 5, TimeUnit.SECONDS);
}
}
class TaskScheduleThreadPool implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
执行的结果运行:
这种线程池实现的有几种用法,下面我们逐一探讨。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
创建并执行在给定延迟后启用的 ScheduledFuture,callable: 要执行的函数 ,delay:从现在开始延迟执行的时间,unit:延迟参数的时间单位。
public abstract ScheduledFuture<?> schedule (Runnable command,
long delay,
TimeUnit unit)
创建并执行在给定延迟后启用的一次性操作,command: 要执行的任务,delay: 从现在开始延迟执行的时间,unit:延迟参数的时间单位。
public abstract ScheduledFuture<?> scheduleAtFixedRate (Runnable command,
long initialDelay,
long period,
TimeUnit unit)
创建并执行一个周期性动作,在给定的初始延迟后首先启用,然后在给定的时间段内启用,command:要执行的任务,initialDelay:延迟第一次执行的时间,period:连续执行之间的时间间隔,unit:initialDelay 和 period 参数的时间单位。
public abstract ScheduledFuture<?> scheduleWithFixedDelay (Runnable command,
long initialDelay,
long delay,
TimeUnit unit)
创建并执行一个周期性动作,该动作首先在给定的初始延迟后启用,随后在一个执行终止和下一个执行开始之间具有给定的延迟。command:要执行的任务,initialDelay:延迟第一次执行的时间,delay:一个执行的终止和下一个执行的开始之间的延迟,unit:nitialDelay 和 delay 参数的时间单位。
为什么不应该手动创建线程池
正确的创建线程的方法不应该是自动创建了,因为这些线程池都是提前被设计好的,在设计的时候和我们的业务肯定不完全契合,我们最好还是要根据我们的业务场景去设置线程池的参数。
比如内存有多大,如何定义线程的名字,我们想给线程池里面的线程定义自己的线程的名字,就要传入自己的线程工厂,还比如任务被拒绝的时候该如何记录日志,这一系列的内容都和我们的业务是相关的。我们的并发量有多大,可能会决定我们的线程数量需要多少,这些我们最好经过调研之后,结合业务去确定的线程池。这里面的参数都是我们的业务定制的,所以这样会非常适合于我们的业务场景。
适合的线程数量是多少,CPU 核心数和线程数的关系
下面我们一起来探讨适合的线程数量是多少,CPU 核心数和线程数的关系。前面我们说自己手动设置线程确实会好一些。但是我们该如何确定这个参数?
线程池里面其实比较重要的参数是 corePoolSize 和 maxPoolSize,这两个参数决定了我们线程的数量。
根据我个人多年的经验给出建议,根据我们的任务不同有不同的规则。
第一个,假设任务是 CPU 密集型的,CPU 密集型指的就是大量的计算,比如加密、解密、计算哈希、压缩,这些都是非常典型的应用场景,CPU 会不停的工作,CPU 相当于是满负荷的,这个时候应当把线程数量设置为 CPU 核心数的 1 ~ 2 倍,如果设置过多的线程数,实际上并不会起到很好的效果,。假如 CPU 是 8 核,最好把它设置成 8 ~ 16 之间的一个数字,这样的线程的数量就足够了,因为我们的线程已经是在满负荷工作了,我们的线程由于大量的都是 CPU 计算,所以它会每个线程分配到一个 CPU ,然后就去不停地执行,而我们设置过多的线程,每个个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数过多导致性能下降。
针对于这种情况,我们最好还要同时考虑在同一台机器上还有其他哪些会占用过多 CPU 资源的程序在运行,然后对 CPU 资源使用做一个整体的平衡。
第二个,耗时 IO 型,比如读写数据库、读写文件、网络通信等,这种就不一样,这又是为什么呢?因为一旦要涉及到网络,这个时候 CPU 通常是不工作的,或者说涉及到读写文件的时候,外设的速度比我们 CPU 的速度要慢,所以 CPU 很有可能就会休息。这种情况下,可以把线程的数量设置为大于 CPU 核心数的很多倍。比如是 8 核CPU,可以设置成 10 倍,就相当于有 80 个线程来执行任务,其实这也是客观的,这是为什么呢?因为虽然看上去有 80 个线程,但是实际上多线程都是在等待,要么等待文件,要么等待网络,所以实际上这样才能充分地利用 CPU。
所以根据我们的业务不同会有不同的设计规则。我们的任务是什么样的,要去根据任务类型设置线程池的线程数量。
《Java 并发编程实战》的作者 Brain Goetz 推荐的计算方法:
设置线程数的公式:线程数= CPU 核心数 *(1+平均等待时间/平均工作时间)。
通过这个公式,我们可计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对我们上面的 CPU 密集型任务,线程数就随之减少。
等待时间指的是线程大部分时间是等待,那很显然就是读取数据库,也就是读取数据库的时间越长,假设等待的时间是 100S,平均工作时间越短,比如 1S,也就是等待 100S 数据库去读取数据,然后执行 1S 的计算任务。假设是这样比例的话,最终这个值算出来 101 * CPU 核心数,就是这样的一个计算方法。
根据任务的不同,线程数实际上是在变化的,所以在这样的一个标准之下。如果还想做到更加精准的话,实际上最精准的应该是根据不同的程序去做压测,监控 JVM 的线程情况以及 CPU 的负载情况,根据实际情况衡量,然后就可以得到一个相对比较合适的线程数量,这样能够合理并充分的利用资源。如果粗略地估计一下,就可以用上面的公式去估计。