前言
最近在看并发编程艺术这本书,对看书的一些笔记及个人工作中的总结。
线程池的优势
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。
线程池的实现原理
提交一个新的任务到线程池中,线程池的处理流程:
1)线程池判断核心线程池里的线程是否都在执行任务。如果不是(也就是当前执行任务的线程数小于核心线程数corePoolSize),则创建一个新的线程执行任务。如果线程池中的线程都在执行任务,也就是说当前的工作线程数大于等于corePoolSize),则进入下个流程
2)线程池判断工作队列(一般是阻塞队列BlockingQueue)是否已经满了。如果工作队列没有满,则将任务增加到工作队列中。如果工作队列满了,则进入下个流程,
3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略(或者叫做拒绝策略)来处理这个任务。
ThreadPoolExecutor执行execute方法分下面4种情况。
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
下面是ThreadPoolExecutor的一个构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
稍微分析一下这几个参数,官方api都有讲:
corePoolSize:当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize:线程池中允许的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果使用的是无界的队列,那么这个参数没有意义了。
keepAliveTime:线程池工作线程空闲后,保持的存活时间。
unit :上面时间的单位
workQueue:执行任务在执行之前加入的queue。该队列仅保存由execute方法提交的Runnable任务。
threadFactory:创建新线程的工厂方法。可以通过线程工厂给每个创建的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字
handler:当队列满时,线程处于饱和策略。拒绝策略handler
jdk1.5提供了下面几种策略:
AbortPolicy:直接抛异常。
DiscardPolicy:顾名思义,直接丢弃多余的任务。
DiscardOldestPolicy:丢弃队列中最老的任务,并且重试exexute方法,并发书中说丢弃最新的一个任务,这边书中应该描述错了。
CallerRunsPolicy:在调用线程中执行丢弃的任务。
也可以自定义拒绝策略,比如记录日志或者持久化处理。
执行源码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
执行线程次提交的任务
通过execute方法和submit方法(submit提供了重载方法)。execute不返回值,而submit返回值,并且可以通过返回的Future对象的get方法查看返回值。
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
定义在ExecutorService类中:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
//BlockingQueue使用有界阻塞队列
public class ThreadPoolExecutorTest1 {
public static void main(String[] args) {
/**
* 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
* 若大于corePoolSize,则会将任务加入队列,
* 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
* 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
*
* 这个构造方法使用默认的拒绝策略,AbortPolicy(即抛出异常)
*/
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, //coreSize
2, //MaxSize
60, //60
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3) //指定一种队列 (有界队列)
);
MyTask mt1 = new MyTask(1, "任务1");
MyTask mt2 = new MyTask(2, "任务2");
MyTask mt3 = new MyTask(3, "任务3");
MyTask mt4 = new MyTask(4, "任务4");
MyTask mt5 = new MyTask(5, "任务5");
//MyTask mt6 = new MyTask(6, "任务6");
pool.execute(mt1);
pool.execute(mt2);
pool.execute(mt3);
pool.execute(mt4);
pool.execute(mt5);
//pool.execute(mt6);
pool.shutdown();
}
}
public class MyTask implements Runnable {
private int taskId;
private String taskName;
public MyTask(int taskId, String taskName){
this.taskId = taskId;
this.taskName = taskName;
}
@Override
public void run() {
try {
System.out.println("run taskId =" + this.taskId);
Thread.sleep(5*1000);
//System.out.println("end taskId =" + this.taskId);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String toString(){
return Integer.toString(this.taskId);
}
}
使用无界BlockingQueue:
与有界队列相比,除非系统资源耗尽,否则无界的队列不存在任务入队失败的情况。当有新任务到来,系统的线程小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加。若后续仍有新的队列加入,而没有空闲的线程资源,则队列直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。
所以一般不建议使用无界队列
//使用无界队列的时候,最大线程数,拒绝策略等都失去了意义
public class ThreadPoolExecutorTest2 implements Runnable{
private static AtomicInteger count = new AtomicInteger(0);
@Override
public void run() {
try {
int temp = count.incrementAndGet();
System.out.println("任务" + temp);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception{
BlockingQueue<Runnable> queue =
new LinkedBlockingQueue<>();
ExecutorService executor = new ThreadPoolExecutor(5, 10, 120L, TimeUnit.SECONDS, queue);
for(int i = 0 ; i < 100; i++){
executor.execute(new ThreadPoolExecutorTest2());
}
Thread.sleep(1000);
System.out.println("queue size:" + queue.size());
Thread.sleep(2000);
}
}
自定义拒绝策略:
//和第一个demo相比,因为默认使用的策略是AbortPolicy(抛出一个拒绝异常),而我们这个使用了自定义的拒绝策略
public class ThreadPoolExecutorTest3 {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, //coreSize
2, //MaxSize
60, //60
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3) //指定一种队列 (有界队列)
, new MyRejected()
// , new DiscardOldestPolicy()
);
MyTask mt1 = new MyTask(1, "任务1");
MyTask mt2 = new MyTask(2, "任务2");
MyTask mt3 = new MyTask(3, "任务3");
MyTask mt4 = new MyTask(4, "任务4");
MyTask mt5 = new MyTask(5, "任务5");
MyTask mt6 = new MyTask(6, "任务6");
pool.execute(mt1);
pool.execute(mt2);
pool.execute(mt3);
pool.execute(mt4);
pool.execute(mt5);
pool.execute(mt6);
pool.shutdown();
}
}
public class MyRejected implements RejectedExecutionHandler {
public MyRejected(){
}
//拒绝策略实际调用的方法处理丢弃的任务,这边可以持久化,可以进行日志处理
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定义处理..");
System.out.println("当前被拒绝任务为:" + r.toString());
}
}
下一篇博客将会分析一下jdk提供的Executor框架创建线程池的几种底层实现原理。