多线程
创建一个多线程,需要重写 Runnable run() 方法, 然后通过 Thread.start() 进行执行,
如果频繁需要使用到多线程,我们一个一个的创建开销会很大,而线程池就是解决这个问题的,就像【数据库的连接池】一样。
Thread thread = new Thread(() -> System.out.println("Thread Hello"));
thread.start();
Java 线程池
线程池 管理线程,包括创建,检测,替换,关闭等。
java.util.concurrent 在并发编程中很常用的实用工具类
ThreadPoolExecutor 最基础的父类,几种线程池的操作都在该类中
Executor 执行已提交的 Runnable 任务的对象。
ExecutorService 接口(实现了Executor) 提供管理终止 Executor 的方法。
Executors 工厂类,提供 Exexutor,ExecutorService,ScheduleExecutorService,ThreadFactory,Callable类的工厂和实用方法。
ThreadFactory 创建 Thread 的工厂类
ExecutorService 四种线程池类型
CachedThreadPool
Executors.newCachedThreadPool() 缓存线程池 (重用已经构造过的线程池, 适用于 执行许多短期异步任务的程序)
ScheduledThreadPool
Executors.newScheduledThreadPool() 定时(调度)线程池 (可以在 线程执行前后 进行时间延迟, scheduleAtFixedRate 方法, scheduleWithFixedDelay 方法)
FixedThreadPool
Executors.newFixedThreadPool() 重用固定数量的线程池 (创建的时候固定线程池数量, 线程都处于活动状态, 会等待线程完成, 线程失败或终止了, 将会替换一个新的线程)
SingleThreadExecutor
Executors.newSingleThreadExecutor() 单个线程的线程池 (只创建一个单独的线程, 可适用于处理时间比较长的方法)
Executors Code
import org.apache.commons.lang3.time.DateFormatUtils;
import org.junit.jupiter.api.Test;
import java.util.Date;
import java.util.concurrent.*;
/**
* ThreadPoolExecutor
*
* 四种线程池的特性和使用方式
* 1. SingleThreadExecutor
* 2. FixedThreadExecutor
* 3. CacheThreadExecutor
* 4. ScheduledThreadExecutor
*
* @author zhou
*/
public class TestExecutorService {
/**
* 单线程执行器
*
* 只有一个线程的线程池
* 执行结果: 需要等待 仅有的线程 执行完, 才能执行下一次遍历。
*/
@Test
public void testSingleThreadExecutor() throws InterruptedException, ExecutionException {
// new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
System.out.println("thread-name: " + Thread.currentThread().getName() + ", time: " + DateFormatUtils.format(new Date(), "HH:mm:ss"));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(6000);
}
/**
* 可重用固定线程数的线程池
*
* 初始化时创建固定数量的线程, 之后不再创建, 除非 有线程不可用, 就进行创建并替换。
* 执行结果: 会重复使用线程池中的线程, 而不会再创建新的线程, 没有线程可用时, 会一直等待有空闲的线程, 并不会重新创建。
*/
@Test
public void testFixedThreadExecutor() throws InterruptedException {
// new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
ExecutorService executor = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
System.out.println("thread-name: " + Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(6000);
}
/**
* 缓存线程的线程池
*
* 由于使用的是 SynchronousQueue, 本身只维护一个线程, 插入的时候必须有线程进行移除操作才能进行插入
* 终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
* 初始化为 0, 最大为 Integer.MAX_VALUE (2147483647)
*
* 执行结果: 没有线程可用就创建, 并不会进行等待。
*/
@Test
public void testCacheThreadExecutorService() throws InterruptedException {
// new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(6000);
}
/**
* 定时(调度)线程池
*
*/
@Test
public void testScheduledThreadExecutor() throws InterruptedException, ExecutionException, TimeoutException {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 下方 四个方法都能获取 线程 返回的数据。
// 1. schedule() 延迟启动时间
// System.out.println("start");
// for (int i = 0; i < 3; i++) {
// executor.schedule(() -> System.out.println("运行线程"), 1, TimeUnit.SECONDS);
// }
// Thread.sleep(3000);
// 2. schedule() 延迟启动时间, 通过 <V> 泛型来指定返回的参数的类型
// executor.schedule(() -> {
// System.out.println("test");
// }, 1, TimeUnit.SECONDS);
// Thread.sleep(4000);
// 3. scheduleAtFixedRate() 初始化延迟和定时调度延迟
// command 运行的方法
// initialDelay 初始化延迟时间
// period 线程执行前延迟时间
// unit 统一的时间单位
// 如果第一个线程的执行时间超过第二个线程延迟的时间, 则等待第一个线程执行完, 再执行第二个线程。
// executor.scheduleAtFixedRate(() -> {
// System.out.println(System.currentTimeMillis());
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }, 0, 1, TimeUnit.SECONDS);
//
// Thread.sleep(5000);
// 4. scheduleWithFixedDelay() 固定延迟调度
// command 运行的方法
// initialDelay 初始化延迟时间
// delay 线程延期时间
// unit 统一的时间单位
// 和上方方法唯一不同的就是 delay 的处理
// 如果第一个线程的执行时间超过第二个线程延迟的时间, 则第二个线程的执行时间为 (第一个线程的执行时间 + delay)
// executor.scheduleWithFixedDelay(() -> {
// System.out.println(System.currentTimeMillis());
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }, 0, 1, TimeUnit.SECONDS);
// Thread.sleep(5000);
// 5. 需要获取线程执行完后返回的参数的对象/方法解析
ScheduledFuture<String> schedule = executor.schedule(() -> {
return String.valueOf(System.currentTimeMillis());
}, 2, TimeUnit.SECONDS);
// getDelay(计算时间单位) 执行到该方法的时候, 执行的线程还有多久执行完, 返回 0 或者 负数的时候代表线程已经执行完成。
// System.out.println(schedule.getDelay(TimeUnit.SECONDS));
// get() 获取线程执行完后的返回值
// System.out.println(schedule.get());
// get(超时时间, 时间单位) 最长等待多长时间接收到线程执行完后的返回值
// 抛出四个异常
// 线程取消异常
// 线程执行异常
// 线程等待时被中断异常
// 线程超时异常 (线程还在执行中, 我们就去获取的情况)
// System.out.println(schedule.get(1, TimeUnit.SECONDS));
// Thread.sleep(4000);
}
}
ThreadPoolExecutor
ThreadPoolExecutor 参数
1. corePoolSize 线程池中核心线程数
2. maximumPoolSize 线程池中最大线程数
3. keepAliveTime 空闲时间, 当线程池数量超过核心线程池时, 多余的空闲线程存活的时间
4. unit 空闲时间单位
5. workQueue 等待队列, 线程池中的线程数超过核心线程数时, 任务将放在等待队列中
6. threadFactory 线程工厂
7. handler 拒绝策略, 当线程池和等待队列都满了之后, 需要通过该对象的回调函数进行回调处理
workQueue 等待队列
1. ArrayBlockingQueue 队列是有界的, 需要初始化队列数量, 基于数组实现的阻塞队列
2. LinkedBlockingQueue 队列可以有界和无界, 基于链表实现的阻塞队列
3. SynchronousQueue 不存储元素的阻塞队列, 每个插入操作必须等到另一个线程调用移除操作, 否则插入操作一直处于阻塞状态
4. PriorityBlockingQueue 带优先级的无界阻塞队列
5. LinkedBlockingDeque 队列可有界和无界, 基于双向链表实现的阻塞队列
@Test
public void testWorkQueue() {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(20);
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
SynchronousQueue synchronousQueue = new SynchronousQueue();
PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();
LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
linkedBlockingDeque);
executor.execute(() -> {
System.out.println("test");
});
}
handler 拒绝策略
自带四种拒绝策略, 可自定义拒绝策略
1. AbortPolicy (默认) 处理程序遭到拒绝将抛出运行时异常 RejectedExecutionException
2. CallerRunPolicy 由调用该方法的线程执行 (比如: 我主线程调用的, 直接交给主线程执行)
3. DiscardPolicy 直接将任务删除, 不做任何处理
4. DiscardOldestPolicy 将位于工作队列的头部任务删除, 再执行当前任务 (再次失败, 再次删除和重试)
5. 自定义 实现 RejectedExecutionHandler 重写 rejectedExecution 方法
@Test
public void handler() throws InterruptedException {
ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
// 看时间戳的变化 (在两秒钟的时间内, 丢弃了很多线程)
ThreadPoolExecutor.DiscardPolicy discardPolicy = new ThreadPoolExecutor.DiscardPolicy();
ThreadPoolExecutor.DiscardOldestPolicy discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();
MyRejectedHandler myRejectedHandler = new MyRejectedHandler();
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(8), myRejectedHandler);
for (int i = 0; i < 6; i++) {
executor.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":" + System.currentTimeMillis() + ":" + executor.getQueue().size());
});
}
Thread.sleep(50000);
}
// 自定义拒绝策略
public static class MyRejectedHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("阻塞了");
}
}
线程执行前, 执行后, 关闭和终止的方法回调
回调方法
1. beforeExecute 执行线程之前
2. afterExecute 执行线程之后
3. terminated 线程池结束调用
关闭线程池
1. shutdown() 将线程池状态置为 SHUTDOWN, 按过去已提交任务的顺序发起一个有序的关闭。
2. shutdownNow() 将线程池状态置为 SHUTDOWN, 通过 Thread.interrupt() 方法停止所有线程, 并返回从未开始执行的任务列表(也就是队列中的线程),
interrupt() 方法不能保证能停止正在运行的任务, 但会尽力尝试, 所以有些无法响应中断的线程, 可能永远无法终止。
判断线程池是否关闭
1. isShutdown() 线程已关闭, 返回 true
2. isTerminating() 当 shutdown 和 shutdownNow 正在终止但未完成中的过程中, 返回 true
3. isTerminated() 如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。
@Test
public void testAOPExecuteMethod() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(15)) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("执行线程之前");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行线程之后" + t);
}
@Override
protected void terminated() {
System.out.println("线程终止");
}
};
executor.execute(() -> {
System.out.println("执行线程");
});
executor.shutdown();
System.out.println(executor.isTerminated());
System.out.println(executor.isShutdown());
System.out.println(executor.isTerminating());
// executor.shutdownNow();
executor.execute(() -> {
System.out.println("执行线程");
});
}
线程池统计信息
1. getActiveCount() 返回处于活动状态的线程的大致数量
2. getCompletedTaskCount() 返回已完成执行的任务的大致总数。
3. getCorePoolSize() 返回核心线程数
4. getKeepAliveTime(TimeUnit unit) 返回线程保持活动的时间, 这个就是 keepAliveTime
5. getLargestPoolSize() 返回线程池中同时存在的最大线程数
6. getMaximunPoolSize() 返回线程池中最大线程数
7. getPoolSize() 返回线程池中的当前线程数。
8. getQueue() 返回此执行程序使用的任务队列
9. getRejectedExecutionHandler() 返回拒绝策略
10. getTackCount() 返回已计划执行的任务的大致总数
其他方法
1. prestartAllCoreThreads() 启动所有核心线程, 使其处于等待工作的空闲状态 return 已启动的线程数
2. prestartCoreThread() 启动核心线程(一个), 使其处于等待工作的空闲状态 return 启动了线程, 返回 true
3. purge() 尝试从工作队列移除所有已取消的 Future 任务
4. remove() 从执行程序的内部队列中移除此任务 (如果存在), 如果尚未开始, 则其不再运行
运行流程解析
首先需要记住几个基本参数和作用, 因为运行的过程都需要通过这些参数。
- coreSize 核心线程数量
- maximumPoolSize 最大线程数量
- workQueue 线程等待队列
- keepAliveTime 线程过期时间
- handler 拒绝策略
核心参数
Worker 译为: 工人, 维护运行任务的线程中断控制状态。
HashSet<Worker> workers 工人集合, 设置包含池中的所有工作线程。只有在持有主锁时才能访问
ReentrantLock mainLock 主锁, 对工人集合和相关簿记的访问保持锁定。
线程状态
RUNNING
SHUTDOWN
STOP
TIDYING
TERMINATED
execute(Runnable command) 方法解析
方法处理分为三种情况, 也就是针对 command 的处理。
- 添加到核心线程中 (当核心线程数足够时)
- 添加到等待队列中 (超出核心线程数时)
- 拒绝线程 (等待队列满了, 交给拒绝策略进行处理)
addWorker(Runnable firstTask, boolean core) 方法解析
firstTack 需要运行的线程
core 以 corePoolSize 为界限, 小于 corePoolSize = true, 大于 corePoolSize = false
返回值和处理分为以下情况:
- 如果池已停止或有资格关闭,则此方法返回false
- 如果线程工厂在被请求时没有创建一个线程,那么它也会返回false
- 创建 Worker 存储 firstTask, 并获取锁, 添加创建的 Worker 到 workers 中, 释放锁, 如果是添加成功, 则启动线程。
BlockingQueue<Runnable> workQueue 译为: 工作队列
用于保存任务并将任务传递给工作线程的队列。
运行流程
workers 核心线程数量是 coreSize, 最大的线程数量是 maximumPoolSize。
添加线程的正常流程:
创建 Worker 存储线程, 并将 Worker 存储到 workers 中, 启动并运行线程。
添加线程的等待流程:
当运行的线程数 大于 maximumPoolSize 并 小于 workQueue 最大数量时, 将线程存储到 workQueue 中, 等待线程池空闲时取出运行。
添加线程的拒绝流程:
当线程池运行的线程数等于 maximumPoolSize, workQueue 已经超过限制的时候, 通过 handler 拒绝策略处理拒绝的线程。
参考博文
https://www.jianshu.com/p/7ab4ae9443b9
https://www.cnblogs.com/dolphin0520/p/3932921.html
https://blog.csdn.net/qq_22764659/article/details/83620730
https://www.cnblogs.com/CarpenterLee/p/9558026.html