1.什么是:
- java.util.concurrent.Excutors提供的接口用于实现创建线程池。
- 降低资源的消耗:降低线程创建和销毁的资源消耗。
- 提高相应速度:线程的创建时间是T1,执行时间是T2,销毁时间是T3,免去了T1和T3的时间。
- 提高线程的管理性。
2.线程池的基本架构:
1.接口(ExcutorService)的主要方法:
java.util.concurrent.ExcutorService是java线程池框架的主要接口,用Future保存任务的运行状态及运算结果,主要方法有:
- void execute(Runnable) //提交任务到线程池
- Future<?> submit(Runnable) //提交任务到线程池,并放回Future
- Future<T> submit(Runnable,T) //提交任务到线程池,并返回Future,第二个参数回作为计算结果封装到Future
- Future<T> submit(Callable) 提交任务到线程池,并返回Future,call方法的计算结果会封装到Future
- List<Future<T>> invokeAll(Collection extends Callable tasks) // 批量提交任务,并返回结果
- T invokeAny(Colleciton extends Callable tasks) //提交一个任务,并返回结果
2.线程池主要调度类(ThreadPoolExecutor):
ThreadPoolExecutor是线程池框架的主要实现类
2.1各个参数的含义:
- int corePoolSize //线程池中核心线程数;当<corePoolSize,就会创建新线程,=corePoolSize,就会保存到BlockingQueue,如果调用preStartAllThreads(),就会一次性启动corePoolSize个线程数。
- int maximumPoolSize // 允许的最大线程数,BlockingQueue满了,当<maximumPoolSize,就会再创键新的线程。
- long keepAliveTime //线程空闲下来后,存活的时间,这个只在>corePoolSize才有用。
- TimeUnit unit //存活时间单位值
- BlockingQueue<Runnable> workQueue //保存任务的阻塞队列,当线程池中的线程数量>corePoolSize,就会进入workQueue
- ThreadFactory threadFactory //创建线程的工厂,给新建的线程赋名
- RejectedExcutionHandler handler //* 饱和策略,jdk提供了四种:*
1.AbortPolicy:直接跑出异常(RejectedExcutionException),默认
2.CallerRunsPolicy:用调用者所在的线程来执行
3.DiscardOldestPolicy:丢弃阻塞队列里最老的任务
4.DiscardPolicy:当前任务直接丢弃
如果需要实现自己的饱和策略,实现RejectedExcutionHandler接口。
2.2线程池的工作顺序:
corePoolSize --> 任务队列 --> maximumPoolSise --> 拒绝策略
2.3提交任务的方式:
- excute(Runnable command)不需要返回值
- Future<T> submit(Callable<T> task ) 需要返回值
public class one {
public static void main(String[] str) throws ExecutionException, InterruptedException {
two to = new two();
ExecutorService es = Executors.newCachedThreadPool();
Future<Integer> submit = es.submit(to);
es.shutdown();//关闭线程池
System.out.println("获取到的结果:"+submit.get());
}
static class two implements Callable {
@Override
public Object call() throws Exception {
Thread.sleep(2000);
int result = 0;
for(int i=0;i<100;i++){
for(int j=0;j<i;j++){
result += j;
}
}
return result;
}
}
}
2.4关闭线程池方式:
- shutdownNow() :设置线程池状态,尝试停止正在运行或者暂停任务的线程。
- shutdown():设置线程池状态,只会中断所有没有执行任务的线程。
2.5源码解析:
2.5.1.Execute源码:
a).最核心的execute方法,在AbstractExecutorService中并没有实现,直到ThreadPoolExecutor才实现。
b).ExecutorService中的submit(),invokeAll(),invokeAny()都调用了该方法,所以,Execute是核心中的核心:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果正在运行的线程数小于corePoolSize,那么将调用addWorker 方法来创建一个新的线程,并将该任务作为新线程的第一个任务来执行。
当然,在创建线程之前会做原子性质的检查,如果条件不允许,则不创建线程来执行任务,并返回false.
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 如果一个任务成功进入阻塞队列,那么我们需要进行一个双重检查来确保是我们已经添加一个线程(因为存在着一些线程在上次检查后他已经死亡)或者
当我们进入该方法时,该线程池已经关闭。所以,我们将重新检查状态,线程池关闭的情况下则回滚入队列,线程池没有线程的情况则创建一个新的线程。
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
如果任务无法入队列(队列满了),那么我们将尝试新开启一个线程(从corepoolsize到扩充到maximum),如果失败了,那么可以确定原因,要么是
线程池关闭了或者饱和了(达到maximum),所以我们执行拒绝策略。
*/
// 1.当前线程数量小于corePoolSize,则创建并启动线程。
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// 成功,则返回
return;
c = ctl.get();
}
// 2.步骤1失败,则尝试进入阻塞队列,
if (isRunning(c) && workQueue.offer(command)) {
// 入队列成功,检查线程池状态,如果状态部署RUNNING而且remove成功,则拒绝任务
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 步骤1和2失败,则尝试将线程池的数量有corePoolSize扩充至maxPoolSize,如果失败,则拒绝任务
else if (!addWorker(command, false))
reject(command);
}
上面流程解析:
解析步骤:
- 进来后先做空指针校验
- workerCountOf()方法可以获取当前线程池中的线程总数与corePoolSize比较大小
- 如果小于,通过addWorker()方法来执行
- 如果大于,提交到任务队列等待
- 如果进入阻塞队列失败,将会把任务交给线程池
- 如果线程达到最大线程数,任务提交失败,执行拒绝策略
2.5.2 addworker方法源码
execute()方法中添加任务的方式是addworker():
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 外层循环,用于判断线程池状态
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层的循环,任务是将worker数量加1
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// worker加1后,接下来将woker添加到HashSet<Worker>中,并启动worker
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果往HashSet<Worker>添加成功,则启动该线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker(Runnable task , boolean core) 主要任务是创建并启动线程,他会根据当前线程的状态和给定值(core or maximum)来判断是否创建线程。
addWorker提供了四种传参方式,execut使用了其中三种:
- addWorker(paramRunnable,true):线程数小于corePoolSize时,放一个需要处理的task到workers set中,如果workers set长度超过corePoolSize,就返回false。
- addWorker(null,false) :放入一个空的task到workers set,长度限制是maximumPoolSize。相当于创建了一个新线程,只是没有立马去分配任务。
- addWorker(paramRunnable,false): 当队列满时,尝试将新来的task直接放入workers set,而此时线程的长度是maximumPoolSize,如果线程池也满了,就返回false
- 还有一种没被使用的是addWorker(null,true)
3.常用的线程池
3.1 newFixedThreadPool
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(corePoolSize : var0,
maximumPoolSize : var0,
keepAliveTime : 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
return new ThreadPoolExecutor(var0,
var0,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
var1);
}
- 固定大小线程池,可以指定线程池大小,该线程池corePoolSize和maximumPoolSize相等,队列阻塞使用的是LinkBlockingQueue,大小为整数最大。
- 该线程池中线程数量不变,有新任务提交时,如果有空闲线程则立即执行,如果没有,暂时存入阻塞队列。
- LinkBlockingQueue是一个无界队列,当提交任务很频繁,linkBlockingQueue会迅速膨胀, 存在着耗尽资源的风险。
- 当线程池空闲时,也不会释放工作线程,需要shutdown来停止。
3.2 newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
return new Executors.FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(), var0));
}
- 单个线程的线程池,阻塞队列用的是LinkBlockingQueue,同上。
3.3 newCachedThread
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L,
TimeUnit.SECONDS,
new SynchronousQueue());
}
public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
return new ThreadPoolExecutor(0, 2147483647, 60L,
TimeUnit.SECONDS,
new SynchronousQueue(), var0);
}
- 缓存线程池,缓存的线程默认存活60秒。
- 线程池的核心线程数corePoolSize为0,最大为integer.max_value。
- 阻塞队列是SynchronousQueue,是一个直接提交的阻塞队列,总是迫使线程池增加新的线程去执行任务。
- 当没有线程执行时,空闲(keepAliveTime)时间超过60秒,工作线程将被终止收回。
- 当有任务提交,如果没有空闲线程,则创建新的线程,如果同时有大量任务提交,线程池会新增等量线程处理任务,很可能很快会耗尽系统资源。
3.4 newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
return new ScheduledThreadPoolExecutor(var0);
}
public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
return new ScheduledThreadPoolExecutor(var0, var1);
}
- 定时线程池,该线程通常用于周期性的去执行任务,比如同步数据。
- scheduleAtFixedRate:以固定的频率去执行任务,周期指:每次成功执行任务之间的间隔。
- schedultWithFixedDelay:以固定的延时去执行任务,延时指:上次成功执行后和下次之间的间隔时间。
4.常用线程池实例
4.1 newFixedThreadPoll
public static void main(String[] str){
ExecutorService esFix = Executors.newFixedThreadPool(5);
for(int i=0;i<50;i++){
final int j = i;
esFix.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
}
});
}
esFix.shutdown();//释放资源,否则不释放
}
4.2 newCachedThreadPool
public static void main(String[] str){
ExecutorService esFix = Executors.newCachedThreadPool();
for(int i=0;i<50;i++){
final int j = i;
esFix.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
}
});
}
}
}
- 这里没有调用shutdown()方法,执行完毕,60秒后自动关闭释放资源
4.3 newSingleThreadPool
public static void main(String[] str){
ExecutorService esFix = Executors.newSingleThreadExecutor();
for(int i=0;i<50;i++){
final int j = i;
esFix.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
}
});
}
esFix.shutdown();//释放资源,否则不释放
}
}
4.4 newScheduledThread
public static void main(String[] str){
ScheduledExecutorService esFix = Executors.newScheduledThreadPool(2);
esFix.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行了您呢");
}
},0,2, TimeUnit.SECONDS);
}
}
5.合理配置线程数
根据任务的性质来选取策略:
- 计算密集型:比如加密、大数据分解、正则……,线程数要适当小点,最大为机器的cpu核心数+1(防止页缺失),cpu核心数计算:Runtime.getRuntime().avalibleProcessors()
- IO密集型:读文件、数据库链接、网络通讯等,线程数要适当大点,推荐为机器的核心数x2
- 混合型:尽量拆分,IO密集型>>计算密集型,拆分意义不大,队列上应该选择有界,无界可能导致oom
6.线程工厂
- Executors线程池如果不指定线程工厂,会使用默认的DefaultThreadFactory,是非守护线程。
- 使用自定义的线程工厂可以做很多事,比如:跟踪线程池在何时创建 了多少线程、定义线程名称和优先级。
- 如果将新建的线程都设置成守护线程,当主线程退出后,将会强制销毁线程池。
下面记录了线程的创建,并将所有线程都设置为守护线程:
public class ThreadFactoryDemo {
public static class MyTask1 implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+"Thrad ID:"+Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args){
MyTask1 task = new MyTask1();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MICROSECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
System.out.println("创建线程"+t);
return t;
}
});
for (int i = 0;i<=4;i++){
es.submit(task);
}
}
}
7.阿里规范:
不建议直接使用Executors创建线程,而是通过ThreadPoolExecutor手动创建,使用Executors的两个弊端:
- newFixedThreadPool 和 newSingleThreadPool,主要问题是堆积的请求处理队列中是无界的,可能出现oom问题。
- newCachedThreadPool 和 newScheduledThreadPool,主要问题是线程最大数是Integer.MAX_VALUE,可能出现创建非常多的线程,出现oom
8.手动创建线程池的结果注意点:
1.任务独立。如何任务依赖于其他任务,那么可能产生死锁。例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。
2.合理配置阻塞时间过长的任务。如果任务阻塞时间过长,那么即使不出现死锁,线程池的性能也会变得很糟糕。在Java并发包里可阻塞方法都同时定义了限时方式和不限时方式。例如
Thread.join,BlockingQueue.put,CountDownLatch.await等,如果任务超时,则标识任务失败,然后中止任务或者将任务放回队列以便随后执行,这样,无论任务的最终结果是否成功,这种办法都能够保证任务总能继续执行下去。
3.设置合理的线程池大小。只需要避免过大或者过小的情况即可,上文的公式线程池大小=NCPU *UCPU(1+W/C)。
4.选择合适的阻塞队列。newFixedThreadPool和newSingleThreadExecutor都使用了无界的阻塞队列,无界阻塞队列会有消耗很大的内存,如果使用了有界阻塞队列,它会规避内存占用过大的问题,但是当任务填满有界阻塞队列,新的任务该怎么办?在使用有界队列是,需要选择合适的拒绝策略,队列的大小和线程池的大小必须一起调节。对于非常大的或者无界的线程池,可以使用SynchronousQueue来避免任务排队,以直接将任务从生产者提交到工作者线程。
9.看下facebook工程师是如何定义线程池的:
private static ExecutorService createDefaultExecutorService(Args args) {
SynchronousQueue executorQueue = new SynchronousQueue();
return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
executorQueue);
}