1.线程池
1.1.为什么使用线程池
频繁创建/销毁/切换线程需要进行CPU调度,会造成极大系统资源开销。
相对于自行管理线程,使用线程池:
1.复用线程。通过复用创建的了的线程,减少了线程的创建、销毁的开销。
2.能有效提高线程的可管理性。基于线程池可设定核心线程数、最大线程数、饱和策略等。能够有效避自行管理线程遇到的种种问题:如线程创建过多,导致频繁地上下文切换。
1.2.Executor框架
Executor框架实现了任务单元与执行单元的分离。
Executor框架,有三个关键部分:
1.任务生产
生产任务首先要声明Runnable/Callable类型的任务。然后将任务实例提交给线程池。
2.任务处理
由线程池中的线程按照一定策略将任务消费掉。
3.结果获取
通过Future接口异步获取执行结果。Executor框架设计模式:生产者与消费者模式
1.3.JDK中的线程池
ThreadPoolExecutor是java.util.concurrent并发工具包中所有线程池实现类的父类。
1.3.1 ThreadPoolExecutor构造参数
- int corePoolSize 线程池中核心线程数。
当一个任务通过execute/submit方法提交到线程池时:
1>如果线程的数量小于corePoolSize,即使部分线程处于空闲状态,也会创建新的线程来处理这个任务。
2>如果线程的数量等于corePoolSize,而阻塞队列 workQueue未满,那么任务被放入阻塞队列。
3>如果线程的数量大于等于corePoolSize,阻塞队列workQueue满了。并且线程池中的数量小于maximumPoolSize,则创建新的线程来处理被添加的任务。
4>如果阻塞队列workQueue满了,并且线程池中的数量达到maximumPoolSize。那么通过线程池指定的饱和策略来处理。
- 调用prestartAllCoreThreads方法就会一次性的启动corePoolSize数量的线程。
int maximumPoolSize 允许的最大线程数。
当BlockingQueue满了且池中的线程数小于maximumPoolSize时,如果有新的任务提交到线程池会创建新的线程。
当BlockingQueue满了且池中的线程数达到maximumPoolSize时,则会用饱和策略来处理long keepAliveTime 空闲存活时间
线程空闲下来后,存活的时间。这个参数只在线程数大于corePoolSize才有用。TimeUnit unit 存活时间的单位值
BlockingQueue<Runnable> workQueue 保存任务的阻塞队列
建议使用有界队列ThreadFactory threadFactory 创建线程的工厂【OPT】
它会给新建的线程赋予名字RejectedExecutionHandler handler 饱和策略【OPT】
1>AbortPolicy 直接抛出异常。默认策略。
2>CallerRunsPolicy 用调用者所在的线程来执行任务。
3>DiscardOldestPolicy 丢弃阻塞队列里最老的任务。即队列里最靠前的任务或最先提交的任务。
4>DiscardPolicy :直接丢弃新提交的任务。
定制自己的饱和策略,实现RejectedExecutionHandler接口即可。
1.3.2. 提交任务
提交任务到线程池可调用以下两个方法:
- void execute(Runnable command) 不需要返回
- Future<T> submit(Callable<T> task) 需要返回。返回结果通过Future.get()获取
1.3.3. 关闭线程池
关闭线程池可调用以下两个方法:
- shutdownNow() 立即中断所有线程。不会等待线程执行的任务执行完成。还会尝试停止已经暂停任务的线程。返回等待执行的任务列表。注意终止线程使用的是interrupt方法。声明任务(Runnable、Callable)时,要注意判断线程的中断标志位。
- shutdown() 中断所有没有执行任务的线程。正在执行任务的线程执行完成后结束。
1.3.4. 工作机制
提交任务到线程池。
- 当池中线程数量小于corePoolSize时,会创建新的线程执行任务。
- 当池中线程数量达到corePoolSize且阻塞队列workQueue未满,则将任务加入workQueue。
- 当池中线程数量大于等于corePoolSize小于maximumPoolSize且阻塞队列workQueue已满。则继续创建线程执行任务。
- 当池中线程数量达到maximumPoolSize且阻塞队列workQueue已满。则根据饱和策略处理。
饱和策略见章节1.2.1
1.3.5. 线程池的配置
需要使用线程池执行的任务,大致分为三类:计算密集型任务、IO密集型任务(如读取文件、数据库连接,网络通讯)、混合型。
线程数量
计算密集型:执行此类任务的线程池,建议线程数量配置为机器的Cpu核心数+1。之所以+1是为了防止页缺失。这里不展开说。执行Runtime.getRuntime().availableProcessors()方法可以获得CPU的核心数。
IO密集型:建议线程数量不超过CPU核心数*2。
混合型:尽量拆分。没有固定标准。但线程数量应设定在CPU核心数的1-2倍之间。队列的选择
应该使用有界,无界队列可能会导致内存溢出。
1.3.6. 栗子
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class TestThreadPool {
public static void main(String[] args) {
Long start = System.currentTimeMillis();
//工作窃取线程池,依靠ForkJoinPool实现。
ExecutorService exs = new ThreadPoolExecutor(5, 10, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());
try {
int taskCount = 30;
//结果集
List<Integer> list = new ArrayList<>();
List<Future<Integer>> futureList = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
futureList.add(exs.submit(new Task(i)));
}
//==================结果归集===================
for (int i = 0; i < futureList.size(); i++) {
Future<Integer> future = futureList.get(i);
Integer result = future.get();//线程在这里阻塞等待该任务执行完毕
System.out.println("任务" + i + "完成时间:" + System.currentTimeMillis());
list.add(result);
}
System.out.println("list=" + list);
System.out.println("总耗时=" + (System.currentTimeMillis() - start));
} catch (Exception e) {
e.printStackTrace();
} finally {
exs.shutdown();//关闭线程池
}
}
static class Task implements Callable<Integer> {
Integer idx;
public Task(Integer idx) {
super();
this.idx = idx;
}
@Override
public Integer call() throws InterruptedException {
//测试工作窃取
if (idx == 4) {
Thread.sleep(2001);
} else {
Thread.sleep(1000);
}
System.out.println("线程:" + Thread.currentThread().getName() + "任务i=" + idx + ",执行完成!");
return idx;
}
}
}
执行结果
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3cd1a2f1 rejected from java.util.concurrent.ThreadPoolExecutor@2f0e140b[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
线程:pool-1-thread-4任务i=3,执行完成!
线程:pool-1-thread-6任务i=15,执行完成!
线程:pool-1-thread-7任务i=16,执行完成!
线程:pool-1-thread-3任务i=2,执行完成!
线程:pool-1-thread-8任务i=17,执行完成!
线程:pool-1-thread-9任务i=18,执行完成!
线程:pool-1-thread-10任务i=19,执行完成!
线程:pool-1-thread-2任务i=1,执行完成!
线程:pool-1-thread-1任务i=0,执行完成!
线程:pool-1-thread-5任务i=4,执行完成!
线程:pool-1-thread-9任务i=10,执行完成!
线程:pool-1-thread-1任务i=13,执行完成!
线程:pool-1-thread-4任务i=6,执行完成!
线程:pool-1-thread-2任务i=12,执行完成!
线程:pool-1-thread-7任务i=7,执行完成!
线程:pool-1-thread-10任务i=11,执行完成!
线程:pool-1-thread-6任务i=5,执行完成!
线程:pool-1-thread-8任务i=9,执行完成!
线程:pool-1-thread-3任务i=8,执行完成!
线程:pool-1-thread-5任务i=14,执行完成!
- 结果分析
示例中声明一个corePoolSize为5、maximumPoolSize为10、阻塞队列大小为10、饱和策略为直接丢弃的线程池。
执行了20个任务,其余10个被丢弃。可以尝试一下不同的饱和策略,执行结果不同。
1.3.7. 预定义线程池
一般不直接拿来用,但可以参考。
Executors 提供了一系列静态工厂方法用于创建各种线程池:
- FixedThreadPool
创建固定数量的线程。适用计算密集型任务。使用了无界队列。 - SingleThreadExecutor
创建单个线程。适用需要保证顺序执行的任务。使用了无界队列。 - CachedThreadPool
会根据需要来创建新线程的。适用执行很多短期异步任务的程序。使用了SynchronousQueue。 - WorkStealingPool(JDK7以后)
基于ForkJoinPool实现。 - ScheduledThreadPoolExecutor
适用需要定期执行周期任务。
Executors中定义的创建方法:
1> newSingleThreadScheduledExecutor:只创建一个线程。适用于执行需要保证执行顺序的定期任务。
2> newScheduledThreadPool 创建一个或多个线程。适用于执行周期任务。
ScheduledThreadPoolExecutor 提交任务的方法:
1> schedule:任务只执行一次。可设定执行时延。
2> scheduleAtFixedRate:任务按照固定的时间间隔定期(开始)执行。可设定执行时延。
3> scheduleWithFixedDelay:任务按照固定的(距前一个任务执行完成的时间)时延执行。
Spring的@Scheduled注解便是基于ScheduledThreadPoolExecutor实现的。默认线程池中1个执行线程。
建议根据任务的配置情况设置线程数。例如设定线程池1个线程,声明多个任务。可能存在任务A到了执行时间却不执行,等待”独苗“线程执行完正在执行的任务B。
3.CompletionService
CompletionService 允许以异步的方式一边生产新的任务,一边处理已完成任务的结果。
这样可以将执行任务与处理任务分离开来。使用submit执行任务,使用take取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果。
ExecutorCompletionService:实现了CompletionService。
3.1栗子:
3.1.1 自行归集VSCompletionService归集
import java.util.Random;
import java.util.concurrent.*;
public class CompletionServiceTest {
private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors();
// 方法一,自己写集合来实现获取线程池中任务的返回结果
public void testSumUpByQueue() throws Exception {
int maxWorkTime = 0;
long start = System.currentTimeMillis();
// 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
//容器存放提交给线程池的任务,list,map,
BlockingQueue<Future<SleepResult>> queue =
new LinkedBlockingQueue<>();
// 向里面扔任务
for (int i = 0; i < TOTAL_TASK; i++) {
Future<SleepResult> future = pool.submit(new Task());
queue.add(future);//i=0 先进队列,i=1的任务跟着进
}
// 检查线程池任务执行结果
for (int i = 0; i < TOTAL_TASK; i++) {
SleepResult res = queue.take().get();///i=0先取到,i=1的后取到
//模拟结果处理耗时
Thread.currentThread().sleep(100);
if (maxWorkTime < res.getSleepTime()) {
maxWorkTime = res.getSleepTime();
}
System.out.println(res.getDescription());
}
// 关闭线程池
pool.shutdown();
System.out.println("最大执行时间:" + maxWorkTime + "ms,归集耗时:"
+ (System.currentTimeMillis() - start) + " ms");
}
// 方法二,通过CompletionService来实现获取线程池中任务的返回结果
public void testSumUpByCompletionService() throws Exception {
int maxWorkTime = 0;
long start = System.currentTimeMillis();
// 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
CompletionService<SleepResult> cService = new ExecutorCompletionService<>(pool);
// 向里面扔任务
for (int i = 0; i < TOTAL_TASK; i++) {
cService.submit(new Task());
}
// 检查线程池任务执行结果
for (int i = 0; i < TOTAL_TASK; i++) {
SleepResult res = cService.take().get();
//模拟结果处理耗时
Thread.currentThread().sleep(100);
if (maxWorkTime < res.getSleepTime()) {
maxWorkTime = res.getSleepTime();
}
System.out.println(res.getDescription());
}
// 关闭线程池
pool.shutdown();
System.out.println("最大执行时间:" + maxWorkTime + "ms,归集耗时:"
+ (System.currentTimeMillis() - start) + " ms");
}
public static void main(String[] args) throws Exception {
CompletionServiceTest t = new CompletionServiceTest();
t.testSumUpByQueue();
t.testSumUpByCompletionService();
}
class Task implements Callable<SleepResult> {
public Task() {
}
@Override
public SleepResult call() {
int sleepTime = new Random().nextInt(1000);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
SleepResult result = new SleepResult();
result.setSleepTime(sleepTime);
result.setDescription(Thread.currentThread().getName() + "休眠了:" + sleepTime + "ms");
return result;
}
}
class SleepResult {
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
String description;
public int getSleepTime() {
return sleepTime;
}
public void setSleepTime(int sleepTime) {
this.sleepTime = sleepTime;
}
int sleepTime;
}
}
执行结果:
pool-1-thread-1休眠了:753ms
pool-1-thread-2休眠了:825ms
pool-1-thread-3休眠了:591ms
pool-1-thread-4休眠了:881ms
pool-1-thread-5休眠了:160ms
pool-1-thread-6休眠了:762ms
pool-1-thread-7休眠了:880ms
pool-1-thread-8休眠了:623ms
pool-1-thread-9休眠了:488ms
pool-1-thread-10休眠了:413ms
pool-1-thread-11休眠了:978ms
pool-1-thread-12休眠了:424ms
pool-1-thread-13休眠了:77ms
pool-1-thread-14休眠了:489ms
pool-1-thread-15休眠了:91ms
pool-1-thread-16休眠了:388ms
最大执行时间:978ms,归集耗时:2403 ms
pool-2-thread-9休眠了:16ms
pool-2-thread-10休眠了:32ms
pool-2-thread-11休眠了:71ms
pool-2-thread-6休眠了:144ms
pool-2-thread-16休眠了:151ms
pool-2-thread-4休眠了:163ms
pool-2-thread-8休眠了:322ms
pool-2-thread-12休眠了:358ms
pool-2-thread-15休眠了:379ms
pool-2-thread-5休眠了:447ms
pool-2-thread-13休眠了:453ms
pool-2-thread-7休眠了:611ms
pool-2-thread-3休眠了:660ms
pool-2-thread-1休眠了:748ms
pool-2-thread-2休眠了:936ms
pool-2-thread-14休眠了:979ms
最大执行时间:979ms,归集耗时:1678 ms
- 结果分析
示例中我们使用大小为16的FixedThreadPool并行执行16个任务。然后分别自行归集和使用CompletionService归集。模拟归集每个结果的处理时间是100ms。
任务执行总耗时接近于单个任务执行的最大时间。两个测试单个任务最大执行时间分别是978ms和979ms。
按照惯常的思维,两种方式归集结果耗时应该很接近。然而不然。两种方式归集耗时:一个2403 ms;一个1678 ms。差别很大。(任务执行时间差别很大,两种方式归集的耗时差异会更大)
这是为何呢?从控制台打印的数据中我们发现:
1.自行归集:顺序等待获取执行结果。按照线程1、2、3...16的顺序等待获取工作线程的执行结果。然后归集线程进行归集。
2.CompletionService归集:按线程执行完成的先后顺序,获取执行结果。先返回的结果先进行归集。这样归集线程就省却了不必要的等待时间。
3.1.2 工作窃取线程池workStealingPool
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestCompletionService {
public static void main(String[] args) {
Long start = System.currentTimeMillis();
//工作窃取线程池,依靠ForkJoinPool实现。
ExecutorService exs = Executors.newWorkStealingPool(5);
try {
int taskCount = 30;
//结果集
List<Integer> list = new ArrayList<>();
//1.定义CompletionService
// Future.get: Waits if necessary for the computation to complete, and then retrieves its result.
CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
List<Future<Integer>> futureList = new ArrayList<>();
//2.提交任务
for (int i = 0; i < taskCount; i++) {
futureList.add(completionService.submit(new Task(i)));
}
// for (int i = 0; i < futureList.size(); i++) {
// Future<Integer> future = futureList.get(i);
// Integer result = future.get();//线程在这里阻塞等待该任务执行完毕
// System.out.println("任务" + i + "完成时间:" + System.currentTimeMillis());
// list.add(result);
// }
//==================结果归集===================
for (int i = 0; i < taskCount; i++) {
Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
System.out.println("任务" + i + "完成时间:"+System.currentTimeMillis() );
list.add(result);
}
System.out.println("list=" + list);
System.out.println("总耗时=" + (System.currentTimeMillis() - start));
} catch (Exception e) {
e.printStackTrace();
} finally {
exs.shutdown();//关闭线程池
}
}
static class Task implements Callable<Integer> {
Integer idx;
public Task(Integer idx) {
super();
this.idx = idx;
}
@Override
public Integer call() throws InterruptedException {
//测试工作窃取
if (idx == 4) {
Thread.sleep(2001);
} else {
Thread.sleep(1000);
}
System.out.println("线程:" + Thread.currentThread().getName() + "任务i=" + idx + ",执行完成!");
return idx;
}
}
}
执行结果
线程:ForkJoinPool-1-worker-4任务i=3,执行完成!
线程:ForkJoinPool-1-worker-1任务i=0,执行完成!
线程:ForkJoinPool-1-worker-3任务i=2,执行完成!
线程:ForkJoinPool-1-worker-2任务i=1,执行完成!
任务0完成时间:1565150026330
任务1完成时间:1565150026330
任务2完成时间:1565150026330
任务3完成时间:1565150026330
线程:ForkJoinPool-1-worker-5任务i=4,执行完成!
任务4完成时间:1565150027331
线程:ForkJoinPool-1-worker-4任务i=5,执行完成!
线程:ForkJoinPool-1-worker-2任务i=8,执行完成!
任务5完成时间:1565150027333
线程:ForkJoinPool-1-worker-3任务i=7,执行完成!
线程:ForkJoinPool-1-worker-1任务i=6,执行完成!
任务6完成时间:1565150027333
任务7完成时间:1565150027333
任务8完成时间:1565150027333
线程:ForkJoinPool-1-worker-1任务i=13,执行完成!
线程:ForkJoinPool-1-worker-2任务i=11,执行完成!
线程:ForkJoinPool-1-worker-4任务i=10,执行完成!
任务9完成时间:1565150028334
任务10完成时间:1565150028334
任务11完成时间:1565150028334
线程:ForkJoinPool-1-worker-5任务i=9,执行完成!
线程:ForkJoinPool-1-worker-3任务i=12,执行完成!
任务12完成时间:1565150028334
任务13完成时间:1565150028334
线程:ForkJoinPool-1-worker-1任务i=14,执行完成!
线程:ForkJoinPool-1-worker-3任务i=18,执行完成!
任务14完成时间:1565150029337
线程:ForkJoinPool-1-worker-2任务i=15,执行完成!
线程:ForkJoinPool-1-worker-5任务i=17,执行完成!
线程:ForkJoinPool-1-worker-4任务i=16,执行完成!
任务15完成时间:1565150029337
任务16完成时间:1565150029337
任务17完成时间:1565150029337
任务18完成时间:1565150029337
线程:ForkJoinPool-1-worker-2任务i=21,执行完成!
线程:ForkJoinPool-1-worker-3任务i=20,执行完成!
线程:ForkJoinPool-1-worker-5任务i=22,执行完成!
线程:ForkJoinPool-1-worker-1任务i=19,执行完成!
线程:ForkJoinPool-1-worker-4任务i=23,执行完成!
任务19完成时间:1565150030338
任务20完成时间:1565150030339
任务21完成时间:1565150030339
任务22完成时间:1565150030339
任务23完成时间:1565150030339
线程:ForkJoinPool-1-worker-2任务i=24,执行完成!
线程:ForkJoinPool-1-worker-3任务i=25,执行完成!
线程:ForkJoinPool-1-worker-1任务i=27,执行完成!
线程:ForkJoinPool-1-worker-4任务i=28,执行完成!
线程:ForkJoinPool-1-worker-5任务i=26,执行完成!
任务24完成时间:1565150031341
任务25完成时间:1565150031342
任务26完成时间:1565150031342
任务27完成时间:1565150031342
任务28完成时间:1565150031342
线程:ForkJoinPool-1-worker-2任务i=29,执行完成!
任务29完成时间:1565150032344
list=[3, 0, 2, 1, 4, 5, 8, 7, 6, 13, 11, 10, 9, 12, 14, 18, 15, 17, 16, 21, 20, 22, 19, 23, 24, 25, 27, 28, 26, 29]
总耗时=7033
- 结果分析
示例中 worker-5执行了5个任务。worker-2从woker-5的队列中窃取了一个任务来执行。共执行了7个任务。其它线程执行了6个任务。循环执行CompletionService.take()方法可以取出所有任务执行结果。
使用不同的预定义线程池、不同的配置,执行结果会不同。可以尝试一下。
CompletionService这里不展开说。