说说 Java 线程池

一、引言

池的概念大家并不陌生,数据库连接池、线程池等...大体来说,有三个优点:

  1. 降低资源消耗。
  2. 提高响应速度。
  3. 便于统一管理。

以上是 “池化” 技术的相同特点,至于他们之间的不同点这里不讲,两者都是为了提高性能和效率,抛开实际做连连看找不同,没有意义。

同样,类比于线程池来说:

  • 降低资源消耗:
    重复利用线程池中已经创建的线程,相比之下省去了线程创建和销毁的性能消耗。
  • 提高响应速度:
    当有任务创建时,不必等待线程创建,可以立即执行。
  • 便于统一管理:
    使用线程池,可以对线程统一管理,对线程的执行状态做统一监控。

二、线程池的使用

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
                          RejectedExecutionHandler handler);
1、关键参数
  • corePoolSize 核心线程数
    当向线程池中提交一个任务时,如果线程池中的线程数量小于核心线程数,即使存在空闲线程,也会新建一个线程来执行当前任务,直到线程数量大于或等于核心线程数。
  • maximunPoolSize 最大线程数
    当任务队列满了,线程池中的线程数量小于最大线程数时,创建新线程执行任务。对于无界队列,忽略该参数。
  • keepAliveTime 线程存活时间
    大于核心线程数的那一部分线程的存活时间,如果这部分线程空闲超过这段时间,则进行销毁。
  • workqueue 任务队列
    线程池中的线程数大于核心线程数时,将任务放入此队列等待执行。
  • threadFactory 线程工厂
    用于创建线程,工厂使用 new Threa() 的方式创建线程,并为每个线程做统一规则的命名:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)。
  • handler 饱和策略
    当线程池和队列都满了,则根据此策略处理任务。
2、任务队列类型
名称 描述
ArrayBlockingQueue 基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue 基于链表结构的阻塞队列,此队列按 FIFO (先进先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。Executors.newFixedThreadPool( ) 使用了这个队列。
SynchronousQueue 不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool( ) 使用了这个队列。
PriorityBlockingQueue 具有优先级的无限阻塞队列。
3、饱和策略类型
策略名称 特性
AbortPolicy 默认的饱和策略,直接抛出 RejectedExecutionException 异常
DiscardPolicy 不处理,直接丢弃任务
CallerRunsPolicy 使用调用者的线程执行任务
DiscardOldestPolicy 丢弃队列里最近的一个任务,执行当前任务

同时,还可以自行实现 RejectedExecutionHandler 接口来自定义饱和策略,比如记录日志、持久化等等。

  • void execute(Runnable command)
    ❀ 示例:
    ThreadFactory namedThreadFactory =
    new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    ExecutorService executor =
    new ThreadPoolExecutor(
    10,   
    1000,
    60L,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(10),
    namedThreadFactory,
    new ThreadPoolExecutor.AbortPolicy());
    executor.execute(
    () -> {
    System.out.println(1111);
    });
    

注意使用 execute 方法提交任务时,没有返回值。

  • Future<?> submit(Runnable task)
    ❀ 示例:

    Future<Integer> future = executor.submit(() -> {
          return 1 + 1;
        });
    Integer result = future.get();
    

    还可以使用 submit 方法提交任务,该方法返回一个 Future 对象,通过 Future#get( ) 方法可以获得任务的返回值,该方法会一直阻塞知道任务执行完毕。还可以使用 Future#get(long timeout, TimeUnit unit) 方法,该方法会阻塞一段时间后立即返回,而这时任务可能没有执行完毕。

5、关闭线程池

ThreadPoolExecutor 提供了 shutdown( ) 和 shutdownNow( ) 两个方法关闭线程池。原理是首先遍历线程池的工作线程,依次调用 interrupt( ) 方法中断线程,这样看来如果无法响应中断的任务就不能终止。

两者区别是:

  • shutdownNow( ) 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
  • shutdown( ) 首先将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

如果调用了其中一种方法,isShutdown 方法就会返回 true。当所有的任务都已关闭后, 才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。实际应用中可以根据任务是否一定要执行完毕的特性,决定使用哪种方法关闭线程池。

6、合理的配置线程池

通常我们可以根据 CPU 核心数量来设计线程池数量

可以通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的物理核心数量。值得注意的是,如果应用运行在一些 docker 或虚拟机容器上时,该方法取得的是当前物理机的 CPU 核心数。

  • IO 密集型 2nCPU

  • 计算密集型 nCPU+1

    • 其中 n 为 CPU 核心数量。
    • 为什么加 1:即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。

三、线程池的运行过程

image.png

当提交一个新任务时,线程池的处理步骤:

  1. 判断当前线程池内的线程数量是否小于核心线程数,如果小于则新建线程执行任务。否则,进入下个阶段。
  2. 判断队列是否已满,如果没满,则将任务加入等待队列。否则,进入下个阶段。
  3. 在上面基础上判断是否大于最大线程数,如果是根据响应的策略处理。否则,新建线程执行当前任务。

线程池的源码比较简单易懂,感兴趣的小伙伴可以自行查看 java.util.concurrent.ThreadPoolExecutor,在线程池中每个任务都被包装为一个一个的 Worker ,下面简单看下 Worker 的 run( ) 方法:

try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }

可以看到不断的循环取出 Task 并执行,而在任务的执行前后,有 beforeExecute 和 afterExecute 方法,我们可以实现两个方法实现一些监控逻辑。除此之外还可以集合线程池的一些属性或者重写 terminated() 方法在线程池关闭时进行监控。

四、常见的几种线程池实现

Executors 中提供了集中常见的线程池,分别应用在不同的场景。

  • FixThreadPool 固定数量的线程池,适用于对线程管理,高负载的系统
  • SingleThreadPool 只有一个线程的线程池,适用于保证任务顺序执行
  • CacheThreadPool 创建一个不限制线程数量的线程池,适用于执行短期异步任务的小程序,低负载系统
  • ScheduledThreadPool 定时任务使用的线程池,适用于定时任务

上面几种线程池的特性主要依赖于 ThreadPoolExecutor 的几个参数来实现,不同的核心线程数量,以及不同类型的阻塞队列,同时我们还可以自行实现自己的线程池满足业务需求。

值得注意的是,并不推荐使用 Executors 创建线程池,详见下:

  • Executors.newFixedThreadPool(int nThread)
public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>());
 }

❀ 继续来看 LinkedBlockingQueue :

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
}

可以看到使用 LinkedBlockingQueue 创建的是 Integer.MAX_VALUE 大小的队列,会堆积大量的请求,从而造成 OOM

  • Executors.newSingleThreadExexutor( )
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

同样,使用的 LinkedBlockingQueue ,一样的情况

  • Executors.newCachedThreadPool( )
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

代码课件线程池使用的最大线程数是 Integer.MAX_VALUE ,可能会创建大量线程,导致 OOM

  • Executors.newScheduleThreadPool()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

和上面是一样的问题,最大线程数是 Integer.MAX_VALUE

所以原则上来说禁止使用 Executors 创建线程池, 而使用 ThreadPoolExecutor 的构造函数来创建线程池。

五、结语

线程池在开发中还是比较常见的,结合不同的业务场景,结合最佳实践配置正确的参数,可以帮助我们的应用性能得到提升。

欢迎访问个人博客 获取更多知识分享。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352