Java线程池原理分析

【1.0】 Executors类提供了几种不同特性的线程池,其主要实现类都离不开ThreadPoolExecutor,先看一下ThreadPoolExecutor的构建方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

【1.1】 参数说明

corePoolSize保持最小的核心线程数,不会被回收,除非allowCoreThreadTimeOut被设置

maximumPoolSize 允许被执行时最大的线程数,当核心线程池数和队列满了,会加入最大线程数中执行

keepAliveTime 当线程数大于核心线程数,超过的线程保持的时间,和unit搭配使用

unit 缓存时间单位

workQueue任务队列,可进行任务阻塞等待,保存将要执行的队列

threadFactory创建新线程的的工厂对象

handler 线程数或者队列容量达到极限,造成线程池阻塞是的处理器

【1.2】 执行方法

当调用 execute() 方法添加一个任务时,线程池会做如下判断:

  1. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
  2. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
  3. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
  4. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
      
        int c = ctl.get();
        //检查执行任务数
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))//加入核心线程池执行
                return;
            c = ctl.get();
        }
        //判断线程池状态,加入任务队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //判断线程池是否关闭
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//加入队列失败,会加入最大线程数执行
            reject(command);
    }
    
    //  wc >= (core ? corePoolSize : maximumPoolSize))  线程数不超过maximumPoolSize

【1.3】 线程池会创建一样Worker来封装Thread,然后不断的执行队列的任务

private boolean addWorker(Runnable firstTask, boolean core) {

        ......

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建一个Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //启动线程
                    t.start();
                    workerStarted = true;
                }
            }

        ......
    }

【1.4】 Worker继承了Runnable的接口,在初始化的时候会设置到新建的线程,当start线程就中心runWorker,将不循环断获取任务列表的任务执行,代码如下

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{
        
     Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
    public void run() {
            runWorker(this);
        }

【1.5】 循环执行任务


final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //循环获取任务,如果队列数据为空,线程会进入等待
            while (task != null || (task = getTask()) != null) {
                w.lock();
                ......
            }
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

【2.0】 常用的任务队列

【2.1】 ArrayBlockingQueue 数组储存,有界队列,超过会从队头覆盖对象,FIFO

    final Object[] items;

    private void insert(E x) {
        items[putIndex] = x;
        //计算下个index
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }
    
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

【2.2】 LinkedBlockingQueue 单向链表储存,有界列表,默认数是2的31次方2147483648,

    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }
    
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

     public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

【2.3】 DelayedWorkQueue 数组存储,队长可自增长,周期性任务使用

public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            //对象必须的RunnableScheduledFuture的子类
            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();//自动增长
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

【2.4】 SynchronousQueue使用队列或者栈进行储存

  public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }

【3.0】 DefaultThreadFactory 线程创建工厂类

public Thread newThread(Runnable r) {
            //直接实例一个线程
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }

【4.0】 Executors为我们通过一些实例不同特性的线程池方法,主要有以下几个

【4.1】 Executors.newScheduledThreadPool() 可执行周期性任务的线程池
实例是ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor继承ThreadPoolExecutor,队列使用LinkedBlockingQueue

 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
  public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }
    
    
      ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;//执行时间
            this.period = period;//重复次数
            this.sequenceNumber = sequencer.getAndIncrement();
        }

【4.2】 Executors.newCachedThreadPool()线程数可自动扩展,闲着线程60秒将会被回收

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

【4.3】 Executors.newFixedThreadPool() 执行固定线程数的线程池,核心线程数等于最大线程数,且有限,队列无限

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

【4.4】 Executors.newSingleThreadExecutor() 单线程的线程池,FinalizableDelegatedExecutorService 只是包装了 ThreadPoolExecutor,对象在回收时会关闭线程池

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
     static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

【4.5】 Executors.newSingleThreadScheduledExecutor()
具有单线程池和周期性的特点,线程池包装了ScheduledThreadPoolExecutor,

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,539评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,594评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,871评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,963评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,984评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,763评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,468评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,850评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,002评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,144评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,823评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,483评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,026评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,150评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,415评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,092评论 2 355

推荐阅读更多精彩内容