JUC 已经提供了一些现成的线程池给开发者使用,但是这些线程池或多或少不能满足具体的业务开发需求,所以在使用线程池的时候,大多是自己创建。
JUC 提供的线程池
-
newSingleThreadExecutor
,创建只有一个线程的线程池。 -
newFixedThreadPool
,用来创建固定大小的线程池。 -
newWorkStealingPool
,创建具有抢占式操作的线程池。 -
newCachedThreadPool
,创建可以自动扩容的线程池,增长没有上限。 -
newScheduledThreadPoo
l,创建有定时任务功能的线程池。
这些线程池要么没有自动扩容的功能,要么自动扩容的大小不能控制,所以不太能够契合具体的业务场景。
创建自定义线程池
public class TestThreadPoolExecutor {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue(100), new ThreadPoolExecutor.DiscardPolicy());
}
}
ThreadPoolExecutor
最多需要 7 个参数。方法分别是
corePoolSize
,核心线程数。线程池新建的时候,是没有核心线程的,只有当任务要处理的时候才会新建线程。当线程空闲,没有任务要处理,这些线程不会被销毁。maximumPoolSize
,最大线程数。当核心线程被用完,并且队列满了,会新建线程,最多达到这个数。keepAliveTime
,保持存活时间。当线程池空闲,多于核心线程数的线程在这个时间后被销毁。TimeUnit
,保持存活时间的时间单位,可以设置为毫秒,秒等。-
workQueue
,任务储存队列,当核心线程数被用完,多余的任务会进入这个队列等待。3 种队列:
-
SynchronousQueue
是直接交换,这个队列没有容量,新的任务会直接交给线程池处理。= -
LinkedBlockingDeque
是无界队列,没有容量限制。如果任务生产的速度大于消费速度,可能会造成 OOM。 -
ArrayBlockingQueue
是有界队列,可以设置队列储存任务的数量。
-
threadFactory
(可选),当线程池需要新建线程的时候会使用 threadFactory 来新建。这个一般用默认的。-
RejectedExecutionHandler
(可选),当线程池和任务对接都满了的时候,线程池会拒接新的任务,这个参数是设置拒绝策略的。4 种拒绝策略:
-
AbortPolicy
,抛出异常 -
DiscardPolicy
,丢弃任务 -
DiscardOldestPolicy
,丢弃队列中最老的任务, 把位置留给新的任务。 -
CallerRunsPolicy
,谁提交的任务谁自己运行。
-
使用线程池
public class TestThreadPoolExecutor {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue(100), new ThreadPoolExecutor.DiscardPolicy());
threadPool.submit(new Runnable() {
@Override
public void run() {
}
});
}
}
通过使用 submit 方法可以向线程池里提交任务,任务的逻辑写在 Rannable 对象的 run 方法中。
任务太多怎么拒绝
4 种拒绝策略
-
AbortPolicy
,抛出异常 -
DiscardPolicy
,丢弃任务 -
DiscardOldestPolicy
,丢弃队列中最老的任务, 把位置留给新的任务。 -
CallerRunsPolicy
,谁提交的任务谁自己运行。
停止线程池
-
shutdown
,这个方法会等待存量任务执行完毕,并且不接收新的任务。直到存量任务执行完毕,线程池关闭。 -
isShutdown
,这个方法用来判断线程池是否开始停止。不是指线程池完全停止,而是看线程池是否开始停止,不在接收新的任务。 -
isTerminated
,这个方法用来判断线程池是否完全停止。 -
awaitTermination
,等待一段时间,如果这段时间内线程池的任务执行完毕,返回 true,否则返回 false。 -
shutdownNow
,这个方法用来立刻停止线程池,返回从未开始执行的任务的列表。使用需要格外谨慎,如果线程池里的任务是开发者自己编写,需要对中断做额外的处理。也就是任务正确停止,不能做没有后续处理。
示例如下:
public class TestThreadPoolExecutor {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue(100), new ThreadPoolExecutor.DiscardPolicy());
threadPool.submit(new Runnable() {
@Override
public void run() {
}
});
threadPool.shutdown();
threadPool.isShutdown();
threadPool.isTerminated();
try {
threadPool.awaitTermination(10,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<Runnable> runnables = threadPool.shutdownNow();
}
}
线程池的钩子方法,在执行之前,之后做一些事
- 实现自己的线程池,继承 ThreadPoolExecutor。
- 必须要重写默认的四个构造方法。
- 重写 beforeExecute 和 afterExecute,在执行之前和执行之后做一些事情。
示例:
public class PauseableTreadpool extends ThreadPoolExecutor {
public PauseableTreadpool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableTreadpool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableTreadpool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableTreadpool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
public static void main(String[] args) {
PauseableTreadpool pauseableTreadpool = new PauseableTreadpool(5, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue(100), new ThreadPoolExecutor.DiscardPolicy());
pauseableTreadpool.submit(new Runnable() {
@Override
public void run() {
}
})
}
}