Java线程池源码

1 实现通过线程池执行任务


    public class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ": Thread Id: " + Thread.currentThread().getId());
    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    public static void main(String[] args) {
        MyTask task = new MyTask();
        // corePoolSize, maxPoolSize, keepAliveTime, unit
        // workQueue, threadFactory, handler
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 20, 0L
                , TimeUnit.MILLISECONDS
                , new LinkedBlockingDeque<>(1024)
                , Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 5; i++) {
            pool.execute(task);
            // pool.submit(task);
        }
        pool.shutdown();
    }
}

创建了一个ThreadPoolExecutor,参数中使用了默认的线程创建工厂DefaultThreadFactory

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

线程池中默认线程名称前缀为"pool-" + poolNumber.getAndIncrement() + "-thread-";

ThreadPoolExecutor构造方法中,将参数赋值到对象成员中

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

2 execute方法提交任务进行处理

我们知道,当工作线程数小于核心线程数,则会创建新线程执行任务,当工作线程数大于corePoolSize,则会将任务丢到workQueue中,当workQueue中存满任务时,则会执行拒绝策略。execute方法中是任务执行的流程:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        // 工作线程数量小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 线程池添加任务成功,则返回
            if (addWorker(command, true))
                return;
            // 线程池添加任务失败,再次获取线程数量
            c = ctl.get();
        }
        // 工作线程数大于核心线程数
        // 线程池处于RUNNING状态,并且往任务队列添加任务成功
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次对线程池状态检查
            int recheck = ctl.get();
            // 线程池状态不是RUNNING ,并且从任务队列删除任务成功
            if (! isRunning(recheck) && remove(command))
                // 拒绝任务
                reject(command);
            // 工作线程数量为0,则线程池处于TIDYING状态
            else if (workerCountOf(recheck) == 0)
                // 添加一个null任务
                addWorker(null, false);
        }
        // 线程池不在RUNNING状态
        // workQueue.offer为false,并且往任务队列添加任务失败,表示任务队列已满
        // (addWorker失败,表示线程池数量达到maxPoolSize)
        //  false表示工作线程数不和corePoolSize比较,而和maxPoolSize比较
        else if (!addWorker(command, false))
            // 执行拒绝策略
            reject(command);
    }
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

上述代码是ctl的定义:

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;

可以看到 RUNNING 表示 workQueue 中限制的线程数目,值为(2^29)-1大概为500 million

而线程池有四种状态:

RUNNING:接收新任务,处理任务队列中的任务

SHUTDOWN:不接受新任务,但是会消费任务队列中的任务

STOP:不接受新任务,不消费任务队列中的任务,并且会interrupt运行中的任务

TIDYING:所有任务终结,workCount数量为0,并且会去调用terminated()方法

TERMINATED:terminated()方法执行完成

四种状态的变化在注释中也已经给出

* RUNNING -> SHUTDOWN
*    On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
*    On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
*    When both queue and pool are empty
* STOP -> TIDYING
*    When pool is empty
* TIDYING -> TERMINATED
*    When the terminated() hook method has completed

3 addWorker往线程池添加任务

addWorker方法往线程池中添加任务,其实现如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 计算当前工作线程数
            int wc = workerCountOf(c);
            // 通过core参数判断工作线程数是否大于corePoolSize或者maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS添加工作线程数
            if (compareAndIncrementWorkerCount(c))
                // 成功则跳出循环
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 通过firstTask封装一个工作对象
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 往workers集合中添加work对象
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 添加成功标记
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                // 标记任务运行状态
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

其中new Worker封装了firstTask任务,构造方法如下

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    // firstTask为提交进来的command,即main方法中提交给线程池的任务
    this.firstTask = firstTask;
    // 通过线程工厂创建线程池,传入this,即worker
    this.thread = getThreadFactory().newThread(this);
}

newThread方法如下:

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

设置为非守护线程,newThread(this)传入的对象为this,为Worker对象,所以,在addWorker方法中的t.start()方法启动时,执行的是Worker的run方法。

addWorker方法主要将任务command对象封装成一个worker,添加到线程池中的HashSet<Worker>中,并且启动该任务,成功则返回true,失败返回false。

4 runWorker方法执行work

Worker的run方法中,执行如下代码:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // Worker中封装的firstTask为demo main方法中提交的任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 执行firstTask或者从阻塞队列中获取任务,执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行task
                    task.run();
                    // 任务出现异常,则抛出异常并停止运行线程池
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Worker对象里创建出来thread数量可以达到corePoolSize个,并且都不会被关闭。这些thread执行的任务为从任务队列中获取添加进来的command,并且执行(即runWorker方法做的事情)。

5 demo总结

再回到execute方法中进行debug,我们可以发现,在当前demo中

前四个MyTask任务提交之后是被addWorker处理的,也就是创建了新的线程。

而后面的5个新任务则是被塞进了workQueue中进行缓存。

当执行到main方法中的 pool.shutdown()这里时,看一下线程调用堆栈

其中运行着5个创建的线程,满足corePoolSize的数量,执行完pool.shutdown(),线程池中的线程会自动从任务队列中取出任务,进行执行,满足前文我们的分析。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容