JAVA中的线程池实现
在JDK1.5之后,JAVA增加了线程池的实现.这里简要描述一下相关的两个类Executors
以及ThreadPoolExecutor
,注意: 该部分内容是结合了互联网上的相关知识.
Java Executors
Java里面线程池的顶级接口是Executors
,但是严格意义上讲Executors
并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。
比较重要的几个类:
类名 | 描述 |
---|---|
ExecutorService | 真正的线程池接口 |
ScheduledExecutorService | 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题 |
ThreadPoolExecutor | ExecutorService的默认实现 |
ScheduledThreadPoolExecutor | 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现 |
要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。
这四种线程池,分别为:
newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
示例代码如下:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
// 注意当前创建根据不同的需要,我们后面针对不同的线程池实现做修改
ExecutorService pool = Executors.newCachedThreadPool();
// 创建线程
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
// 将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 关闭线程池
pool.shutdown();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 正在执行。。。");
}
}
newCachedThreadPool
执行结果如下
pool-1-thread-1 正在执行。。。
pool-1-thread-2 正在执行。。。
pool-1-thread-2 正在执行。。。
pool-1-thread-3 正在执行。。。
pool-1-thread-1 正在执行。。。
Process finished with exit code 0
newFixedThreadPool
- 修改对应代码
ExecutorService pool = Executors.newFixedThreadPool(5);
此时对应的结果如下
pool-1-thread-2 正在执行。。。
pool-1-thread-1 正在执行。。。
pool-1-thread-3 正在执行。。。
pool-1-thread-4 正在执行。。。
pool-1-thread-5 正在执行。。。
Process finished with exit code 0
- 修改对应代码
ExecutorService pool = Executors.newFixedThreadPool(2);
此时执行的结果如下
pool-1-thread-1 正在执行。。。
pool-1-thread-2 正在执行。。。
pool-1-thread-1 正在执行。。。
pool-1-thread-1 正在执行。。。
pool-1-thread-2 正在执行。。。
Process finished with exit code 0
newSingleThreadExecutor
修改代码为
ExecutorService pool = Executors.newSingleThreadExecutor();
执行结果
pool-1-thread-1 正在执行。。。
pool-1-thread-1 正在执行。。。
pool-1-thread-1 正在执行。。。
pool-1-thread-1 正在执行。。。
pool-1-thread-1 正在执行。。。
Process finished with exit code 0
newScheduledThreadPool
修改代码为
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
// 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
// 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
// 将线程放入池中进行执行
pool.execute(t1);
// 使用延迟执行风格的方法
pool.schedule(t2, 1000, TimeUnit.MILLISECONDS);
pool.schedule(t3, 10, TimeUnit.MILLISECONDS);
// 关闭线程池
pool.shutdown();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 正在执行。。。");
}
}
执行结果
pool-1-thread-1 正在执行。。。
pool-1-thread-2 正在执行。。。
pool-1-thread-1 正在执行。。。
Process finished with exit code 0
无论以上创建那种线程池 必须要调用ThreadPoolExecutor
,以下附上对应的静态方法实现代码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 该类随未直接new ThreadPoolExecutor类,但ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,new ScheduledThreadPoolExecutor实际上调用的即为ThreadPoolExecutor的构造器
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
但阿里的JAVA开发手册不建议直接使用Executors
的四种静态方法
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
那么我们来看一下ThreadPoolExecutor
的实现
ThreadPoolExecutor
java.util.concurrent.ThreadPoolExecutor
,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler)
corePoolSize
: 线程池维护线程的最少数量
maximumPoolSize
:线程池维护线程的最大数量
keepAliveTime
: 线程池维护线程所允许的空闲时间
unit
: 线程池维护线程所允许的空闲时间的单位
workQueue
: 线程池所使用的缓冲队列
handler
: 线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)
方法被添加到线程池,任务就是一个 Runnable
类型的对象,任务的执行方法就是 Runnable
类型对象的run()
方法。
当一个任务通过execute(Runnable)
方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize
,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize
,但是缓冲队列 workQueue
未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize
,缓冲队列workQueue
满,并且线程池中的数量小于maximumPoolSize
,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize
,缓冲队列workQueue
满,并且线程池中的数量等于maximumPoolSize
,那么通过 handler
所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize
时,如果某线程空闲时间超过keepAliveTime
,线程将被终止。这样,线程池可以动态的调整池中的线程数。
unit
可选的参数为java.util.concurrent.TimeUnit
中的几个静态属性:
NANOSECONDS
、MICROSECONDS
、MILLISECONDS
、SECONDS
。
workQueue
常用的是:java.util.concurrent.ArrayBlockingQueue
Java并发包中的阻塞队列一共7个,当然他们都是线程安全的。
名称 | 描述 |
---|---|
ArrayBlockingQueue | 一个由数组结构组成的有界阻塞队列 |
LinkedBlockingQueue | 一个由链表结构组成的有界阻塞队列 |
PriorityBlockingQueue | 一个支持优先级排序的无界阻塞队列 |
DealyQueue | 一个使用优先级队列实现的无界阻塞队列 |
SynchronousQueue | 一个不存储元素的阻塞队列 |
LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列 |
LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列 |
handler
有四个选择:
名称 | 描述 |
---|---|
ThreadPoolExecutor.AbortPolicy() | 丢弃任务并抛出RejectedExecutionException异常。 (默认) |
ThreadPoolExecutor.CallerRunsPolicy() | 重试添加当前的任务,他会自动重复调用execute()方法 |
ThreadPoolExecutor.DiscardOldestPolicy() | 抛弃旧的任务,重新尝试执行 |
ThreadPoolExecutor.DiscardPolicy() | 丢弃当前任务,但是不抛出异常。 |
个人Blog:https://yannischen.cn