Java多线程编程六 线程池

线程池ThreadPoolExecutor

为什么需要线程池?

线程池能够对线程进行统一分配,调优和监控:
- 降低资源消耗(防止线程不停的创建与销毁,减少了资源消耗)
- 提高响应速度
- 提高线程的可管理性

ThreadPoolExecutor类解析

threadpoolexecutoruml.png

线程池提交任务流程图

tpelct.png

核心参数

源码内部使用了一个Integer类型的原子变量来记录线程池状态(高三位)和线程池线程数(其余)。

状态 高三位 表现
RUNNING -1(111) 接收并允许新任务
SHUTDOWN 0(000) 拒绝新任务,但处理阻塞队列里的任务
STOP 1(001) 拒绝新任务,放弃执行任务,并中断正在处理的任务
TIDYING 2(010) 所有任务执行完后,当前线程池活动数为0,将要调用 terminated 方法
TERMINATED 3(011) terminated()方法执行完毕
//原子变量 ctl 存储线程池状态(高三位)与线程数(其他低位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//线程数的表示位数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程最大个数
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
//111 - 000~ 
private static final int RUNNING    = -1 << COUNT_BITS;
//000 - 000~ 拒绝新任务,但处理阻塞队列里的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//001 - 000~ 放弃执行任务,并中断正在处理的任务
private static final int STOP       =  1 << COUNT_BITS;
//010 - 000~ 所有任务执行完后,当前线程池活动数为0,将要调用 terminated 方法
private static final int TIDYING    =  2 << COUNT_BITS;
//011 - 000~ 终止状态
private static final int TERMINATED =  3 << COUNT_BITS;

//取高三位
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
//取低其他位
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
//获取 ctl 值
private static int ctlOf(int rs, int wc) { return rs | wc; }

//=========================分割线
private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet<Worker> workers = new HashSet<>();

private final Condition termination = mainLock.newCondition();

private volatile boolean allowCoreThreadTimeOut;

private int largestPoolSize;//历史最大创建线程数

private long completedTaskCount;//完成的任务数
//-------------------------分割线
//构造方法相关的参数
private volatile int corePoolSize;

private volatile int maximumPoolSize;

private volatile long keepAliveTime;
//阻塞队列
private final BlockingQueue<Runnable> workQueue;

private volatile ThreadFactory threadFactory;

private volatile RejectedExecutionHandler handler;

核心构造方法

public ThreadPoolExecutor(int corePoolSize,//核心线程数
                          int maximumPoolSize,//最大线程数
                          long keepAliveTime,//线程空闲至销毁等待时间
                          TimeUnit unit,//时间单位
                          BlockingQueue<Runnable> workQueue,//工作队列
                          ThreadFactory threadFactory,//线程工厂
                          RejectedExecutionHandler handler//饱和策略
                         ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

拒绝策略比较

拒绝策略 表现
ThreadPoolExecutor.AbortPolicy() 默认 丢弃任务并抛出RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy() 由调用线程(提交任务的线程)处理该任务
ThreadPoolExecutor.DiscardOldestPolicy() 丢弃队列最前面的任务,然后提交当前任务
ThreadPoolExecutor.DiscardPolicy() 丢弃任务,但是不抛出异常。

阻塞队列的比较

队列名 表现
ArrayBlockingQueue 基于数组的有界队列
LinkedBlockingQueue 基于链表的无界队列
SynchronousQueue 最多只有一个元素的队列
PriorityBlockingQueue 优先级队列

提交任务的源码

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
   /*
    * 1.是否比核心线程数少,是则新建一个核心线程完成该任务
    * 2.尝试添加到阻塞队列
    * 3.尝试新建线程完成任务,失败则线程池已满或关闭,执行拒绝策略
    */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //1
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        //2
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))//3
        reject(command);
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        /* 检查队列是否只在必要时为空,返回false
         * 1.如果线程池状态为 shutdown 以后的状态
         * 2.如果线程池状态为 shutdown 并且有了第一个任务
         * 3.如果线程池状态为 shutdown 且任务队列为空
         */
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                //线程池正在运行 或 shutdown状态且第一个任务为空
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // 线程处于运行态,状态错误 precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);//添加任务
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//启动任务
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

创建线程池的工具

Java 中线程池的顶级接口是 Executor,ExecutorService 是 Executor 的子类,也是真正的线程池接口,它提供了提交任务和关闭线程池等方法。调用 submit 方法提交任务还可以返回一个 Future 对象,利用该对象可以了解任务执行情况,获得任务的执行结果或取消任务。

由于线程池的配置比较复杂,Java SE 中定义了 Executors 类就是用来方便创建各种常用线程池的工具类。通过调用该工具类中的方法我们可以创建线程池

public static ExecutorService newSingleThreadExecutor();
//单线程池。核心线程数和最大线程数都为1,并且阻塞队列的长度为 Integer.MAX_VALUE ,keepAliveTime = 0,说明只要线程空闲就被回收

public static ExecutorService newFixedThreadPool();
//固定数量的线程池,是每提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行。创建出一个核心线程数和最大线程数都为 nThread,并且阻塞队列的长度为 Integer.MAX_VALUE ,keepAliveTime = 0,说明只要线程空闲就被回收

public static ExecutorService newCachedThreadPool();
//可缓存线程池,是当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又自动添加新线程来执行,阻塞队列为同步队列.

public static Executors.newScheduledThreadPool(int nThreads):
    //创建一个定长线程池,支持定时及周期性任务执行。(定义线程池时)指定核心线程数量,非核心线程数量不固定。有请求提交时,先判断是否有核心线程空闲,若有核心线程空闲则处理请求。若没有,则判断是否超出任务队列的长度,没有则放置在任务队列中,若超出任务队列长度,则先将任务队列填满,其余的将新建非核心线程来进行处理。

线程池的其他方法

public void execute(Runnable command);
//提交任务command到线程池

public void shutdown();
//调用方法后,线程池不再接收新的任务,但工作队列里的任务还是要执行,该方法会立即方法,并不等待任务队列执行完成。

public List<Runnable> shutdownNow();
//调用该方法后,线程池不再接收新的任务,并且会丢弃工作队列里的任务,正在执行的任务会被中断,该方法会立即返回,并不等待激活线程任务执行完成,返回值为队列里被丢弃的任务队列

public boolean awaitTermination(long timeout, TimeUnit unit) thorws InterruptedException;
//调用方法后,当前线程会被阻塞,直到线程池的状态变成 TERMINATED 才返回,或者等待超时才返回

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor ,并实现了ScheduledExecutorService接口。线程池的段列是DelayedWorkQueue,其中和DelayedQueue类似,是一种延迟队列。ScheduledFutureTask 是具有返回值的任务,继承自FutureTask,内部的state变量表示任务状态,还有一个变量 period 表示任务的类型(0,当前任务是一次性的;负数,当前任务是fixed-delay任务,是固定延时的定时可重复执行任务;正数,当前任务是fixed-rate任务,是固定频率的定时可重复任务。

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit);//该方法的作用是提交一个延迟执行的任务,任务从提交时算起延迟单位为unit的delay时间后开始执行,任务只会执行一次。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);//当前任务执行完毕后,让其在延迟固定时间再次运行,其中initialDelay代表提交任务后延迟多少时间开始执行任务 command,delay 代表任务执行完毕后多少时间后再次运行 command,任务会一直重复运行直到任务运行抛出异常而被取消,或线程被关闭了

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);//按照固定频率执行任务,在initialDelay 开始第一次执行,此后initialDelay+n*period 时间点后再次执行,直到抛出异常或线程被关闭。
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。