结构
概述
它通过线程池来执行许多任务。通常使用Executors中的工厂方法来配置并创建它的实例。
线程池主要为可解决两种问题:
- 解决大量异步任务时的性能问题。
- 管理资源,如线程的数量,任务的数量。
设置
ThreadPoolExecutor有很多参数要设置,通常我们需要Excutors的工厂方法来配置并创建其实例。
- newCachedThreadPool()
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
- newFixedThreadPool(int)
建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。(我用的就是这个,同上所述,相当于创建了相同corePoolSize、maximumPoolSize的线程池)
- newSingleThreadExecutor()
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
针对该类的常用配置:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
参数名 | 说明 |
---|---|
corePoolSize | 线程池维护线程的最少数量 |
maximumPoolSize | 线程池维护线程的最大数量 |
keepAliveTime | 线程池维护线程所允许的空闲时间 |
workQueue | 任务队列,用来存放我们所定义的任务处理线程 |
threadFactory | 线程创建工厂 |
handler | 线程池对拒绝任务的处理策略 |
1. corePoolSize 和 maximumPoolSize:
ThreadPoolExecutor将根据corePoolSize和maximumPoolSize设置的边界自动调整池大小。
当新的任务被提交进来,如果当前线程数量小于corePoolSize,一个新的线程将被创建来处理这个任务。
当新的任务被提交进来,如果当前的线程数大于corePoolSize,小于maximumPoolSize,任务将优先放入到队列中,直到队列满了,并且没有空闲线程时才创建新的线程。
如果你设置了corePoolSize和maximumPoolSize相等,你将得到一个固定的大小的线程池。
如果你设置maximumPoolSize为无穷大,你的线程池将处理任意数量的并发任务。
通常情况下,corePoolSize和maximumPoolSize在构造的时候被指定,但你也可以动态的设置:setCorePoolSize(int) 和 setMaximumPoolSize(int)
2. 依据需求构造
默认情况下,核心的线程已经新的任务到来而创建。你可以提前创建这些核心线程,通过如下方法:
prestartCoreThread()
prestartAllCoreThreads()
3. 创建新的线程
- 新的线程由ThreadFactory来创建,新创建的线程他们属于同一个ThreadGroup,拥有同样的优先级:NORM_PRIORITY。
- 你可以提供一个自己的ThreadFactory,通过它你可以修改线程的名字,优先级等等。
- 若当前线程池中的线程数量大于核心的线程数,大于的线程可以生存一段时间(keepAliveTime),过了这段时间后它将被杀死,资源会被回收。
4. 存活时间
当线程数达到maximumPoolSize时,经过某段时间,发现多出的线程出于空闲状态,就进行线程的回收。keepAliveTime就是线程池内最大的空闲时间。
工作队列
当核心线程都在处理任务时,新进任务被放在Queue里。
线程池中任务有三种排队策略:
- 直接提交。
直接提交策略表示队列不对任务进行缓存。新进任务直接提交给线程池,当线程池中没有空闲线程时,创建一个新的线程处理此任务。这种策略需要线程池具有无限增长的可能性。实现为:SynchronousQueue
- 有界队列。
当线程池中线程达到corePoolSize时,新进任务被放在队列里排队等待处理。有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
- 无界队列。
使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
拒绝任务
当任务源源不断的过来,而我们的系统又处理不过来的时候,我们要采取的策略是拒绝服务。
RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。
- CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
- AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)
- DiscardPolicy:不能执行的任务将被删除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
- DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
终止
当一个线程池不再被程序所引用,并且没有剩余的线程,它将会被自动的关闭。
若你想确保未被引用的线程池被回收(即使用户忘记关闭),那你必须安排未使用的线程最终死去,通过设置其合适的存活时间,或者将核心线程数设置小于0,或者设置allowCoreThreadTimeOut(boolean)。
举个例子
将task放到线程池中执行。
1. 工厂方式创建
Task:
public class Task implements Runnable {
private int index;
public void setIndex(int index) {
this.index = index;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " run ------->" + index + " ...");
try {
Thread.currentThread().sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finish " + Thread.currentThread().getName());
}
}
执行器:
public static void main(String[] args) {
//创建固定大小线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
Task task = new Task();
task.setIndex(i);
executor.execute(task);
}
System.out.println("--------------------> gg main <-------------------");
}
执行结果:
D:\android-studio\jre\bin\java -Didea.launcher.port=7540 -Didea.launcher.bin.path=D:\android-studio\bin -Dfile.encoding=UTF-8 -classpath D:\android-studio\jre\jre\lib\charsets.jar;D:\android-studio\jre\jre\lib\ext\access-bridge-64.jar;D:\android-studio\jre\jre\lib\ext\cldrdata.jar;D:\android-studio\jre\jre\lib\ext\dnsns.jar;D:\android-studio\jre\jre\lib\ext\jaccess.jar;D:\android-studio\jre\jre\lib\ext\localedata.jar;D:\android-studio\jre\jre\lib\ext\nashorn.jar;D:\android-studio\jre\jre\lib\ext\sunec.jar;D:\android-studio\jre\jre\lib\ext\sunjce_provider.jar;D:\android-studio\jre\jre\lib\ext\sunmscapi.jar;D:\android-studio\jre\jre\lib\ext\sunpkcs11.jar;D:\android-studio\jre\jre\lib\ext\zipfs.jar;D:\android-studio\jre\jre\lib\jce.jar;D:\android-studio\jre\jre\lib\jsse.jar;D:\android-studio\jre\jre\lib\management-agent.jar;D:\android-studio\jre\jre\lib\resources.jar;D:\android-studio\jre\jre\lib\rt.jar;D:\Android_MVP\android-architecture-todo-mvp\todoapp\threadpolltest\build\classes\main;D:\android-studio\lib\idea_rt.jar com.intellij.rt.execution.application.AppMain com.example.MyClass
pool-1-thread-1 run ------->0 ...
pool-1-thread-2 run ------->1 ...
pool-1-thread-3 run ------->2 ...
--------------------> gg main <-------------------
finish pool-1-thread-1
pool-1-thread-1 run ------->3 ...
finish pool-1-thread-3
finish pool-1-thread-2
pool-1-thread-3 run ------->4 ...
pool-1-thread-2 run ------->5 ...
finish pool-1-thread-1
pool-1-thread-1 run ------->6 ...
finish pool-1-thread-2
finish pool-1-thread-3
pool-1-thread-2 run ------->7 ...
pool-1-thread-3 run ------->8 ...
finish pool-1-thread-1
pool-1-thread-1 run ------->9 ...
finish pool-1-thread-2
finish pool-1-thread-3
finish pool-1-thread-1
2.自定义方式创建(针对队列策略)
抛弃策略
代码:
结果:
调用线程策略
代码:
结果:
抛弃最老策略
代码:
结果:
忽略策略
代码:
结果: