Java 线程池的使用 ThreadPoolExecutor

多线程

创建一个多线程,需要重写 Runnable run() 方法, 然后通过 Thread.start() 进行执行,
如果频繁需要使用到多线程,我们一个一个的创建开销会很大,而线程池就是解决这个问题的,就像【数据库的连接池】一样。

Thread thread = new Thread(() -> System.out.println("Thread Hello"));
thread.start();

Java 线程池

线程池 管理线程,包括创建,检测,替换,关闭等。

java.util.concurrent 在并发编程中很常用的实用工具类

ThreadPoolExecutor 最基础的父类,几种线程池的操作都在该类中

Executor 执行已提交的 Runnable 任务的对象。

ExecutorService 接口(实现了Executor) 提供管理终止 Executor 的方法。

Executors 工厂类,提供 Exexutor,ExecutorService,ScheduleExecutorService,ThreadFactory,Callable类的工厂和实用方法。

ThreadFactory 创建 Thread 的工厂类

ExecutorService 四种线程池类型

CachedThreadPool

Executors.newCachedThreadPool() 缓存线程池 (重用已经构造过的线程池, 适用于 执行许多短期异步任务的程序)

ScheduledThreadPool

Executors.newScheduledThreadPool() 定时(调度)线程池 (可以在 线程执行前后 进行时间延迟, scheduleAtFixedRate 方法, scheduleWithFixedDelay 方法)

FixedThreadPool

Executors.newFixedThreadPool() 重用固定数量的线程池 (创建的时候固定线程池数量, 线程都处于活动状态, 会等待线程完成, 线程失败或终止了, 将会替换一个新的线程)

SingleThreadExecutor

Executors.newSingleThreadExecutor() 单个线程的线程池 (只创建一个单独的线程, 可适用于处理时间比较长的方法)

Executors Code
import org.apache.commons.lang3.time.DateFormatUtils;
import org.junit.jupiter.api.Test;
import java.util.Date;
import java.util.concurrent.*;

/**
 * ThreadPoolExecutor
 *
 * 四种线程池的特性和使用方式
 *      1. SingleThreadExecutor
 *      2. FixedThreadExecutor
 *      3. CacheThreadExecutor
 *      4. ScheduledThreadExecutor
 *
 * @author zhou
 */
public class TestExecutorService {

    /**
     * 单线程执行器
     *
     * 只有一个线程的线程池
     * 执行结果: 需要等待 仅有的线程 执行完, 才能执行下一次遍历。
     */
    @Test
    public void testSingleThreadExecutor() throws InterruptedException, ExecutionException {
        // new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println("thread-name: " + Thread.currentThread().getName() + ", time: " + DateFormatUtils.format(new Date(), "HH:mm:ss"));
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(6000);
    }

    /**
     * 可重用固定线程数的线程池
     *
     * 初始化时创建固定数量的线程, 之后不再创建, 除非 有线程不可用, 就进行创建并替换。
     * 执行结果: 会重复使用线程池中的线程, 而不会再创建新的线程, 没有线程可用时, 会一直等待有空闲的线程, 并不会重新创建。
     */
    @Test
    public void testFixedThreadExecutor() throws InterruptedException {
        // new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println("thread-name: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(6000);
    }

    /**
     * 缓存线程的线程池
     *
     * 由于使用的是 SynchronousQueue, 本身只维护一个线程, 插入的时候必须有线程进行移除操作才能进行插入
     * 终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
     * 初始化为 0, 最大为 Integer.MAX_VALUE (2147483647)
     *
     * 执行结果: 没有线程可用就创建, 并不会进行等待。
     */
    @Test
    public void testCacheThreadExecutorService() throws InterruptedException {
        // new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(6000);
    }

    /**
     * 定时(调度)线程池
     *
     */
    @Test
    public void testScheduledThreadExecutor() throws InterruptedException, ExecutionException, TimeoutException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        // 下方 四个方法都能获取 线程 返回的数据。

        // 1. schedule() 延迟启动时间
//        System.out.println("start");
//        for (int i = 0; i < 3; i++) {
//            executor.schedule(() -> System.out.println("运行线程"), 1, TimeUnit.SECONDS);
//        }
//        Thread.sleep(3000);

        // 2. schedule() 延迟启动时间, 通过 <V> 泛型来指定返回的参数的类型
//        executor.schedule(() -> {
//            System.out.println("test");
//        }, 1, TimeUnit.SECONDS);
//        Thread.sleep(4000);

        // 3. scheduleAtFixedRate() 初始化延迟和定时调度延迟
        // command 运行的方法
        // initialDelay 初始化延迟时间
        // period  线程执行前延迟时间
        // unit 统一的时间单位
        // 如果第一个线程的执行时间超过第二个线程延迟的时间, 则等待第一个线程执行完, 再执行第二个线程。
//        executor.scheduleAtFixedRate(() -> {
//            System.out.println(System.currentTimeMillis());
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        }, 0, 1, TimeUnit.SECONDS);
//
//        Thread.sleep(5000);

        // 4. scheduleWithFixedDelay() 固定延迟调度
        // command 运行的方法
        // initialDelay 初始化延迟时间
        // delay 线程延期时间
        // unit 统一的时间单位
        // 和上方方法唯一不同的就是 delay 的处理
        // 如果第一个线程的执行时间超过第二个线程延迟的时间, 则第二个线程的执行时间为 (第一个线程的执行时间 + delay)
//        executor.scheduleWithFixedDelay(() -> {
//            System.out.println(System.currentTimeMillis());
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        }, 0, 1, TimeUnit.SECONDS);
//        Thread.sleep(5000);

        // 5. 需要获取线程执行完后返回的参数的对象/方法解析
        ScheduledFuture<String> schedule = executor.schedule(() -> {
            return String.valueOf(System.currentTimeMillis());
        }, 2, TimeUnit.SECONDS);

        // getDelay(计算时间单位) 执行到该方法的时候, 执行的线程还有多久执行完, 返回 0 或者 负数的时候代表线程已经执行完成。
        // System.out.println(schedule.getDelay(TimeUnit.SECONDS));

        // get()   获取线程执行完后的返回值
        // System.out.println(schedule.get());

        // get(超时时间, 时间单位)  最长等待多长时间接收到线程执行完后的返回值
        // 抛出四个异常
        // 线程取消异常
        // 线程执行异常
        // 线程等待时被中断异常
        // 线程超时异常 (线程还在执行中, 我们就去获取的情况)
        // System.out.println(schedule.get(1, TimeUnit.SECONDS));
        // Thread.sleep(4000);


    }

}

ThreadPoolExecutor

ThreadPoolExecutor 参数

 1. corePoolSize 线程池中核心线程数
 2. maximumPoolSize  线程池中最大线程数
 3. keepAliveTime 空闲时间, 当线程池数量超过核心线程池时, 多余的空闲线程存活的时间
 4. unit 空闲时间单位
 5. workQueue 等待队列, 线程池中的线程数超过核心线程数时, 任务将放在等待队列中
 6. threadFactory 线程工厂
 7. handler 拒绝策略, 当线程池和等待队列都满了之后, 需要通过该对象的回调函数进行回调处理

workQueue 等待队列

 1. ArrayBlockingQueue 队列是有界的, 需要初始化队列数量, 基于数组实现的阻塞队列
 2. LinkedBlockingQueue 队列可以有界和无界, 基于链表实现的阻塞队列
 3. SynchronousQueue 不存储元素的阻塞队列, 每个插入操作必须等到另一个线程调用移除操作, 否则插入操作一直处于阻塞状态
 4. PriorityBlockingQueue 带优先级的无界阻塞队列
 5. LinkedBlockingDeque 队列可有界和无界, 基于双向链表实现的阻塞队列
 
@Test
public void testWorkQueue() {
    ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(20);
    LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
    SynchronousQueue synchronousQueue = new SynchronousQueue();
    PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();
    LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();

    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
            linkedBlockingDeque);

    executor.execute(() -> {
        System.out.println("test");
    });

}

handler 拒绝策略

 自带四种拒绝策略, 可自定义拒绝策略
 1. AbortPolicy (默认) 处理程序遭到拒绝将抛出运行时异常 RejectedExecutionException
 2. CallerRunPolicy 由调用该方法的线程执行 (比如: 我主线程调用的, 直接交给主线程执行)
 3. DiscardPolicy 直接将任务删除, 不做任何处理
 4. DiscardOldestPolicy 将位于工作队列的头部任务删除, 再执行当前任务 (再次失败, 再次删除和重试)
 5. 自定义 实现 RejectedExecutionHandler 重写 rejectedExecution 方法
 
@Test
public void handler() throws InterruptedException {
    ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
    ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
    // 看时间戳的变化 (在两秒钟的时间内, 丢弃了很多线程)
    ThreadPoolExecutor.DiscardPolicy discardPolicy = new ThreadPoolExecutor.DiscardPolicy();
    ThreadPoolExecutor.DiscardOldestPolicy discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();
    MyRejectedHandler myRejectedHandler = new MyRejectedHandler();

    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(8), myRejectedHandler);

    for (int i = 0; i < 6; i++) {
        executor.execute(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ":" + System.currentTimeMillis() + ":" + executor.getQueue().size());
        });
    }
    Thread.sleep(50000);
}

// 自定义拒绝策略
public static class MyRejectedHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("阻塞了");
    }
}

线程执行前, 执行后, 关闭和终止的方法回调

回调方法
 1. beforeExecute 执行线程之前
 2. afterExecute  执行线程之后
 3. terminated 线程池结束调用

 关闭线程池
 1. shutdown() 将线程池状态置为 SHUTDOWN, 按过去已提交任务的顺序发起一个有序的关闭。
 2. shutdownNow() 将线程池状态置为 SHUTDOWN, 通过 Thread.interrupt() 方法停止所有线程, 并返回从未开始执行的任务列表(也就是队列中的线程),
                  interrupt() 方法不能保证能停止正在运行的任务, 但会尽力尝试, 所以有些无法响应中断的线程, 可能永远无法终止。
 
 判断线程池是否关闭
 1. isShutdown() 线程已关闭, 返回 true
 2. isTerminating() 当 shutdown 和 shutdownNow 正在终止但未完成中的过程中, 返回 true
 3. isTerminated() 如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。

 
@Test
public void testAOPExecuteMethod() {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 0, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(15)) {
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println("执行线程之前");
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println("执行线程之后" + t);
        }

        @Override
        protected void terminated() {
            System.out.println("线程终止");
        }
    };
    executor.execute(() -> {
        System.out.println("执行线程");
    });
    executor.shutdown();
    System.out.println(executor.isTerminated());
    System.out.println(executor.isShutdown());
    System.out.println(executor.isTerminating());

    // executor.shutdownNow();

    executor.execute(() -> {
        System.out.println("执行线程");
    });
}

线程池统计信息

 1. getActiveCount() 返回处于活动状态的线程的大致数量
 2. getCompletedTaskCount() 返回已完成执行的任务的大致总数。
 3. getCorePoolSize() 返回核心线程数
 4. getKeepAliveTime(TimeUnit unit) 返回线程保持活动的时间, 这个就是 keepAliveTime
 5. getLargestPoolSize() 返回线程池中同时存在的最大线程数
 6. getMaximunPoolSize()  返回线程池中最大线程数
 7. getPoolSize() 返回线程池中的当前线程数。
 8. getQueue() 返回此执行程序使用的任务队列
 9. getRejectedExecutionHandler() 返回拒绝策略
 10. getTackCount() 返回已计划执行的任务的大致总数

其他方法

 1. prestartAllCoreThreads() 启动所有核心线程, 使其处于等待工作的空闲状态  return 已启动的线程数
 2. prestartCoreThread()     启动核心线程(一个), 使其处于等待工作的空闲状态      return 启动了线程, 返回 true
 3. purge() 尝试从工作队列移除所有已取消的 Future 任务
 4. remove() 从执行程序的内部队列中移除此任务 (如果存在), 如果尚未开始, 则其不再运行

运行流程解析

首先需要记住几个基本参数和作用, 因为运行的过程都需要通过这些参数。

  1. coreSize 核心线程数量
  2. maximumPoolSize 最大线程数量
  3. workQueue 线程等待队列
  4. keepAliveTime 线程过期时间
  5. handler 拒绝策略

核心参数

Worker 译为: 工人, 维护运行任务的线程中断控制状态。

HashSet<Worker> workers 工人集合, 设置包含池中的所有工作线程。只有在持有主锁时才能访问

ReentrantLock mainLock 主锁, 对工人集合和相关簿记的访问保持锁定。

线程状态

RUNNING
SHUTDOWN
STOP
TIDYING
TERMINATED

execute(Runnable command) 方法解析

方法处理分为三种情况, 也就是针对 command 的处理。

  1. 添加到核心线程中 (当核心线程数足够时)
  2. 添加到等待队列中 (超出核心线程数时)
  3. 拒绝线程 (等待队列满了, 交给拒绝策略进行处理)

addWorker(Runnable firstTask, boolean core) 方法解析

firstTack 需要运行的线程
core 以 corePoolSize 为界限, 小于 corePoolSize = true, 大于 corePoolSize = false

返回值和处理分为以下情况:

  1. 如果池已停止或有资格关闭,则此方法返回false
  2. 如果线程工厂在被请求时没有创建一个线程,那么它也会返回false
  3. 创建 Worker 存储 firstTask, 并获取锁, 添加创建的 Worker 到 workers 中, 释放锁, 如果是添加成功, 则启动线程。

BlockingQueue<Runnable> workQueue 译为: 工作队列
用于保存任务并将任务传递给工作线程的队列。

运行流程

workers 核心线程数量是 coreSize, 最大的线程数量是 maximumPoolSize。

添加线程的正常流程:
创建 Worker 存储线程, 并将 Worker 存储到 workers 中, 启动并运行线程。

添加线程的等待流程:
当运行的线程数 大于 maximumPoolSize 并 小于 workQueue 最大数量时, 将线程存储到 workQueue 中, 等待线程池空闲时取出运行。

添加线程的拒绝流程:
当线程池运行的线程数等于 maximumPoolSize, workQueue 已经超过限制的时候, 通过 handler 拒绝策略处理拒绝的线程。

参考博文

Java线程池实现原理及其在美团业务中的实践 [好文章]
https://mp.weixin.qq.com/s?__biz=MjM5NjQ5MTI5OA==&mid=2651751537&idx=1&sn=c50a434302cc06797828782970da190e&chksm=bd125d3c8a65d42aaf58999c89b6a4749f092441335f3c96067d2d361b9af69ad4ff1b73504c&scene=0&xtrack=1&key=ae016389609d723a06c894a78ed921fef9476d4e06e1197bfef300aae587bd68cf0051ae83f3188aaca7aa78ea7c4f8521efe58e2b460cf3d745151a7dd649853fd77594f5a72eb20bd11dce97a2b8fb&ascene=1&uin=MTQ3NDMzNDk3NQ%3D%3D&devicetype=Windows+10&version=62080079&lang=zh_CN&exportkey=A0ndiTybr05ZZONXp8FpJ20%3D&pass_ticket=R39BuOO9md3f7V%2Bau%2F1g5tqzZnJX6A%2F%2BwVnp3tnVqZXp1bBf2H2yftgivyEwXllL

https://www.jianshu.com/p/7ab4ae9443b9

https://www.cnblogs.com/dolphin0520/p/3932921.html

https://blog.csdn.net/qq_22764659/article/details/83620730

https://www.cnblogs.com/CarpenterLee/p/9558026.html

如果存在问题,请在评论告知我,我会及时改进。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容