我们在java并发编程- 5 - 线程池下join()的替代方案:CountDownLatch、CyclicBarrier 里其实已经用过线程池了。线程池是为了让线程可重复利用,并且方便对线程的管理。
之前我们用的是Executors去创建的线程池。但是现在很多公司的编码规范里已经要求不能使用Executors了。原因是因为它使用了无界的LinkedBlockingQueue,在高负载下,很容易OOM。
推荐使用ThreadPoolExecutor。
1 基本用法:
//创建一个线程池:
//corePooSize:线程池维护的线程的最小数量。
//maximumPoolSize:线程池维护的线程的最大数量
//keepAliveTime: 允许最大空闲的时间。当前线程>=corePooSize,超过该时间的空闲的线程,会被回收
//unit: keepAliveTime的单位
//workQueue:缓冲队列
//申请线程时:1、如果当前线程<corePooSize,则新创建。
// 2、如果当前线程>=corePooSize,workQueue没满,则线程进入workQueue
// 3、如果当前线程>=corePooSize,workQueue已满,maximumPoolSize未达到,则新建线程处理任务。
// 4、如果当前线程>=corePooSize,workQueue已满,maximumPoolSize已达到,则执行handler的模式去拒绝任务。
//handler:线程池对拒绝任务的处理方式
// ThreadPoolExecutor.DiscardOldestPolicy() :抛弃旧的任务
// ThreadPoolExecutor.AbortPolicy() : 默认, 抛出java.util.concurrent.RejectedExecutionException异常
// ThreadPoolExecutor.CallerRunsPolicy() : 重试添加当前的任务,他会自动重复调用execute()方法
// ThreadPoolExecutor.DiscardPolicy() :抛弃当前的任务
ThreadPoolExecutor executor=new ThreadPoolExecutor(20,50,3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),new ThreadPoolExecutor.DiscardOldestPolicy());
@Test
public void executorTest() throws InterruptedException {
executor.execute(()->{
while (true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("1");
}
});
executor.execute(()->{
while (true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("2");
}
});
//阻塞,直到线程池里所有任务结束
executor.shutdown();
boolean loop=true;
do { //等待所有任务完成
loop = !executor.awaitTermination(2, TimeUnit.SECONDS);
} while(loop);
}
需要注意2点:
1、尽量使用有界队列。
2、如果使用默认的拒绝策略,一定要做好异常处理。
2 怎么获取到任务执行结果
前面我们用的execute()方法没办法返回任务执行结果。需要用刀Future相关:
@Test
public void futureTest() throws ExecutionException, InterruptedException {
// 创建 FutureTask
FutureTask<User> futureTask = new FutureTask<User>(()-> {
return new User(2222,1000);
});
// 创建线程池
ThreadPoolExecutor es=new ThreadPoolExecutor(20,50,3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),new ThreadPoolExecutor.DiscardOldestPolicy());
// 提交 FutureTask
es.submit(futureTask);
// 获取计算结果
User result = futureTask.get();
log.info("result:"+result);
}
futureTask还可以提供了:取消任务、判断是否执行完成等功能。