线程池 - ThreadPoolExecutor

使用线程池的好处

1、降低资源消耗:通过重复利用已经创建的线程降低线程的创建和销毁造成的消耗。
2、提高响应速度:当任务到达时,任务可以不需要等到线程创建就立即执行。
3、提高线程的可管理性:使用线程池可以对线程资源进行统一分配、调优和监控。

构造(创建线程池)

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

参数含义

1、corePoolSize:线程池的基本大小,当一个任务提交到线程池时,线程池会创建一个线程来执行任务,其它空闲的基本线程也能够继续执行新任务创建新线程,等到需要执行的任务数大于线程池基本大小时就不会再创建。
2、maximumPoolSize:线程池允许的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
3、keepAliveTime:线程池的工作线程空闲后,保持存活的时间。如果任务很多,并且每个任务执行的时间比较短,可以把这个时间调大一点,提高线程的利用率。
4、unit:线程活动保持时间的单位
5、workQueue:任务队列,用于保存等待执行的任务的阻塞队列。可选队列如下:
  5.1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,遵循FIFO原则。
  5.2、LinkedBlockingQueue:基于链表结构的阻塞队列,遵循FIFO原则。
  5.3、SynchronousQueue:不存储元素的阻塞队列,每一个插入必须等到另一个线程调用移除操作,否则插入操作会一直处于阻塞状态。
  5.4、PriorityBlockingQueue:具有优先级的无限阻塞队列。
6、handler:饱和策略,当队列和线程池都满了,线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。默认使用AbortPolicy。可选策略如下:
  6.1、AbortPolicy:无法处理新任务时直接抛出异常。
  6.2、CallerRunsPolicy:直接在调用者的调用线程中运行任务。
  6.3、DiscardOldestPolicy:丢弃最旧的未处理请求,并执行当前任务。
  6.4、DiscardPolicy:直接丢弃任务。
7、threadFactory:设置创建线程的工厂,可以使用的工厂有:
ThreadFactory()Executors.privilegedThreadFactory()Executors.defaultThreadFactory()等等

线程池处理流程

处理流程.png

1、当任务提交到线程池后,先判断当前运行的线程是否少于corePoolSize:
  少于 > 创建新线程来执行任务
  大于 > 将任务加入到任务队列
2、加入任务队列时判断任务队列是否已满:
  未满 > 加入任务队列
  已满 > 创建新的线程来执行任务
3、队列已满,新建线程执行任务时判断当前运行的线程是否超出了maximumPoolSize:
  未超出 > 新建线程执行任务
  超出 > 拒绝任务,执行所选的饱和策略

线程池中线程执行任务示意图

线程池中线程执行任务示意图
  • 任务提交到线程池后,新建一个线程执行任务
  • 任务执行完后,会反复从任务队列中获取任务来执行

ThreadPoolExecutor 方法

类型 方法 描述
protected void afterExecute(Runnable r, Throwable t) 任务执行结束后调用
protected void beforeExecute(Runnable r, Throwable t) 任务执行之前调用
void allowCoreThreadTimeOut(boolean value) 设置用于控制在保持活动时间内没有任务到达时核心线程是否可能超时并终止的策略,并在新任务到达时根据需要替换
boolean allowsCoreThreadTimeOut() 如果此池允许核心线程超时,并且在keepAlive时间内没有任务到达时终止,返回true
boolean awaitTermination(long timeout, TimeUnit unit) 阻塞直到所有任务在关闭请求后完成执行,或发生超时,或当前线程中断
void execute(Runnable command) 提交不需要返回值的任务
protected void finalize() 当该执行程序不再被引用并且没有线程时,调用shutdown
int getActiveCount() 获取活动的线程数
long getCompletedTaskCount() 获取线程池在运行过程中完成的任务数量
int getCorePoolSize() 获取核心线程数
long getKeepAliveTime(TimeUnit unit) 获取线程保持活动时间
int getLargestPoolSize() 获取线程池中曾经创建过的最大线程数量,通过这个数据可以判断线程池曾经是否满过
int getMaximumPoolSize() 获取允许的最大线程数
int getQueue() 获取线程池中的当前线程数
BlockingQueue<Runnable> getPoolSize() 返回执行任务使用的任务队列
RejectedExecutionHandler getRejectedExecutionHandler() 返回无法执行任务的处理程序
long getTaskCount() 获取线程池需要执行的线程数量
ThreadFactory getThreadFactory() 返回用于创建新线程的线程工厂
boolean isShutdown() 当调用了线程池的shutdown或者shutdownNow后,调用isShutdown()返回true
boolean isTerminated() 当所有任务都关闭后,才表示线程池关闭成功,调用isTerminated()返回true
boolean isTerminating() 当调用了线程池的shutdown或者shutdownNow后,任务还没有全部结束,调用isTerminating()返回true
int prestartAllCoreThreads() 提前创建并启动所有核心线程,使它们空闲地等待工作
boolean prestartCoreThread() 启动一个核心线程,使其闲置地等待工作
void purge() 尝试从工作队列中删除所有已取消的未来任务
boolean remove(Runnable task) 如果执行程序的任务队列中存在此任务,则将其删除
void setCorePoolSize(int corePoolSize) 设置核心线程数
void setKeepAliveTime(long time, TimeUnit unit) 设置线程闲置后存活时间
void setMaximumPoolSize(int maximumPoolSize) 设置允许的最大线程数
void setRejectedExecutionHandler(RejectedExecutionHandler handler) 为无法执行的任务设置新的处理程序
void setThreadFactory(ThreadFactory threadFactory) 设置用于创建新线程的线程工厂
void shutdown() 将线程池设置成shutdown状态,然后中断所有没有正在执行的线程
List<Runnable> shutdownNow() 首先将线程池设置成stop状态,然后尝试停止所有正在执行的任务,暂停正在等待的任务,并返回正在等待执行的任务的列表
protected void terminated() 执行程序终止时调用的方法
String toString() 返回线程池状态、任务数等信息的字符串

提交任务方式

  • execute()提交不需要返回值的任务
  • submit() 提交需要返回值的任务

关闭线程池方式

  • shutdownNow() 首先将线程池设置成stop状态,然后尝试停止所有正在执行的任务,暂停正在等待的任务,并返回正在等待执行的任务的列表
  • shutdown() 将线程池设置成shutdown状态,然后中断所有没有正在执行的线程

例子

package com.sy.thread.example;


import java.util.concurrent.*;

/**
 * Description: thread
 *
 * @author songyu
 */
public class ThreadPoolExecutorTest1 {
    public static void main(String[] args) {
        //定义创建线程使用工厂
        ThreadFactory threadFactory = Executors.privilegedThreadFactory();
        //定义线程池
        //workQueue: 任务队列
        //ArrayBlockingQueue:基于数组结构的有界阻塞队列,遵循FIFO原则
        //LinkedBlockingQueue:基于链表结构的阻塞队列,遵循FIFO原则
        //SynchronousQueue:不存储元素的阻塞队列,每一个插入必须等到另一个线程调用移除操作,否则插入操作会一直处于阻塞状态
        //PriorityBlockingQueue:具有优先级的无限阻塞队列
        //线程池pool的”corePoolSize”和”maximumPoolSize”分别为1和2,这意味着”线程池能同时运行的任务数量最大是2,LinkedBlockingQueue容量 设置为2,代表任务队列只能有两个任务阻塞等待
        //1. 线程进入线程池后判断执行中的线程数量是否超过了corePoolSize,超过了就将任务加入到任务队列,没超过新建线程执行任务
        //2. 任务队列容量满了以后(仅限有界队列),判断执行中的任务数是否超过了maximumPoolSize,没超过创建新的线程继续执行,超过了根据饱和策略处理任务,例子中使用的是AbortPolicy,无法处理任务直接抛出异常
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 100, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2), threadFactory) {
            //自定义beforeExecute和afterExecute方法
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                //System.out.println("任务执行前调用");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                //System.out.println("任务执行后调用");
            }
        };
        //提前创建并启动所有核心线程,使它们空闲地等待工作
        //预热,任务来时可以直接执行任务
        pool.prestartAllCoreThreads();
        //设置饱和策略
        //AbortPolicy 无法处理新任务时直接抛出异常
        //CallerRunsPolicy 直接在调用者的调用线程中运行任务
        //DiscardOldestPolicy 丢弃最旧的未处理请求,并执行当前任务
        //DiscardPolicy 直接丢弃任务
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        try {
            for (int i = 0; i < 5; i++) {
                //定义线程
                int finalI = i;
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("第" + finalI + "线程执行中");
                    }
                }, String.valueOf(i));
                //提交任务到线程池
                //execute 提交不需要返回值的任务
                //submit 提交需要返回值的任务
                //Future<?> future = pool.submit(thread);
                pool.execute(thread);
            }
        } catch (RejectedExecutionException e) {
            e.printStackTrace();
        } finally {
            //关闭线程,两周方式
            //shutdownNow() 首先将线程池设置成stop状态,然后尝试停止所有正在执行的任务,暂停正在等待的任务,并返回正在等待执行的任务的列表
            //shutdown() 将线程池设置成shutdown状态,然后中断所有没有正在执行的线程
            //List<Runnable> runnables = pool.shutdownNow();
            //pool.shutdown();
        }
    }

}

执行结果

第0线程执行中
第1线程执行中
java.util.concurrent.RejectedExecutionException: Task Thread[4,5,main] rejected from com.sy.thread.example.ThreadPoolExecutorTest1$1@29453f44[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.sy.thread.example.ThreadPoolExecutorTest1.main(ThreadPoolExecutorTest1.java:62)
第2线程执行中
第3线程执行中

例子中的逻辑在注释中稍微描述了一下,可以简单看一下,只实现了一下基础用法,ThreadPoolExecutor方法较多,可以自己都试一下。

参考书籍《java并发编程的艺术》推荐大家阅读。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。