前言
线程池(Thread Pool)是一种基于池化思想管理线程的工具。线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。使用线程池可以带来诸多好处:
①降低资源消耗:通过池化技术复用已创建的线程,减少线程创建和销毁的损耗。
②提高响应速度:任务到达时,特定情况下无需再创建线程。
③便于管理。
简单实现
public class ThreadPool {
//线程池中的默认线程池为5
private static int WORK_NUM = 5;
//队列默认任务个数为100
private static int TASK_COUNT = 100;
private WorkThread[] workThreads;
//任务队列
private final BlockingQueue<Runnable> taskQueue;
private final int worker_num;
public ThreadPool(int work_num,int taskCount){
this.worker_num = work_num;
taskQueue = new ArrayBlockingQueue<Runnable>(taskCount);
workThreads = new WorkThread[worker_num];
for (int i= 0;i<worker_num;i++){
workThreads[i] = new WorkThread();
workThreads[i].start();
}
}
//执行任务 只是把任务加进任务队列,什么时候执行由线程管理器决定
public void execute(Runnable task){
try {
taskQueue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void destroy(){
System.out.println("ready to close pool``````");
for (int i= 0;i<worker_num;i++){
workThreads[i].stopWork();
workThreads[i] = null;
}
taskQueue.clear();//清空任务队列
}
/**
* 工作线程
*/
class WorkThread extends Thread{
@Override
public void run() {
Runnable r = null;
try {
while (!isInterrupted()){
r = taskQueue.take();
if (r !=null){
System.out.println(getId()+" ready exec :"+r);
r.run();
}
}
}catch (Exception e){
}
}
public void stopWork(){
interrupt();
}
}
static class MyTask implements Runnable{
private String name;
private Random r = new Random();
public MyTask(String name){
this.name = name;
}
public String getName(){
return name;
}
@Override
public void run() {//执行任务
try {
Thread.sleep(r.nextInt(1000)+2000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getId()+" sleep Interrupt "
+Thread.currentThread().isInterrupted());
}
}
}
@NonNull
@Override
public String toString() {
return "WorkThread num is :" +worker_num
+" wait task num is :" +taskQueue.size();
}
}
}
先从构造器入手,
public ThreadPool(int work_num,int taskCount){
this.worker_num = work_num;
taskQueue = new ArrayBlockingQueue<Runnable>(taskCount);
workThreads = new WorkThread[worker_num];
for (int i= 0;i<worker_num;i++){
workThreads[i] = new WorkThread();
workThreads[i].start();
}
}
接收两个参数,一个是工作线程得个数,就是在线程池中默认创建得线程个数。第二个是任务个数,即等待队列中最多可接收得runable得个数。
工作线程做了啥事:
/**
* 工作线程
*/
class WorkThread extends Thread{
@Override
public void run() {
Runnable r = null;
try {
while (!isInterrupted()){
r = taskQueue.take();
if (r !=null){
System.out.println(getId()+" ready exec :"+r);
r.run();
}
}
}catch (Exception e){
}
}
public void stopWork(){
interrupt();
}
}
核心代码就从任务队列中不断取出runable对象(即MyTask)并执行。
excute()只是将任务(MyTask)加入到队列(BlockQueue)中。
//执行任务 只是把任务加进任务队列,什么时候执行由线程管理器决定
public void execute(Runnable task){
try {
taskQueue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
测试代码:
public static void main(String args[]) throws InterruptedException{
ThreadPool threadPool = new ThreadPool(5,100);
threadPool.execute(new MyTask("testA"));
threadPool.execute(new MyTask("testD"));
threadPool.execute(new MyTask("testB"));
threadPool.execute(new MyTask("testE"));
threadPool.execute(new MyTask("testF"));
System.out.println(threadPool);
Thread.sleep(10000);
threadPool.destroy();
System.out.println(threadPool);
}
运行截图:
WorkThread num is :5 wait task num is :1
17 ready exec :com.lxxl.flowlayout.ThreadPool$MyTask@17a46223
16 ready exec :com.lxxl.flowlayout.ThreadPool$MyTask@56392182
14 ready exec :com.lxxl.flowlayout.ThreadPool$MyTask@de7a04c
15 ready exec :com.lxxl.flowlayout.ThreadPool$MyTask@2b036cd1
18 ready exec :com.lxxl.flowlayout.ThreadPool$MyTask@10469abf
ready to close pool``````
WorkThread num is :5 wait task num is :0
引用美团知乎博客的一张图: