0x01 普通的应用场景
线程池在大多数场景下,用于处理并发任务。例如我们需要同时处理10个请求,为了避免串行处理导致效率较低,一般会使用线程池,创建相应的线程去处理这10个请求。线程池能帮助我们管理线程的创建、销毁,通过任务队列缓存任务等,这里就不详细介绍了。
0x02 复杂的业务场景
假设我需要实现一个文件上传的功能:读取文件,将文件上传(写入)至指定的磁盘。针对较大的文件,我们需要分块上传,比如将大文件分成n个5MB的小块,使用线程池相关的技术去上传,这样可以提升效率。
理论上比较简单,但是我们看一下下面的对话,思考其中的问题:
A: 假设我们的线程池最大线程数为100,单个文件太大,分成了200个小块,线程数不够用,其他文件的上传被阻塞,怎么办?我们实际的应用场景肯定是希望多个文件同时开始上传的
B: 针对每个文件创建一个线程池,不就行了吗?
A: 线程池的创建、销毁是需要开销的,这种使用方式本质上与线程池的设计理念是相悖的
B: 那我提前创建好10个线程池,不就缓解多个文件同时上传的问题了吗?
A: 可以说是对的,但是提前创建过早的占用了系统资源,并且怎么让同一个文件的多个小块使用同一个线程池的线程去处理呢?
0x03 实现方式
import cn.hutool.core.thread.ThreadFactoryBuilder;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
private static ThreadLocal<ThreadPoolExecutor> threadLocal = new ThreadLocal<>();
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 15,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(20),
new ThreadFactoryBuilder().setNamePrefix("pool-root-").build());
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
int finalI = i;
Runnable task = () -> {
System.out.println(Thread.currentThread().getName() + " 开始处理第" + finalI + "个文件");
// ① 获取ThreadLocal对象对应的ThreadPoolExecutor对象
ThreadPoolExecutor threadPoolExecutor = threadLocal.get();
if (threadPoolExecutor == null) {
threadPoolExecutor = new ThreadPoolExecutor(3, 4,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadFactoryBuilder().setNamePrefix(Thread.currentThread().getName() + "-").build());
threadLocal.set(threadPoolExecutor);
}
// ② 文件分块处理
List<Object> smallFiles = splitFile(new File("abc.pdf"));
for (Object smallFile : smallFiles) {
threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 正在处理第" + finalI + "个文件的第" + smallFile + "小块");
});
}
};
executor.submit(task);
}
}
/**
* 模拟文件分块,这里的 1 2 3 4表示4块文件
* @param file
* @return
*/
private static List<Object> splitFile(File file) {
List<Object> result = new ArrayList<>();
result.add("1");
result.add("2");
result.add("3");
result.add("4");
return result;
}
}
运行结果如下:
pool-root-0 开始处理第0个文件
pool-root-1 开始处理第1个文件
pool-root-2 开始处理第2个文件
pool-root-3 开始处理第3个文件
pool-root-4 开始处理第4个文件
pool-root-0-0 正在处理第0个文件的第1小块
pool-root-2-0 正在处理第2个文件的第1小块
pool-root-1-0 正在处理第1个文件的第1小块
pool-root-4-0 正在处理第4个文件的第1小块
pool-root-0-1 正在处理第0个文件的第2小块
pool-root-3-0 正在处理第3个文件的第1小块
pool-root-4-1 正在处理第4个文件的第2小块
pool-root-2-1 正在处理第2个文件的第2小块
pool-root-0-2 正在处理第0个文件的第3小块
pool-root-0-0 正在处理第0个文件的第4小块
pool-root-4-0 正在处理第4个文件的第4小块
pool-root-4-2 正在处理第4个文件的第3小块
pool-root-1-1 正在处理第1个文件的第2小块
pool-root-1-1 正在处理第1个文件的第4小块
pool-root-3-1 正在处理第3个文件的第2小块
pool-root-3-2 正在处理第3个文件的第3小块
pool-root-3-0 正在处理第3个文件的第4小块
pool-root-2-0 正在处理第2个文件的第4小块
pool-root-2-2 正在处理第2个文件的第3小块
pool-root-1-2 正在处理第1个文件的第3小块
从运行结果可以看出,每个文件使用单独的线程池,互不干扰,这里外层的线程数量达到一定值后,内层的线程池将不会再新建,而是重复使用。如果并发较高,则会出现处理文件的任务被放在任务队列中等待阻塞的情况,也就是文件上传被阻塞,当然这种情况应该考虑增加线程池线程数量或者通过增加应用节点解决-.-
0x04 小结
这种场景需要考虑多层线程池、线程池隔离,类似的场景还有很多,比如电商订单金额计算(需要按平台级别、店铺级别等计算优惠)、文本翻译(需要将文本分隔成多个句子进行翻译)等。
这类场景的特点是一个任务可以划分为子任务,子任务相对来说耗时比较长,这样的业务特性使得上述解决方案比较适用,当然这只是个人理解 ~~