Java线程池基础详解

为什么要使用线程池

  1. 反复创建线程开销大
  2. 过多的线程会占用太多内存

线程池的好处

  1. 加快响应速度
  2. 合理利用CPU和内存
  3. 统一管理

线程池适合应用的场合

  1. 服务器接受到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率
  2. 在开发中,如果需要创建5哥以上的线程,那么就可以使用线程池来管理

线程池构造函数的参数

参数名 类型 含义
corePoolSize int 核心线程数
maximumPoolSize int 最大线程数
keepAliveTime long 保持存活时间
workQueue BlockingQueue 任务存储队列
threadFactory ThreadFactory 当线程池需要新的线程的时候,会使用threadFactory来生成新的线程
Handler RejectedExecutionHandler 由于线程池无法接受你所提交的任务的拒绝策略
corePoolSize

指的是核心线程数,线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务

maximumPoolSize

线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这就是maximumPoolSize

keepAliveTime

如果线程池当前的线程数多于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止

ThreadFactory

新的线程都是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory,创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。通常使用默认的ThreadFactory就可以了。

workQueue

有3种最常见的队列类型
1.直接交接:SynchronousQueue
2.无界队列:LinkedBlockingQueue
3.有界队列:ArrayBlockingQueue

线程池添加线程规则

  1. 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务
  2. 如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列
  3. 如果队列已满,并且线程数小于maximumPoolSize,则创建一个新线程来运行任务
  4. 如果队列已满,并且线程数大于或等于maximumPoolSize,则拒绝该任务
线程池添加线程规则

增减线程的特点

  1. 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池;
  2. 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它;
  3. 通过设置maximumPoolSize为很高的值(例如Integer.MAX_VALUE),可以允许线程池容纳任意数量的并发任务;
  4. 是只有在队列填满时才创建多于corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。

线程池应该手动创建还是自动创建

newFixedThreadPool:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • 通过源码可以看出,newFixedThreadPool 使用的是 LinkedBlockingQueue,由于 LinkedBlockingQueue 是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM

newSingleThreadExecutor:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • 通过源码可以看出,这里和上面的 newFixedThreadPool 的原理基本一样,只不过是把线程数直接设置成了1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存。

newCachedThreadPool:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • 可缓存线程池
  • 特点:无界限线程池,具有自动回收多余线程的功能(默认时间是60秒)
  • 弊端:在于第二个参数 maximumPoolSize 被设置为了Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM。

newScheduledThreadPool:

  • 支持定时及周期性任务执行的线程池
正确的创建线程池的方法

根据不同的业务场景,选择合适的方式,最后是我们自己手动创建线程池,自己设置线程池参数。

线程池里的线程数量设定为多少比较合适

  1. CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的 1-2 倍左右
  2. 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考 Brain Goetz 推荐的计算方法:
    线程数=CPU核心数x(1+平均等待时间/平均工作时间)

停止线程池的正确方法

  1. shutdown
    shutdown 并不是立即粗暴的结束线程,线程池仍然会继续执行已创建的任务,但是不会接收新的任务了。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutDownDemo {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdown();
        // 1500毫秒后此处会抛出异常 RejectedExecutionException
        executorService.execute(new ShutDownTask());
    }
    
}

class ShutDownTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上述代码在1.5秒后执行 executorService.shutdown(); 之后,再执行
executorService.execute(new ShutDownTask()); 则会报错抛出异常 RejectedExecutionException

  1. isShutdown
    返回true或false告诉我们线程是否已经停止
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        System.out.println(executorService.isShutdown());  // 打印false
        executorService.shutdown();
        System.out.println(executorService.isShutdown());  // 打印true

        System.out.println(executorService.isTerminated());

    }
  1. isTerminated
    返回true或false告诉我们线程是否已经完全停止
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdown();
        // 打印false,因为线程还没有完全执行完
        System.out.println(executorService.isTerminated());
    }
  1. awaitTermination
    awaitTermination 有三种情况会返回,没返回之前都是阻塞
    第一种情况:所有任务都执行完毕了
    第二种情况:等待的时间到了
    第三种情况:等待的过程中被打断了,会抛出 InterruptedException
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        boolean b = executorService.awaitTermination(3L, TimeUnit.SECONDS);
        // 会打印false,因为3秒钟不够线程全部执行完
        System.out.println(b);
    }
  1. shutdownNow
    正在执行任务的线程继续执行,等待队列中的线程直接结束并返回
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutDownDemo {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        // 等待1.5秒
        Thread.sleep(1500);
        // 这里会返回已经放到线程池队列中还没有执行的Runnable
        List<Runnable> runnables = executorService.shutdownNow();
    }

}

class ShutDownTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // 接收被中断信号
            System.out.println(Thread.currentThread().getName() + "被中断了!");
        }
    }
}

executorService.shutdownNow();会返回已经放到线程池队列中还没有执行的Runnable
运行结果:

...
...
...
pool-1-thread-8
pool-1-thread-7
pool-1-thread-6
pool-1-thread-10
pool-1-thread-1
pool-1-thread-3
pool-1-thread-1被中断了!
pool-1-thread-4
pool-1-thread-2
pool-1-thread-7被中断了!
pool-1-thread-8被中断了!
pool-1-thread-10被中断了!
pool-1-thread-6被中断了!
pool-1-thread-9被中断了!
pool-1-thread-5被中断了!

Process finished with exit code 0

线程池任务太多,怎么拒绝

  • 拒绝时机

  1. 当Executor关闭时,提交新任务会被拒绝
  2. 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时
  • 拒绝策略

  1. AbortPolicy,直接抛出一个异常
  2. DiscardPolicy,悄悄的把任务丢弃,没有通知
  3. DiscardOldestPolicy,把队列中最老的那个任务丢弃,新任务加进来
  4. CallerRunsPolicy,谁提交的任务谁去执行(比如说主线程给线程池提交了一个任务,但是线程池已经饱和无法再执行了,这时则会让提交任务的主线程去执行这个任务)

线程池钩子函数

import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 演示每个任务执行前后放钩子函数
 */
public class PauseableThreadPool extends ThreadPoolExecutor {

    private boolean isPaused;
    private final ReentrantLock lock = new ReentrantLock();
    private Condition unpaused = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被执行了");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }

        Thread.sleep(2000);
        // 等待2秒,执行暂停方法
        pauseableThreadPool.pause();
        System.out.println("线程池被暂停了!");
        Thread.sleep(2000);
        // 再等待2秒,执行恢复方法
        pauseableThreadPool.resume();
        System.out.println("线程池被恢复了!");
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 暂停线程
     */
    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 恢复线程
     */
    private void resume() {
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
}

运行结果:


钩子函数执行效果
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。