1.线程池基本介绍
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这里是7个参数(我们在开发中用的更多的是5个参数的构造方法),OK,那我们来看看这里七个参数的含义:
corePoolSize 线程池中核心线程的数量
maximumPoolSize 线程池中最大线程数量
keepAliveTime 非核心线程的超时时长,当系统中非核心线程闲置时间超过keepAliveTime之后,则会被回收。如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长
unit 第三个参数的单位,有纳秒、微秒、毫秒、秒、分、时、天等
workQueue 线程池中的任务队列,该队列主要用来存储已经被提交但是尚未执行的任务。存储在这里的任务是由ThreadPoolExecutor的execute方法提交来的。
threadFactory 为线程池提供创建新线程的功能,这个我们一般使用默认即可
handler 拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。
通过构造方法可以看出:
maximumPoolSize < corePoolSize的时候会抛出异常;
maximumPoolSize(最大线程数) = corePoolSize(核心线程数) + noCorePoolSize(非核心线程数);
当currentSize<corePoolSize时,没什么好说的,直接启动一个核心线程并执行任务。
当currentSize>=corePoolSize、并且workQueue未满时,添加进来的任务会被安排到workQueue中等待执行。
当workQueue已满,但是currentSize<maximumPoolSize时,会立即开
启一个非核心线程来执行任务。
- 当currentSize>=corePoolSize、workQueue已满、并且currentSize>maximumPoolSize时,调用handler默认抛出RejectExecutionExpection异常。
2.线程池执行流程
一个线程从被提交(submit)到执行共经历以下流程:
- 线程池判断核心线程池里是的线程是否都在执行任务,如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下一个流程
- 线程池判断工作队列是否已满。如果工作队列没有满,则将新提交的任务储存在这个工作队列里。如果工作队列满了,则进入下一个流程。
- 线程池判断其内部线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已满了,则交给饱和策略来处理这个任务。
线程池在执行execute方法时,主要有以下四种情况
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获得全局锁)
- 如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue
- 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获得全局锁)
- 如果创建新线程将使当前运行的线程超出maxiumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
线程池采取上述的流程进行设计是为了减少获取全局锁的次数。在线程池完成预热(当前运行的线程数大于或等于corePoolSize)之后,几乎所有的excute方法调用都执行步骤2。
3.常见的线程池
Executors工厂类可以创建四种类型的线程池,通过Executors.newXXX即可创建。
1. FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
- 它是一种固定大小的线程池;
- corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
- keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
- 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;
- 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
- 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
public class MyThread extends Thread {
@Override
public void run() {
super.run();
System.out.println(new Date().toString()+" "+Thread.currentThread().getName()+" MyThread is Running");
}
}
Thread thread1 = new MyThread();
Thread thread2 = new MyThread();
Thread thread3 = new MyThread();
Thread thread4 = new MyThread();
Thread thread5 = new MyThread();
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(thread1);
pool.execute(thread2);
pool.execute(thread3);
pool.execute(thread4);
pool.execute(thread5);
运行结果:(1/2运行的顺序是无序的)
Wed Mar 13 15:37:38 CST 2019 pool-1-thread-2 MyThread is Running
Wed Mar 13 15:37:38 CST 2019 pool-1-thread-1 MyThread is Running
Wed Mar 13 15:37:38 CST 2019 pool-1-thread-2 MyThread is Running
Wed Mar 13 15:37:38 CST 2019 pool-1-thread-1 MyThread is Running
Wed Mar 13 15:37:38 CST 2019 pool-1-thread-2 MyThread is Running
2. CachedThreadPool
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
}
- 它是一个可以无限扩大的线程池;
- 它比较适合处理执行时间比较小的任务;
- corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
- keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
- 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
Thread thread1 = new MyThread();
Thread thread2 = new MyThread();
Thread thread3 = new MyThread();
Thread thread4 = new MyThread();
Thread thread5 = new MyThread();
ExecutorService pool = Executors.newCachedThreadPool();
pool.execute(thread1);
pool.execute(thread2);
pool.execute(thread3);
pool.execute(thread4);
pool.execute(thread5);
运行结果:
Wed Mar 13 15:40:03 CST 2019 pool-1-thread-4 MyThread is Running
Wed Mar 13 15:40:03 CST 2019 pool-1-thread-1 MyThread is Running
Wed Mar 13 15:40:03 CST 2019 pool-1-thread-2 MyThread is Running
Wed Mar 13 15:40:03 CST 2019 pool-1-thread-3 MyThread is Running
Wed Mar 13 15:40:03 CST 2019 pool-1-thread-5 MyThread is Running
3. SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(){
return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
- 它只会创建一条工作线程处理任务;
- 采用的阻塞队列为LinkedBlockingQueue;
Thread thread1 = new MyThread();
Thread thread2 = new MyThread();
Thread thread3 = new MyThread();
Thread thread4 = new MyThread();
Thread thread5 = new MyThread();
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(thread1);
pool.execute(thread2);
pool.execute(thread3);
pool.execute(thread4);
pool.execute(thread5);
运行结果:
Wed Mar 13 15:40:58 CST 2019 pool-1-thread-1 MyThread is Running
Wed Mar 13 15:40:58 CST 2019 pool-1-thread-1 MyThread is Running
Wed Mar 13 15:40:58 CST 2019 pool-1-thread-1 MyThread is Running
Wed Mar 13 15:40:58 CST 2019 pool-1-thread-1 MyThread is Running
Wed Mar 13 15:40:58 CST 2019 pool-1-thread-1 MyThread is Running
4. ScheduledThreadPool
它用来处理延时任务或定时任务。
- 它接收SchduledFutureTask类型的任务,有两种提交任务的方式:
- scheduledAtFixedRate
- scheduledWithFixedDelay
SchduledFutureTask接收的参数:
- time:任务开始的时间
- sequenceNumber:任务的序号
- period:任务执行的时间间隔
它采用DelayQueue存储等待的任务
- DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
- DelayQueue也是一个无界队列;
工作线程的执行过程:
- 工作线程会从DelayQueue取已经到期的任务去执行;
- 执行结束后重新设置任务的到期时间,再次放回DelayQueue
public class MyThread extends Thread {
private String name;
public MyThread(String name){
this.name = name;
}
@Override
public void run() {
super.run();
System.out.println(new Date().toString()+" "+Thread.currentThread().getName()+" "+name);
}
}
Thread thread1 = new MyThread("MyThread1");
Thread thread2 = new MyThread("MyThread2");
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(2);
exec.scheduleAtFixedRate(thread1, 0, 3000, TimeUnit.MILLISECONDS);
exec.scheduleAtFixedRate(thread2, 0, 2000, TimeUnit.MILLISECONDS);
运行结果:
Wed Mar 13 15:44:07 CST 2019 pool-1-thread-2 MyThread2
Wed Mar 13 15:44:07 CST 2019 pool-1-thread-1 MyThread1
Wed Mar 13 15:44:09 CST 2019 pool-1-thread-2 MyThread2
Wed Mar 13 15:44:10 CST 2019 pool-1-thread-1 MyThread1
Wed Mar 13 15:44:11 CST 2019 pool-1-thread-2 MyThread2
Wed Mar 13 15:44:13 CST 2019 pool-1-thread-1 MyThread1
Wed Mar 13 15:44:13 CST 2019 pool-1-thread-2 MyThread2
Wed Mar 13 15:44:15 CST 2019 pool-1-thread-2 MyThread2
Wed Mar 13 15:44:16 CST 2019 pool-1-thread-1 MyThread1
Wed Mar 13 15:44:17 CST 2019 pool-1-thread-2 MyThread2
4.Java中的阻塞队列
由于上面的构造方法涉及到了阻塞队列,所以补充一些阻塞队列的知识。
阻塞队列:我的理解是,生产者——消费者,生产者往队列里放元素,消费者取,如果队列里没有元素,消费者线程取则阻塞,如果队列里元素满了,则生产者线程阻塞。
常见的阻塞队列有下列7种:
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
5.RejectedExecutionHandler:饱和策略
当队列和线程池都满了,说明线程池处于饱和状态,那么必须对新提交的任务采用一种特殊的策略来进行处理。这个策略默认配置是AbortPolicy,表示无法处理新的任务而抛出异常。JAVA提供了4中策略:
当工作队列满了,不同策略的处理方式为:
1.Abort策略:默认策略,新任务提交时直接抛出未检查的异常RejectedExecutionException,该异常可由调用者捕获。
2.CallerRuns策略:为调节机制,既不抛弃任务也不抛出异常,而是将某些任务回退到调用者。不会在线程池的线程中执行新的任务,而是在调用exector的线程中运行新的任务。
3.Discard策略:新提交的任务被抛弃。
4.DiscardOldest策略:队列的是“队头”的任务,然后尝试提交新的任务。(不适合工作队列为优先队列场景)
6.关闭线程池
shutdown()
问:shutdown()有什么功能?
答:阻止新来的任务提交,对已经提交了的任务不会产生任何影响。当已经提交的任务执行完后,它会将那些闲置的线程(idleWorks)进行中断,这个过程是异步的。
问:如何阻止新来的任务提交?
答:通过将线程池的状态改成SHUTDOWN,当再将执行execute提交任务时,如果测试到状态不为RUNNING,则抛出rejectedExecution,从而达到阻止新任务提交的目的。
问:为何对提交的任务不产生任何影响?
答:在调用中断任务的方法时,它会检测workers中的任务,如果worker对应的任务没有中断,并且是空闲线程,它才会去中断。另外的话,workQueue中的值,还是按照一定的逻辑顺序不断的往works中进行输送的,这样一来,就可以保证提交的任务按照线程本身的逻辑执行,不受到影响。
shutdownNow()
问:shutdownNow()有什么功能?
答:阻止新来的任务提交,同时会中断当前正在运行的线程,即workers中的线程。另外它还将workQueue中的任务给移除,并将这些任务添加到列表中进行返回。
问:如何阻止新来的任务提交?
答:通过将线程池的状态改成STOP,当再将执行execute提交任务时,如果测试到状态不为RUNNING,则抛出rejectedExecution,从而达到阻止新任务提交的目的.
问:如果我提交的任务代码块中,正在等待某个资源,而这个资源没到,但此时执行shutdownNow(),会出现什么情况?
答:当执行shutdownNow()方法时,如遇已经激活的任务,并且处于阻塞状态时,shutdownNow()会执行1次中断阻塞的操作,此时对应的线程报InterruptedException,如果后续还要等待某个资源,则按正常逻辑等待某个资源的到达。例如,一个线程正在sleep状态中,此时执行shutdownNow(),它向该线程发起interrupt()请求,而sleep()方法遇到有interrupt()请求时,会抛出InterruptedException(),并继续往下执行。在这里要提醒注意的是,在激活的任务中,如果有多个sleep(),该方法只会中断第一个sleep(),而后面的仍然按照正常的执行逻辑进行。