线程池原理以及如何实现线程复用(强烈推荐)

首先看一段线程池的使用代码

public class FixedThreadPoolDemo { 
    static ExecutorService executorService = Executors.newFixedThreadPool(3); 
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "-> 执行");
            });
        }
        // 关闭线程池
        executorService.shutdown();
    } 
}

一语概括线程池就是启动线程去执行任务的run()方法,创建的worker是真实的线程,这段代码就是任务的run方法

image.png

查看线程池的execute方法如下

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //第一步,获取ctl
        int c = ctl.get();
        //检查当前线程数是否达到核心线程数的限制,注意线程本身是不区分核心还是非核心,后面会进一步验证
        if (workerCountOf(c) < corePoolSize) {
            //如果核心线程数未达到,会直接添加一个核心线程,也就是说在线程池刚启动预热阶段,
            //提交任务后,会优先启动核心线程处理
            if (addWorker(command, true))
                return;
            //如果添加任务失败,刷新ctl,进入下一步
            c = ctl.get();
        }
        //检查线程池是否是运行状态,然后将任务添加到等待队列,注意offer是不会阻塞的
        if (isRunning(c) && workQueue.offer(command)) {
           //任务成功添加到等待队列,再次刷新ctl
            int recheck = ctl.get();
           //如果线程池不是运行状态,则将刚添加的任务从队列移除并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //判断当前线程数量,如果线程数量为0,则添加一个非核心线程,并且不指定首次执行任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
       //添加非核心线程,指定首次执行任务,如果添加失败,执行异常策略
        else if (!addWorker(command, false))
            reject(command);
    }

调用了worker方法如下

private boolean addWorker(Runnable firstTask, boolean core) {
       //相当于goto,虽然不建议滥用,但这里使用又觉得没一点问题
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //如果线程池的状态到了SHUTDOWN或者之上的状态时候,只有一种情况还需要继续添加线程,
            //那就是线程池已经SHUTDOWN,但是队列中还有任务在排队,而且不接受新任务(所以firstTask必须为null)
           //这里还继续添加线程的初衷是,加快执行等待队列中的任务,尽快让线程池关闭
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
               //传入的core的参数,唯一用到的地方,如果线程数超过理论最大容量,如果core是true跟最大核心线程数比较,否则跟最大线程数比较
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //通过CAS自旋,增加线程数+1,增加成功跳出双层循环,继续往下执行
                if (compareAndIncrementWorkerCount(c))
                    break retry;
               //检测当前线程状态如果发生了变化,则继续回到retry,重新开始循环
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //走到这里,说明我们已经成功的将线程数+1了,但是真正的线程还没有被添加
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           //添加线程,Worker是继承了AQS,实现了Runnable接口的包装类
            w = new Worker(firstTask);
            final Thread t = w.thread;这里的t就是worker自己, 构造方法以this传入了自己
            if (t != null) {
               //到这里开始加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //检查线程状态,还是跟之前一样,只有当线程池处于RUNNING,或者处于SHUTDOWN并且firstTask==null的时候,这时候创建Worker来加速处理队列中的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                       //线程只能被start一次
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                      //workers是一个HashSet,添加我们新增的Worker
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                  //启动Worker
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我们看一下重点代码w = new Worker(firstTask);
查看worker的构造器代码

  Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

这里给worker一个thread属性这个线程的具体实现是当前的worker,也就是new Thread(new Runnable(){public void run(){}})
所以在t.start();启动worker的就是启动worker 这个线程,所以就会运行run方法,接下来查看run 方法

  public void run() {
          runWorker(this);
    }


    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //取出需要执行的任务,
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果task不是null,或者去队列中取任务,注意这里会阻塞,后面会分析getTask方法
            while (task != null || (task = getTask()) != null) {
               //这个lock在这里是为了如果线程被中断,那么会抛出InterruptedException,而退出循环,结束线程
                w.lock();
                //判断线程是否需要中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                   //任务开始执行前的hook方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//这里就是直接调用run 方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                       ////任务开始执行后的hook方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;//清空task
                    w.completedTasks++;//完成数添加
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
           //Worker退出
            processWorkerExit(w, completedAbruptly);
        }
    }

while 循环 直接执行线程的run()方法,该线程执行完第一个任务之后不断去获取新的任务,这样子就实现了worker的复用

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,324评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,356评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,328评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,147评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,160评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,115评论 1 296
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,025评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,867评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,307评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,528评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,688评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,409评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,001评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,657评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,811评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,685评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,573评论 2 353

推荐阅读更多精彩内容