ThreadPoolExecutor(线程池)
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
具体参数及其含义
int corePoolSize
核心线程数量,核心线程存在两种状态,与参数设置有关。
存活状态:allowCoreThreadTimeOut 默认设置为false,此设置下,核心线程一直处于存活状态,即使核心线程无任务操作,处于空闲状态。
终止状态: allowCoreThreadTimeOut 设置为true时,超过keepAliveTime设定时常后,核心线程终止,被回收。int maximumPoolSize
线程池所允许存在的最大线程数量,超过最大线程数后,后续的新任务会被阻塞。long keepAliveTime
当线程处于空闲状态后,允许存活的时长,超过此时长后,线程会被终止回收。根据allowCoreThreadTimeOut参数不同,限定的线程不同。
限定非核心线程:当allowCoreThreadTimeOut参数值为默认false时,只对非核心线程有限制。
限定核心线程与非核心线程:当allowCoreThreadTimeOut参数值为true时,对线程池中的核心与非核心都由限制,超过限定时长后,线程都会终止并回收。TimeUnit unit
keepAliveTime的事件单位,TimeUnit类提供枚举,有纳秒、微秒、毫秒等,具体如下。
TimeUnit.NANOSECONDS:纳秒
TimeUnit.MICROSECONDS:微秒
TimeUnit.MILLISECONDS:毫秒
TimeUnit.SECONDS:秒
TimeUnit.MINUTES:分
TimeUnit.HOURS:小时
TimeUnit.DAYS:天
BlockingQueue<Runnable> workQueue
线程池任务队列,阻塞队列,主要用来存储已经提交但尚未分配给线程执行的任务(线程池execute方法提交的Runnable对象)。ThreadFactory threadFactory
线程工厂,interface类型,只有一个方法Thread newThread(Runnable r); 用来为线程池创建新线程。RejectedExecutionHandler handler
interface类型,只有一个方法void rejectedExecution(Runnable r, ThreadPoolExecutor executor),线程池无法执行新任务时,ThreadPoolExecutor调用handler的该方法通知调用者。
调用策略
- 如果线程池中线程的数量小于核心线程数量,新任务来之后,开启新的核心线程去执行任务。
- 如果线程池中核心线程数量已经饱和,新任务来之后,则放到workQueue任务队列,代核心线程执行完任务后,从任务队列中取出待处理任务,继续处理。
- 如果线程池中核心线程数量已经饱和,workQueue任务队列中任务数也达到最大值,再来新任务,则开启一个非核心线程,进行处理任务。之后非核心线程和核心线程一同处理workQueue任务队列中任务。
- 如果线程池中核心线程数量饱和,workQueue任务队列中任务数也达到最大值,非核心线程也达到最大值,当再来到新任务后,那么线程池就调用RejectedExecutionHandler的rejectedExecution方法拒绝该任务。
- 特殊情况,如果线程池中无核心线程,并且任务数不大于任务队列限制,则线程池只开启一个非核心线程进行处理。
调用策略示例验证
为了更好的理解调用策略,采用具体的示例,更清晰明了的进行理解。
package com.bamboolmc.threadpro;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
private static int taskSize = 2;
private static int corePoolSize = 3;
private static int maximumPoolSize = 4;
private static int queueSize = 3;
public static class TestTask implements Runnable{
@Override
public void run() {
if (taskSize>0){
try {
//模拟任务处理
Thread.sleep(500);
System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()
+" 完成一个任务,编号为t" + (taskSize--));
}catch (Exception e){
e.printStackTrace();
}
}
}
}
public static void main(String args[]) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
1,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
executor.execute(task);
}
executor.shutdown();
}
}
对比调用策略1
private static int taskSize = 2;
private static int corePoolSize = 3;
private static int maximumPoolSize = 4;
private static int queueSize = 3;
任务数量2,核心线程数量3
运行结果:
1527001137241pool-1-thread-2 完成一个任务,编号为t1
1527001137241pool-1-thread-1 完成一个任务,编号为t2
对比调用策略2
修改如下
private static int taskSize = 5;
private static int corePoolSize = 3;
private static int maximumPoolSize = 4;
private static int queueSize = 3;
任务数量5,核心线程数量3,任务队列3
运行结果:
1527001567366pool-1-thread-2 完成一个任务,编号为t4
1527001567366pool-1-thread-1 完成一个任务,编号为t5
1527001567366pool-1-thread-3 完成一个任务,编号为t3
1527001567869pool-1-thread-1 完成一个任务,编号为t1
1527001567869pool-1-thread-2 完成一个任务,编号为t2
对比调用策略3
修改如下
private static int taskSize = 7;
private static int corePoolSize = 3;
private static int maximumPoolSize = 4;
private static int queueSize = 3;
任务数量7, 核心线程数量3,任务队列3,非核心线程1
运行结果:
1527002003014pool-1-thread-3 完成一个任务,编号为t6
1527002003014pool-1-thread-2 完成一个任务,编号为t4
1527002003014pool-1-thread-1 完成一个任务,编号为t7
1527002003014pool-1-thread-4 完成一个任务,编号为t5
1527002003516pool-1-thread-3 完成一个任务,编号为t3
1527002003516pool-1-thread-1 完成一个任务,编号为t2
1527002003516pool-1-thread-2 完成一个任务,编号为t1
对比调用策略4
修改如下
private static int taskSize = 8;
private static int corePoolSize = 3;
private static int maximumPoolSize = 4;
private static int queueSize = 3;
任务数量8, 核心线程数量3,任务队列3,非核心线程1
运行结果:
第8个任务运行时,线程池拒绝,抛出异常
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.bamboolmc.threadpro.ThreadPool$TestTask@14ae5a5 rejected from java.util.concurrent.ThreadPoolExecutor@7f31245a[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.bamboolmc.threadpro.ThreadPool.main(ThreadPool.java:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
1527002190243pool-1-thread-3 完成一个任务,编号为t8
1527002190243pool-1-thread-1 完成一个任务,编号为t6
1527002190243pool-1-thread-2 完成一个任务,编号为t5
1527002190243pool-1-thread-4 完成一个任务,编号为t7
1527002190749pool-1-thread-2 完成一个任务,编号为t3
1527002190749pool-1-thread-1 完成一个任务,编号为t3
1527002190749pool-1-thread-3 完成一个任务,编号为t4
对比调用策略5
修改如下
private static int taskSize = 3;
private static int corePoolSize = 0;
private static int maximumPoolSize = 4;
private static int queueSize = 3;
任务数量3, 核心线程数量0,任务队列3,非核心线程4
运行结果:
1527002486575pool-1-thread-1 完成一个任务,编号为t3
1527002487078pool-1-thread-1 完成一个任务,编号为t2
1527002487579pool-1-thread-1 完成一个任务,编号为t1
线程池分类
根据构建线程池时传入的参数不同,我们在Android系统中常见有4种不同功能特性的线程池。
- CachedThreadPool
- FixedThreadPool
- SingleThreadExecutor
- ScheduledThreadPool
我们可以用Executors.newXXX的方式去实例化我们需要的线程池,如实例化一个CachedThreadPool
ExecutorService executorService = Executors.newCachedThreadPool();
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数:0
最大线程数:2的31次方-1
超时时长:60秒
任务队列:SynchronousQueue
由此可见,CachedThreadPool只有非核心线程,并且非核心线程最大数量可以任意大,当前已建线程都在执行任务时,再来新的任务会开启新的线程来执行,闲置线程超过60秒,就会被收回。由于使用了SynchronousQueue队列,所有提交的任务都会被立即执行。这类线程池比较适合大量的耗时较少的任务。
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数:固定(新建线程池时提供的参数)
最大线程数:同核心线程数
超时时长:0
任务队列:LinkedBlockingQueue
由此可见,FixedThreadPool只有固定个数的核心线程数,无非核心线程,并且核心线程无超时限制,除非线程池关闭,否则线程即使处于空闲状态,也不会被回收,因此能够更快的响应外界请求。另外任务队列容量没有大小限制。当所有线程均有任务执行时,新任务会在任务队列排队,线程执行完,取出任务队列中任务执行。
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
核心线程数:1
最大线程数:1
超时时长:0
任务队列:LinkedBlockingQueue
由此可见,SingleThreadExecutor只有1个核心线程,所有任务都会在一个线程中按顺序执行;任务队列容量没有大小限制;另外如果某些错误而导致线程终止,则会创建新的线程继续执行后续的任务(与newFixedThreadPool(1)的不同,无法创建新的线程继续执行后续任务);由于是在一个线程执行任务,所以无需处理线程同步问题了。
ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
核心线程数:固定
最大线程数:2的31次方-1
超时时长:10毫秒
任务队列:DelayedWorkQueue
由此可见,ScheduledThreadPool核心线程数固定,非核心线程数无限大,非核心线程超时时长10毫秒。主要用于执行定时任务和具有固定周期的任务。
使用方法异同
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new TestTask());
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new TestTask());
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new TestTask());
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
executor.schedule(new TestTask(),2000,TimeUnit.MILLISECONDS);
任务队列BlockingQueue
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满时,自动唤醒被阻塞线程继续执行插入操作。
- 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会阻塞,等待队列变为非空时,自动唤醒。
四种类型线程池分析时,发现其中的任务队列参数分别使用到SynchronousQueue、LinkedBlockingQueue、DelayedWorkQueue这三种不同的任务队列。除此之外,系统还提供了PriorityBlockingQueue、ArrayBlockingQueue,DelayQueue,这些类均实现自BlockingQueue接口。
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
BlockingQueue作为一个接口,提供了添加、删除元素方法。
添加方法
- add: 添加元素到队列,成功返回true;队列容量满了,再添加元素时,会抛出IllegalStateException异常。
- offer:添加元素到队列,成功返回true,失败返回false
- put:添加指定元素到队列,如果队列容量满了,则会阻塞到队列有空间。
删除方法
- take:删除队列头部元素,如果队列为空,则一直阻塞到队列有元素为止。返回头部元素。
- poll:删除队列头部元素,在指定时间内,如果队列不为空,则删除头部元素,并返回头部元素。在制定时间内队列仍为空,则返回null。
- remove:删除队列内指定的元素,如果删除成功则返回true,如果找不到该指定元素,抛出NullPointerException。
其他方法
- remainingCapacity:返回队列能够添加元素的数量(阻塞或无限制除外)。
- contains:队列是否包含某指定元素。
- drainTo:从队列中移除(所有或指定数量的)可用元素,并将它们添加到给定集合中。
SynchronousQueue
没有元素存储空间的阻塞队列,如果在插入元素时后续没有执行取出的操作,那么插入的行为就会被阻塞。构造方法如下
public SynchronousQueue()
public SynchronousQueue(boolean fair)
LinkedBlockingQueue
基于链表的阻塞队列,可指定容量,默认容量无穷大(2的31次方-1),采用FIFO(first-in-first-out)的访问策略。
public LinkedBlockingQueue()
public LinkedBlockingQueue(int capacity)
public LinkedBlockingQueue(Collection<? extends E> c)
PriorityBlockingQueue
基于优先级的阻塞队列,队列容量由使用者指定,优先级的高低是如何指定的呢,可根据如下的构造函数中看到Comparator<? super E> comparator 就是指定优先级的比较器。因此队列元素(也就是要执行的任务)需要实现Comparator接口,来定义优先级高低。如果不设置容量大小的话,默认队列容量为11。
public PriorityBlockingQueue()
public PriorityBlockingQueue(int initialCapacity)
public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator)
public PriorityBlockingQueue(Collection<? extends E> c)
ArrayBlockingQueue
基于数组的阻塞队列,队列容量由使用者指定,由于队列基于数组,初始化后,容量大小就是固定的。当设定boolean fair为true时,队列访问策略为FIFO;如果不设定,则默认false,访问策略是无序的。
public ArrayBlockingQueue(int capacity)
public ArrayBlockingQueue(int capacity, boolean fair)
public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c)
其他
详细的阻塞队列可参考源码及如下文章学习
https://blog.csdn.net/qq_38989725/article/details/73298856