java线程池源码分析

execute执行流程

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // ctl低3位表示线程池状态,高19位表示当前线程数
        int c = ctl.get();
        // 如果线程池未达到核心线程数,则创建线程执行
        if (workerCountOf(c) < corePoolSize) {
            // 创建线程执行任务成功,则返回
            if (addWorker(command, true))
                return;
            // 由于并发未创建成功,获取最新ctl值
            c = ctl.get();
        }
        // 线程池状态是running,且任务入队成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 线程池不是running,则将任务出队
            if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略
                reject(command);
                // 线程池是running状态,但是工作线程为0,则创建空任务线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 入队失败,则进行创建线程执行
        else if (!addWorker(command, false))
            reject(command);
    }
private boolean addWorker(Runnable firstTask, boolean core) {
// 这个for基本逻辑就是累加统计的线程数,用CAS重试实现
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 线程池状态不是running状态
            if (rs >= SHUTDOWN &&
            // 这里对应前面任务为空的情况,如果状态为shutdown,且队列为空,则返回失败
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 如果线程数量大于线程数,则返回失败,这里有两种情况,超过核心数,后面还会重试
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                    // 如果累加当前线程数成功则跳出重试
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                    // 重新获取,进行重试
                c = ctl.get();  
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 线程启动成功标志
        boolean workerStarted = false;
        // 线程添加works成功标志
        boolean workerAdded = false;
        Worker w = null;
        try {
        // 创建work线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {

                    int rs = runStateOf(ctl.get());
                    // 线程池状态是running  或  状态为shutdown 且 任务为空,则添加到wokers
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        // 更新largestPoolSize
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 添加works成功则启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        // 任务不为空 或  阻塞队列中有任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 进行中断线程判断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                // 线程任务前置处理
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    // 线程任务后置执行
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
// 当调用shutdown时,getTask阻塞停止调用该方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) 
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 尝试中断所有线程任务
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容