new Thread 的弊端:
- 每次new Thread新建对象性能差
- 线程缺乏统一管理,可能无限制新建线程、相互之间竞争,可能占用过多的系统资源导致死机活着OOM
- 缺乏更多的功能,如定时执行、定期执行、线程中断
线程池的好处:
- 重用存在的线程、减少对象的创建、消亡的开销、性能佳
- 可有控制最大并发线程数、提高系统资源的利用率,同时避免过多的资源竞争、避免堵塞
- 提供定时、定期、单线程、并发数控制等功能
Java 线程池种类
JDK 为我们内置了4种常见线程池的实现,均可以使用 Executors 工厂类创建。
- CachedThreadPool 用于并发执行大量短期的小任务,或者是负载较轻的服务器。
- FixedThreadPool 用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程数量。
- SingleThreadExecutor 用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。
- ScheduledThreadPoolExecutor 用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。
- 自定义线程池时,
- 如果任务是 CPU 密集型(需要进行大量计算、处理),则应该配置尽量少的线程,比如 CPU 个数 + 1,这样可以避免出现每个线程都需要使用很长时间但是有太多线程争抢资源的情况;
- 如果任务是 IO密集型(主要时间都在 I/O,CPU 空闲时间比较多),则应该配置多一些线程,比如 CPU 数的两倍,这样可以更高地压榨 CPU。
为了错误避免创建过多线程导致系统奔溃,建议使用有界队列。因为它在无法添加更多任务时会拒绝任务,这样可以提前预警,避免影响整个系统。
执行时间、顺序有要求的话可以选择优先级队列,同时也要保证低优先级的任务有机会被执行。
创建线程池需要使用ThreadPoolExecutor类
public ThreadPoolExecutor(int corePoolSize, //核心线程的数量
int maximumPoolSize, //最大线程数量
long keepAliveTime, //超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间的单位
BlockingQueue<Runnable> workQueue, //保存待执行任务的队列
ThreadFactory threadFactory, //创建新线程使用的工厂
RejectedExecutionHandler handler // 当任务无法执行时的处理器
) {...}
其核心的方法execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1.当前池中线程比核心数少,新建一个线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.核心池已满,但任务队列未满,添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//如果这时被关闭了,拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0)
//如果之前的线程已被销毁完,新建一个线程
addWorker(null, false);
}
//3.核心池已满,队列已满,试着创建一个新线程
else if (!addWorker(command, false))
reject(command);
//如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
}
以下线程池的主要工作流程:
由于 1 和 3 新建线程时需要获取全局锁,这将严重影响性能。因此 ThreadPoolExecutor 这样的处理流程是为了在执行 execute() 方法时尽量少地执行 1 和 3,多执行 2。
在 ThreadPoolExecutor 完成预热后(当前线程数不少于核心线程数),几乎所有的 execute() 都是在执行步骤 2。
前面提到的 ThreadPoolExecutor 构造函数的参数,分别影响以下内容:
- corePoolSize:核心线程池数量
在线程数少于核心数量时,有新任务进来就新建一个线程,即使有的线程没事干
等超出核心数量后,就不会新建线程了,空闲的线程就得去任务队列里取任务执行了 - maximumPoolSize:最大线程数量
包括核心线程池数量 + 核心以外的数量
如果任务队列满了,并且池中线程数小于最大线程数,会再创建新的线程执行任务 - keepAliveTime:核心池以外的线程存活时间,即没有任务的外包的存活时间
如果给线程池设置 allowCoreThreadTimeOut(true),则核心线程在空闲时头上也会响起死亡的倒计时
如果任务是多而容易执行的,可以调大这个参数,那样线程就可以在存活的时间里有更大可能接受新任务 - workQueue:保存待执行任务的阻塞队列
不同的任务类型有不同的选择,下一小节介绍 - threadFactory:每个线程创建的地方
可以给线程起个好听的名字,设置个优先级啥的 - handler:饱和策略,大家都很忙,咋办呢,有四种策略
- CallerRunsPolicy:只要线程池没关闭,就直接用调用者所在线程来运行任务
- AbortPolicy:直接抛出 RejectedExecutionException 异常
- DiscardPolicy:悄悄把任务放生,不做了
- DiscardOldestPolicy:把队列里待最久的那个任务扔了,然后再调用execute() 试试看能行不
- 我们也可以实现自己的 RejectedExecutionHandler 接口自定义策略,比如如记录日志什么的
保存待执行任务的阻塞队列
当线程池中的核心线程数已满时,任务就要保存到队列中了。
线程池中使用的队列是 BlockingQueue 接口,常用的实现有如下几种:
- ArrayBlockingQueue:基于数组、有界,按FIFO(先进先出)原则对元素进行排序
- LinkedBlockingQueue:基于链表,按FIFO (先进先出) 排序元素
- 吞吐量通常要高于 ArrayBlockingQueue
- Executors.newFixedThreadPool() 使用了这个队列
- SynchronousQueue:不存储元素的阻塞队列
- 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
- 吞吐量通常要高于 LinkedBlockingQueue
- Executors.newCachedThreadPool使用了这个队列
- PriorityBlockingQueue:具有优先级的、无限阻塞队列
自定义线程池
public class ThreadPoolManager {
private final String TAG = this.getClass().getSimpleName();
private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
// 核心线程数为 CPU数*2
private static final int MAXIMUM_POOL_SIZE = 64;
// 线程队列最大线程数
private static final int KEEP_ALIVE_TIME = 1;
// 保持存活时间 1秒
/*
** 如果是要求高吞吐量的,可以使用 SynchronousQueue 队列;如果对执行顺序有要求,可以使用 PriorityBlockingQueue;如果最大积攒的待做任务有上限,可以使用 LinkedBlockingQueue
*/
private final BlockingQueue<Runnable> mWorkQueue = new LinkedBlockingQueue<>(128);
private final ThreadFactory DEFAULT_THREAD_FACTORY = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, TAG + " #" + mCount.getAndIncrement());
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
};
private ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME,
TimeUnit.SECONDS, mWorkQueue, DEFAULT_THREAD_FACTORY,
new ThreadPoolExecutor.DiscardOldestPolicy());
private static volatile ThreadPoolManager mInstance = new ThreadPoolManager();
public static ThreadPoolManager getInstance() {
return mInstance;
}
public void addTask(Runnable runnable) {
mExecutor.execute(runnable);
}
@Deprecated
public void shutdownNow() {
mExecutor.shutdownNow();
}
}
两种提交任务的方法
ExecutorService 提供了两种提交任务的方法:
- execute():提交不需要返回值的任务
execute() 的参数是一个 Runnable,也没有返回值。因此提交后无法判断该任务是否被线程池执行成功。 - submit():提交需要返回值的任务
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
submit() 有三种重载,参数可以是 Callable 也可以是 Runnable。
同时它会返回一个 Funture 对象,通过它我们可以判断任务是否执行成功。
获得执行结果调用 Future.get()方法,这个方法会阻塞当前线程直到任务完成。
提交一个 Callable 任务时,需要使用 FutureTask 包一层
FutureTask futureTask = new FutureTask(new Callable<String>() { //创建 Callable 任务
@Override
public String call() throws Exception {
String result = "";
//do something
return result;
}
});
Future<?> submit = executor.submit(futureTask); //提交到线程池
try {
Object result = submit.get(); //获取结果
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
关闭线程池
线程池即使不执行任务也会占用一些资源,所以在我们要退出任务时最好关闭线程池。
有两个方法关闭线程池:
- shutdown()
将线程池的状态设置为 SHUTDOWN,然后中断所有没有正在执行的线程 - shutdownNow()
将线程池设置为 STOP,然后尝试停止所有线程,并返回等待执行任务的列表
它们的共同点是:都是通过遍历线程池中的工作线程,逐个调用 Thread.interrup()来中断线程,所以一些无法响应中断的任务可能永远无法停止(比如 Runnable)