使用线程池的场合
- 单个任务处理时间短
- 将需处理的任务数量大
使用Java线程池好处
1、使用new Thread()创建线程的弊端:
- 每次通过new Thread()创建对象性能不佳。
- 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
- 缺乏更多功能,如定时执行、定期执行、线程中断。
2、使用Java线程池的好处: - 重用存在的线程,减少对象创建、消亡的开销,提升性能。
- 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
- 提供定时执行、定期执行、单线程、并发数控制等功能。
一般一个简单线程池至少包含下列组成部分。
- 线程池管理器(CustomThreadPool):用于创建并管理线程池
- 工作线程(CustomWorker): 线程池中线程
- 任务接口(基类)(CustomTask):每个任务必须实现的接口(或继承的基类),以供工作线程调度任务的执行。
- 任务队列:用于存放没有处理的任务。提供一种缓冲机制。
详细代码
线程池管理器至少有下列功能:创建线程池,销毁线程池,添加新任务
线程池的代码如下:
package com.tao.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
/**
* @author: Penger
* @time: 2019/7/23
* @description: <p>
* 自定义线程池简单实现:
* 1. 线程池大小
* 2. 创建、销毁线程池
* 3. 工作线程
* 4. 任务队列
* </p>
**/
@Slf4j
class CustomThreadPool {
/**
* 状态,是否在运行
*/
private volatile Boolean running = true;
/**
* 任务缓存队列
*/
private final BlockingQueue<CustomTask> taskQueue;
/**
* worker的执行需要thread
*/
private Vector<CustomWorker> workers = new Vector<>();
/**
* 最大线程数
*/
private int maxPoolSize;
/**
* 当前线程数
*/
private int workSize;
CustomThreadPool(int maxPoolSize, BlockingQueue<CustomTask> taskQueue) {
this.maxPoolSize = maxPoolSize;
this.workSize = 0;
this.taskQueue = taskQueue;
}
void execute(CustomTask task) {
Optional.ofNullable(task).orElseThrow(NullPointerException::new);
if (workSize < maxPoolSize) {
addThread(task);
} else if (!taskQueue.offer(task)) {
rejectTask(task);
}
}
/**
* 添加线程执行start
*
* @param task task
*/
private void addThread(CustomTask task) {
//当前线程数+1
workSize++;
//创建新的Worker
CustomWorker worker = new CustomWorker("Thread-" + workSize, task, taskQueue, running);
workers.add(worker);
//执行thread
worker.start();
}
/**
* 停止所有线程
*/
void shutdown() {
running = false;
if (!workers.isEmpty()) {
for (CustomWorker worker : workers) {
// worker.setRunning(false);
worker.interruptSelf();
}
}
Thread.currentThread().interrupt();
}
private void rejectTask(CustomTask task) {
log.info("blocking queue has no space, reject task{}", task.getTaskId());
}
}
工作线程可以继承thread,能够获取到任务队列,实现Thread的run方法,在run里面取任务执行,同时提供工作线程的销毁方法
package com.tao.threadpool;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
/**
* @author: Penger
* @time: 2019/7/23
* @description: <p>
* 自定义工作线程,优先执行直接赋予的任务,执行完毕之后再从队列获取任务
* </p>
**/
@EqualsAndHashCode(callSuper = true)
@Slf4j
@Data
public class CustomWorker extends Thread {
private CustomTask task;
private final BlockingQueue<CustomTask> taskQueue;
private Boolean running;
CustomWorker(String name, CustomTask task, BlockingQueue<CustomTask> taskQueue, Boolean running) {
super(name);
this.task = task;
this.taskQueue = taskQueue;
this.running = running;
}
@Override
public void run() {
while (running) {
if (task != null) {
task.taskRun();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
task = taskQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("queue has [{}] elements", taskQueue.size());
}
}
void interruptSelf() {
this.interrupt();
}
}
任务接口(基类),定义线程池具体执行何种任务,通过taskRun()开始任务执行
package com.tao.threadpool;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* @author: Penger
* @time: 2019/7/23
* @description: <p>
* 自定义任务
* </p>
**/
@Slf4j
@Data
class CustomTask {
private int taskId;
CustomTask(int taskId) {
this.taskId = taskId;
}
void taskRun() {
log.info("custom task-{} running...", taskId);
}
}