Java 线程池源码阅读笔记

创建线程池

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

其中参数的含义为

corePoolSize 为核心线程梳理
       corePoolSize 个线程会一直存在线程池之中,直到 workQueue 所有任务都被执行。  
maximumPoolSize 最大线程梳理
        其中 corePoolSize 不能小于 0
            maximumPoolSize 不能小于1
            maximumPoolSize 不能小于 corePoolSize
keepAliveTime   非核心线程最大存活时间
       如果线程池中的线程,超出 corePoolSize ,多出的线程执行完任务后将会被回收。        
       keepAliveTime 为该线程空闲时,最长存活时间。
unit 时间单位
workQueue 任务队列
      当线程池 corePoolSize 个线程全部都在运行时,任务会被放到该队列
threadFactory  线程工厂
      创造线程的工厂
handler   处理无法执行的任务(任务队列满,也无法创建新的线程)
      无法被执行的任务,通过这个handler处理

添加任务到线程池

executorService.execute( Runnable)
executorService.submit( Runnable)

Executors 框架提供的方式

Executors 提供了四种创建线城市的方式
分别是

Executors.newSingleThreadExecutor();
              创建单线程池。   
              corePoolSize =  maximumPoolSize = 1
              
Executors.newCachedThreadPool()
              最小为0 ,最大为Integer.MAX_VALUE 个线程的线程池
              corePoolSize = 0
              maximumPoolSize = Integer.MAX_VALUE    
    
Executors.newFixedThreadPool(corePoolSize);
              创建 corePoolSize 个核心线程的线程池
              corePoolSize =  maximumPoolSize = corePoolSize
              
Executors.newScheduledThreadPool(corePoolSize)
              最小为corePoolSize ,最大为Integer.MAX_VALUE 个线程的线程池
              corePoolSize = corePoolSize
              maximumPoolSize = Integer.MAX_VALUE

创建线程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

备注

executorService.submit() 会调用 execute
代码在 ThreadPoolExecutor 父类 AbstractExecutorService 中可以查看


  public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

流程图如下

threadPool_02.png

任务执行

ThreadPoolExecutor 中的线程都被封装到 Worker 类之中。

一个 Worker 可以认为就是一个线程。

Worker 被存放到一个叫做 workers 的 HashSet<Worker> 成员变量中。通过

private boolean addWorker(Runnable firstTask, boolean core) {
 ……
}

方法创建 Worker

addWorker 方法可以分为两个部分

第一部分
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

该部分通过以下条件判断是否可以继续创建线程

 1、线程池是否还在运行
 2、当前线程数量是否超过最大线程梳理
 3、如果可以创建线程 ctl 对象(AtomicInteger)+1 

备注

ctl 对象是用来记录当前线程数量,线程池运行状态的 AtomicInteger 。
AtomicInteger 一共 32 为存储空间
前 3 位代表线程池运行状态,后面 29 位表示能线程数量
第二部分
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;

第二部分代码创建 Worker 对象,并且执行了 Worker 中 thread.run() 方法。

通过 Worker 构造函数,可以得知:thread.run() 最终执行的是 Worker 的 run 方法。

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

而 work.run 中会调用 ThreadPoolExecutor 的 runWorker 方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            ……
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker()方法最终会调用我们 executorService.execute(runnable) 中的 runnable.run()

以上流程图如下

threadPool_03.png

线程回收

上文中说过 ThreadPoolExecutor 会维护 corePoolSize 个线程常驻(或者不到 corePoolSize)。
超出 corePoolSize 不到 maximumPoolSize 的线程在执行完毕任务以后,就会被回收。

线程伴随的 Worker 创建以后,在 runWorker(Worker w) 方法中 开始执行一个名字为 firstTask 的 Runnable 。

然后在 runWorker(Worker w) 一直调用 getTask() 从 workQueue 中获取新的 Runnable 执行。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

如果 getTask 中返回了 null ,线程将会执行完毕,进行回收。

如果 getTask 在阻塞 比如在

  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)

或者返回了一个 Runnable 线程将会继续执行。

流程图如下:

threadPool_04.png

任务队列 workQueue 的维护

[创建线程] 有提到,当线程池线程超过核心线程时,会尝试将任务放到。如果添加不成功再试图创新新的线程。核心方法是 BlockingQueue 的 offer(E e) 方法

BlockingQueue<Runnable> workQueue;
……
workQueue.offer(command)

BlockingQueue 本身是一个接口,线程池中使用的 BlockingQueue 子类有

SynchronousQueue     (newCachedThreadPool)
LinkedBlockingQueue  (newFixedThreadPool)
LinkedBlockingQueue  (newSingleThreadExecutor)
DelayedWorkQueue      (newScheduledThreadPool)
  1. 其中 newFixedThreadPool 和 newSingleThreadExecutor 使用 LinkedBlockingQueue ,并且 corePoolSize = maximumPoolSize 。

     在线程数量到达corePoolSize 以后的任务都会被存在 workQueue 之中。
     直到  LinkedBlockingQueue 对了满为止。
     LinkedBlockingQueue 默认容量为 Integer.MAX_VALUE
    
  2. newCachedThreadPool 创建的线程池。corePoolSize = 0,maximumPoolSize Integer.MAX_VALUE 。

     SynchronousQueue 本身没有什么容量 
     offer() 往queue里放一个element后立即返回,
     如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;
     否则返回false
     
     所以 SynchronousQueue 可以重复利用存活的线程,在无法使用存活线程执行任务时
     创建新的线程。
     
     newCachedThreadPool 创建的线程,在没有任务可以执行的时候,会回收
    
  3. newScheduledThreadPool 创建的线程池,corePoolSize < maximumPoolSize = Integer.MAX_VALUE 。使用了 DelayedWorkQueue 控制

    DelayedWorkQueue 是一个无界队列,初始大小为 16 ,
    可以自增长,最大为 Integer.MAX_VALUE。

ScheduledThreadPoolExecutor 提供了一些不同于其他线程池的功能,这里不做研究。

有兴趣的可以看下其他人的研究,比如 深入理解Java线程池:ScheduledThreadPoolExecutor

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容