线程池(3)

1.什么是:

  • java.util.concurrent.Excutors提供的接口用于实现创建线程池。
  • 降低资源的消耗:降低线程创建和销毁的资源消耗。
  • 提高相应速度:线程的创建时间是T1,执行时间是T2,销毁时间是T3,免去了T1和T3的时间。
  • 提高线程的管理性。

2.线程池的基本架构:

image.png

1.接口(ExcutorService)的主要方法:
java.util.concurrent.ExcutorService是java线程池框架的主要接口,用Future保存任务的运行状态及运算结果,主要方法有:

  • void execute(Runnable) //提交任务到线程池
  • Future<?> submit(Runnable) //提交任务到线程池,并放回Future
  • Future<T> submit(Runnable,T) //提交任务到线程池,并返回Future,第二个参数回作为计算结果封装到Future
  • Future<T> submit(Callable) 提交任务到线程池,并返回Future,call方法的计算结果会封装到Future
  • List<Future<T>> invokeAll(Collection extends Callable tasks) // 批量提交任务,并返回结果
  • T invokeAny(Colleciton extends Callable tasks) //提交一个任务,并返回结果

2.线程池主要调度类(ThreadPoolExecutor):
ThreadPoolExecutor是线程池框架的主要实现类

2.1各个参数的含义:

  • int corePoolSize //线程池中核心线程数;当<corePoolSize,就会创建新线程,=corePoolSize,就会保存到BlockingQueue,如果调用preStartAllThreads(),就会一次性启动corePoolSize个线程数。
  • int maximumPoolSize // 允许的最大线程数,BlockingQueue满了,当<maximumPoolSize,就会再创键新的线程。
  • long keepAliveTime //线程空闲下来后,存活的时间,这个只在>corePoolSize才有用。
  • TimeUnit unit //存活时间单位值
  • BlockingQueue<Runnable> workQueue //保存任务的阻塞队列,当线程池中的线程数量>corePoolSize,就会进入workQueue
  • ThreadFactory threadFactory //创建线程的工厂,给新建的线程赋名
  • RejectedExcutionHandler handler //* 饱和策略,jdk提供了四种:*
    1.AbortPolicy:直接跑出异常(RejectedExcutionException),默认
    2.CallerRunsPolicy:用调用者所在的线程来执行
    3.DiscardOldestPolicy:丢弃阻塞队列里最老的任务
    4.DiscardPolicy:当前任务直接丢弃
    如果需要实现自己的饱和策略,实现RejectedExcutionHandler接口。

2.2线程池的工作顺序:
corePoolSize --> 任务队列 --> maximumPoolSise --> 拒绝策略

2.3提交任务的方式:

  • excute(Runnable command)不需要返回值
  • Future<T> submit(Callable<T> task ) 需要返回值
public class one {
    public static void main(String[] str) throws ExecutionException, InterruptedException {

        two to = new two();
        ExecutorService es = Executors.newCachedThreadPool();
        Future<Integer> submit = es.submit(to);
        es.shutdown();//关闭线程池
        System.out.println("获取到的结果:"+submit.get());
    }

    static class two implements Callable {
        @Override
        public Object call() throws Exception {
            Thread.sleep(2000);
            int result = 0;
            for(int i=0;i<100;i++){
                for(int j=0;j<i;j++){
                    result += j;
                }
            }
            return result;
        }
    }
}

2.4关闭线程池方式:

  • shutdownNow() :设置线程池状态,尝试停止正在运行或者暂停任务的线程。
  • shutdown():设置线程池状态,只会中断所有没有执行任务的线程。

2.5源码解析:

2.5.1.Execute源码:
a).最核心的execute方法,在AbstractExecutorService中并没有实现,直到ThreadPoolExecutor才实现。
b).ExecutorService中的submit(),invokeAll(),invokeAny()都调用了该方法,所以,Execute是核心中的核心:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 如果正在运行的线程数小于corePoolSize,那么将调用addWorker 方法来创建一个新的线程,并将该任务作为新线程的第一个任务来执行。
       当然,在创建线程之前会做原子性质的检查,如果条件不允许,则不创建线程来执行任务,并返回false.  

         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 如果一个任务成功进入阻塞队列,那么我们需要进行一个双重检查来确保是我们已经添加一个线程(因为存在着一些线程在上次检查后他已经死亡)或者
       当我们进入该方法时,该线程池已经关闭。所以,我们将重新检查状态,线程池关闭的情况下则回滚入队列,线程池没有线程的情况则创建一个新的线程。
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
       如果任务无法入队列(队列满了),那么我们将尝试新开启一个线程(从corepoolsize到扩充到maximum),如果失败了,那么可以确定原因,要么是
       线程池关闭了或者饱和了(达到maximum),所以我们执行拒绝策略。

         */
    
    // 1.当前线程数量小于corePoolSize,则创建并启动线程。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
        // 成功,则返回

return;
            c = ctl.get();
        }
    // 2.步骤1失败,则尝试进入阻塞队列,
        if (isRunning(c) && workQueue.offer(command)) {
       // 入队列成功,检查线程池状态,如果状态部署RUNNING而且remove成功,则拒绝任务
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
       // 如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
    // 3. 步骤1和2失败,则尝试将线程池的数量有corePoolSize扩充至maxPoolSize,如果失败,则拒绝任务
        else if (!addWorker(command, false))
            reject(command);
    }

上面流程解析:


image.png
解析步骤:
  • 进来后先做空指针校验
  • workerCountOf()方法可以获取当前线程池中的线程总数与corePoolSize比较大小
  • 如果小于,通过addWorker()方法来执行
  • 如果大于,提交到任务队列等待
  • 如果进入阻塞队列失败,将会把任务交给线程池
  • 如果线程达到最大线程数,任务提交失败,执行拒绝策略

2.5.2 addworker方法源码
execute()方法中添加任务的方式是addworker():

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
     // 外层循环,用于判断线程池状态
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
       // 内层的循环,任务是将worker数量加1
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    // worker加1后,接下来将woker添加到HashSet<Worker>中,并启动worker
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
         // 如果往HashSet<Worker>添加成功,则启动该线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker(Runnable task , boolean core) 主要任务是创建并启动线程,他会根据当前线程的状态和给定值(core or maximum)来判断是否创建线程。

addWorker提供了四种传参方式,execut使用了其中三种:
  • addWorker(paramRunnable,true):线程数小于corePoolSize时,放一个需要处理的task到workers set中,如果workers set长度超过corePoolSize,就返回false。
  • addWorker(null,false) :放入一个空的task到workers set,长度限制是maximumPoolSize。相当于创建了一个新线程,只是没有立马去分配任务。
  • addWorker(paramRunnable,false): 当队列满时,尝试将新来的task直接放入workers set,而此时线程的长度是maximumPoolSize,如果线程池也满了,就返回false
  • 还有一种没被使用的是addWorker(null,true)

3.常用的线程池

3.1 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(corePoolSize : var0, 
                                       maximumPoolSize : var0, 
                                       keepAliveTime : 0L, 
                                       TimeUnit.MILLISECONDS, 
                                       new LinkedBlockingQueue());
    }

public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
    return new ThreadPoolExecutor(var0, 
                                  var0,
                                  0L, 
                                  TimeUnit.MILLISECONDS, 
                                  new LinkedBlockingQueue(), 
                                  var1);
}
  • 固定大小线程池,可以指定线程池大小,该线程池corePoolSize和maximumPoolSize相等,队列阻塞使用的是LinkBlockingQueue,大小为整数最大。
  • 该线程池中线程数量不变,有新任务提交时,如果有空闲线程则立即执行,如果没有,暂时存入阻塞队列。
  • LinkBlockingQueue是一个无界队列,当提交任务很频繁,linkBlockingQueue会迅速膨胀, 存在着耗尽资源的风险。
  • 当线程池空闲时,也不会释放工作线程,需要shutdown来停止。

3.2 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(
                    new ThreadPoolExecutor(1, 1, 0L, 
                                           TimeUnit.MILLISECONDS, 
                                           new LinkedBlockingQueue()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
        return new Executors.FinalizableDelegatedExecutorService(
                  new ThreadPoolExecutor(1, 1, 0L, 
                                         TimeUnit.MILLISECONDS, 
                                         new LinkedBlockingQueue(), var0));
    }
  • 单个线程的线程池,阻塞队列用的是LinkBlockingQueue,同上。

3.3 newCachedThread

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 2147483647, 60L, 
                                      TimeUnit.SECONDS, 
                                      new SynchronousQueue());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
        return new ThreadPoolExecutor(0, 2147483647, 60L, 
                                      TimeUnit.SECONDS,
                                      new SynchronousQueue(), var0);
    }
  • 缓存线程池,缓存的线程默认存活60秒。
  • 线程池的核心线程数corePoolSize为0,最大为integer.max_value。
  • 阻塞队列是SynchronousQueue,是一个直接提交的阻塞队列,总是迫使线程池增加新的线程去执行任务。
  • 当没有线程执行时,空闲(keepAliveTime)时间超过60秒,工作线程将被终止收回。
  • 当有任务提交,如果没有空闲线程,则创建新的线程,如果同时有大量任务提交,线程池会新增等量线程处理任务,很可能很快会耗尽系统资源。

3.4 newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int var0) {
        return new ScheduledThreadPoolExecutor(var0);
    }

    public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
        return new ScheduledThreadPoolExecutor(var0, var1);
    }
  • 定时线程池,该线程通常用于周期性的去执行任务,比如同步数据。
  • scheduleAtFixedRate:以固定的频率去执行任务,周期指:每次成功执行任务之间的间隔。
  • schedultWithFixedDelay:以固定的延时去执行任务,延时指:上次成功执行后和下次之间的间隔时间。

4.常用线程池实例

4.1 newFixedThreadPoll

public static void main(String[] str){

        ExecutorService esFix = Executors.newFixedThreadPool(5);

        for(int i=0;i<50;i++){
            final int j = i;
            esFix.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(j);
                }
            });
        }
        esFix.shutdown();//释放资源,否则不释放
    }

4.2 newCachedThreadPool

public static void main(String[] str){

    ExecutorService esFix = Executors.newCachedThreadPool();

    for(int i=0;i<50;i++){
        final int j = i;
        esFix.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            }
        });
    }
}
}
  • 这里没有调用shutdown()方法,执行完毕,60秒后自动关闭释放资源

4.3 newSingleThreadPool

public static void main(String[] str){

    ExecutorService esFix = Executors.newSingleThreadExecutor();

    for(int i=0;i<50;i++){
        final int j = i;
        esFix.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            }
        });
    }
        esFix.shutdown();//释放资源,否则不释放
}
}

4.4 newScheduledThread

public static void main(String[] str){

    ScheduledExecutorService esFix = Executors.newScheduledThreadPool(2);

            esFix.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"执行了您呢");
            }
        },0,2, TimeUnit.SECONDS);
}
}

5.合理配置线程数

根据任务的性质来选取策略:

  • 计算密集型:比如加密、大数据分解、正则……,线程数要适当小点,最大为机器的cpu核心数+1(防止页缺失),cpu核心数计算:Runtime.getRuntime().avalibleProcessors()
  • IO密集型:读文件、数据库链接、网络通讯等,线程数要适当大点,推荐为机器的核心数x2
  • 混合型:尽量拆分,IO密集型>>计算密集型,拆分意义不大,队列上应该选择有界,无界可能导致oom

6.线程工厂

  • Executors线程池如果不指定线程工厂,会使用默认的DefaultThreadFactory,是非守护线程。
  • 使用自定义的线程工厂可以做很多事,比如:跟踪线程池在何时创建 了多少线程、定义线程名称和优先级。
  • 如果将新建的线程都设置成守护线程,当主线程退出后,将会强制销毁线程池。

下面记录了线程的创建,并将所有线程都设置为守护线程:

public class ThreadFactoryDemo {
    public static class MyTask1 implements Runnable{

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis()+"Thrad ID:"+Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args){
          MyTask1 task = new MyTask1();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MICROSECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                System.out.println("创建线程"+t);
                return  t;
            }
        });
        for (int i = 0;i<=4;i++){
           es.submit(task);
        }
    }
}

7.阿里规范:

不建议直接使用Executors创建线程,而是通过ThreadPoolExecutor手动创建,使用Executors的两个弊端:

  • newFixedThreadPool 和 newSingleThreadPool,主要问题是堆积的请求处理队列中是无界的,可能出现oom问题。
  • newCachedThreadPool 和 newScheduledThreadPool,主要问题是线程最大数是Integer.MAX_VALUE,可能出现创建非常多的线程,出现oom

8.手动创建线程池的结果注意点:

1.任务独立。如何任务依赖于其他任务,那么可能产生死锁。例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。
2.合理配置阻塞时间过长的任务。如果任务阻塞时间过长,那么即使不出现死锁,线程池的性能也会变得很糟糕。在Java并发包里可阻塞方法都同时定义了限时方式和不限时方式。例如
Thread.join,BlockingQueue.put,CountDownLatch.await等,如果任务超时,则标识任务失败,然后中止任务或者将任务放回队列以便随后执行,这样,无论任务的最终结果是否成功,这种办法都能够保证任务总能继续执行下去。
3.设置合理的线程池大小。只需要避免过大或者过小的情况即可,上文的公式线程池大小=NCPU *UCPU(1+W/C)。
4.选择合适的阻塞队列。newFixedThreadPool和newSingleThreadExecutor都使用了无界的阻塞队列,无界阻塞队列会有消耗很大的内存,如果使用了有界阻塞队列,它会规避内存占用过大的问题,但是当任务填满有界阻塞队列,新的任务该怎么办?在使用有界队列是,需要选择合适的拒绝策略,队列的大小和线程池的大小必须一起调节。对于非常大的或者无界的线程池,可以使用SynchronousQueue来避免任务排队,以直接将任务从生产者提交到工作者线程。

9.看下facebook工程师是如何定义线程池的:

private static ExecutorService createDefaultExecutorService(Args args) {
        SynchronousQueue executorQueue = new SynchronousQueue();

        return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
                executorQueue);
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容