Java线程池的优点:
1、降低资源消耗 通过重复利用已创建的线程降低线程创建和销毁的消耗。
2、提高响应速度 当任务到达的时候,不需要等到线程创建就能立即执行。
3、提高线程的可管理性 线程时稀缺资源,使用线程池可以进行统一的分配、调优和监控。
ThreadPoolExecutor 工作原理:
1、如果当前运行的线程少于 corePoolSize
,则直接创建新线程来执行任务。
2、如果运行的线程等于或多于 corePoolSize
,则将任务加入 BlockingQueue
3、如果无法加入到 BlockingQueue
(队列已满),则创建新的线程来处理任务。
4、如果新创建的线程将使当前运行的线程超出 maximumPoolSize
,任务将被拒绝,调用饱和策略处理(RejectedExceptionHandler.rejectedException()方法),默认是AbortPolicy,会抛出Exception。
当前运行的线程的意思是:只要不是
TERMINATED
都算活着。
ThreadPoolExecutor 的创建:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其它空闲的基本线程能够执行新任务,也会创建线程,等到需要执行的任务数大于线程池基本大小时就不在创建。
workQueue
(任务队列): 用于保存等待执行的任务的阻塞队列。
- ArrayBlockingQueue:基于数组结构的有界阻塞队列。FIFO
- LinkedBlockingQueue:基于链表结构的阻塞队列。FIFO
maximumPoolSize
(线程池最大数量):线程池允许的最大线程数。注意,如果使用了无界的任务队列,这个参数就没什么效果。
ThreadFactory
:用于设置创建线程的工厂
RejectedExecutionHandler
(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,必须采用一种策略处理提交的新任务。默认策略是 AbortPolicy,表示无法处理新任务时抛出异常。
- AbortPolicy:直接抛出异常
- CallerRunsPolicy:用调用者所在的线程运行任务。
- DiscardPolicy:不处理,丢弃掉
- DiscardOldestPolicy:丢弃队列最近的一个任务,并执行当前任务。
keepAliveTime
(线程活动保持时间):线程池的工作线程空闲后,保持存活的事件。
TimeUnit
:保持时间单位
。
示例代码演示:
先定义一个任务,供以后使用:
class MyRun(val name: String) : Runnable {
override fun run() {
println("$name is running, ${Thread.currentThread().name}")
TimeUnit.SECONDS.sleep(3)
println("$name is finished")
}
}
调用线程池执行任务一:
val executor = ThreadPoolExecutor(2, 10, 30, TimeUnit.SECONDS, ArrayBlockingQueue(1))
executor.execute(MyRun("T1"))
TimeUnit.SECONDS.sleep(20)
executor.execute(MyRun("T2"))
结果:
T1 is running, pool-1-thread-1
T1 is finished
T2 is running, pool-1-thread-2
T2 is finished
分析:
当 T1
执行完成之后,此时 pool-1-thread-1
的线程所处的状态 WAITING
:
"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007fa1c2083000 nid=0x4403 waiting on condition [0x0000700010f37000]
java.lang.Thread.State: WAITING (parking)
因为 corePoolSize
是 2,所以面对新的任务还是会用 新的线程 pool-1-thread-2
来执行,程序结束后线程池中线程状态:
"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007fa1c2083000 nid=0x4403 waiting on condition [0x0000700010f37000]
java.lang.Thread.State: WAITING (parking)
"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007fa1c287e800 nid=0x320b waiting on condition [0x000070001113d000]
java.lang.Thread.State: WAITING (parking)
调用线程池执行任务二:
val executor = ThreadPoolExecutor(2, 10, 30, TimeUnit.SECONDS, ArrayBlockingQueue(1))
executor.execute(MyRun("T1"))
executor.execute(MyRun("T2"))
TimeUnit.SECONDS.sleep(5)
executor.execute(MyRun("T3"))
executor.execute(MyRun("T4"))
执行结果:
T1 is running, pool-1-thread-1
T2 is running, pool-1-thread-2
T2 is finished
T1 is finished
T3 is running, pool-1-thread-2
T4 is running, pool-1-thread-3
T3 is finished
T4 is finished
分析:
T1、T2执行完成后,线程池中线程有:pool-1-thread-1
和 pool-1-thread-2
,此时当T3提交给线程池时,会把任务存到 BlockingQueue中等待执行,再提交T4,此时有可能 BlockingQueue 是满的状态,所以又因为 maximumPoolSize 是 10,所以会新创建线程执行任务:pool-1-thread-3
。
当T4刚刚执行完成,我们来看一下线程池中线程的情况:
"pool-1-thread-3" #13 prio=5 os_prio=31 tid=0x00007f90bb893800 nid=0x4e0b waiting on condition [0x000070000175b000]
java.lang.Thread.State: TIMED_WAITING (parking)
"pool-1-thread-2" #11 prio=5 os_prio=31 tid=0x00007f90bb00f000 nid=0x3f03 waiting on condition [0x0000700001555000]
java.lang.Thread.State: TIMED_WAITING (parking)
"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007f90bb892800 nid=0x4103 waiting on condition [0x0000700001452000]
java.lang.Thread.State: WAITING (parking)
因为,此时线程数为3,超过了corePoolSize的个数,这时候线程就会进入 TIMED_WAITING
状态,直到线程数达到 corePoolSize 的个数为止。
我们再等待一段时间(超过30秒),这时候我们来看一下线程池中线程的情况:
"pool-1-thread-3" #13 prio=5 os_prio=31 tid=0x00007f90bb893800 nid=0x4e0b waiting on condition [0x000070000175b000]
java.lang.Thread.State: WAITING (parking)
"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007f90bb892800 nid=0x4103 waiting on condition [0x0000700001452000]
java.lang.Thread.State: WAITING (parking)
pool-1-thread-2
经过超时时间之后,会销毁。 此时线程池只剩下 pool-1-thread-3
和 pool-1-thread-1
两条线程,维持核心线程数。
调用线程池执行任务三:
fun main() {
val executor = ThreadPoolExecutor(2, 10, 30, TimeUnit.SECONDS, ArrayBlockingQueue(1))
executor.execute(MyRun("T1"))
executor.execute(MyRun("T2"))
TimeUnit.SECONDS.sleep(5)
executor.execute(MyRun("T3"))
TimeUnit.SECONDS.sleep(1) //稍微等待一下
executor.execute(MyRun("T4"))
}
结果:
T1 is running, pool-1-thread-1
T2 is running, pool-1-thread-2
T2 is finished
T1 is finished
T3 is running, pool-1-thread-2
T4 is running, pool-1-thread-1
T3 is finished
T4 is finished
分析:
基本和上一个示例差不多,只是在T3
和 T4
提交之间,增加了一个延迟,这时候再提交T4的时候,BlockingQueue
就不为空了,所以T4提交是会放到 BlockingQueue
中等待执行。
合理配置线程池:
CPU密集型任务:应尽可能尽可能小的配置线程(否则老是切换线程上下文的话,反而总时间会边长)。例如配置 N(CPU) + 1
IO密集型任务:IO密集型任务并不是一直再执行任务,则应配置尽可能多线程(因为IO就会阻塞,线程数多的话正好可以利用CPU空闲时间,提高利用率)。例如:2 * N(CPU)。
Executors 中几种常见的线程池:
ThreadPoolExecutor
是线程池的核心 实现类,用来执行提交的任务。
ScheduledThreadPoolExecutor
是一个线程池实现类,可以在给定的延迟后运行任务。或者定期执行任务。
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,比较适用于负载较重的服务器。 任务队列使用的是 无界的 LinkedBlockingQueue,线程池大小有界。 创建用于容纳固定线程数量的池子,每个线程存活时间是无限的,当池子满了不再添加线程,如果池子中线程都在繁忙,则将新任务放入阻塞队列中。
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
是大小无界限的线程池,适用于执行很多短期异步任务的小程序,或者负载较轻的服务器。当新任务到来,则插入到SynchronousQueue中,由于SynchronousQueue 是同步队列,因此会在池中寻找可用线程来执行,若有可用线程则执行,若没有则创建一个新的线程来执行该任务。若空闲线程大于指定超时时间,则线程会被销毁。 SynchronousQueue是BlockingQueue的一种,SynchronousQueue和其他的BlockingQueue不同的是SynchronousQueue的capacity是0。即SynchronousQueue不存储任何元素。也就是说SynchronousQueue的每一次insert操作,必须等待其他线性的remove操作。而每一个remove操作也必须等待其他线程的insert操作。
也就是说SynchronousQueue的每一次insert操作,必须等待其他线性的remove操作。而每一个remove操作也必须等待其他线程的insert操作。
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 适用于需要保证顺序地执行各个任务,并且在任意时候,不会有多个线程是活动的。创建了只有一个线程的线程池,是无限存活的,繁忙是会有任务塞进任务队列。
ScheduledThreadPoolExecutor
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
适用于需要多个后台线程执行周期任务。同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。