前言
面试中经常会有考官问道,让你自己手写是实现一个线程池。这里我就按照网上的一些参考来进行实现一个简单的线程池。主要目的是为了理解和记忆实现过程中遇到的类 以及实现过程。
首先我们来看一些具体的成员变量有哪些:
终于知道为什么面试官喜欢这个了吧,有并发锁,有阻塞队列,有volatile关键字。很多和多线程并发的东西都会在线程池中涉及。
那么面试官就会围绕这些知识点去展开,线程池可能只是开始(套路,全是套路!)
RUNNING的作用是标记线程池的整体状态是否在工作
lock是为了在线程池内部的一些操作上加上并发锁,来保证程序不出错。
workers是一个工作集,用来存放工人。而且是hashSet类型,这就代表是一个没有重复worker的集合。
queue 阻塞队列,用来存放线程池将用执行的任务。使用的是并发包下的阻塞队列,可以保证在任务的存取上是线程安全的。
threads是一个简易的线程工厂,源码中相对复杂。用来存放生成的线程
poolsize代表核心线程数 就是这个线程池中主要大部分情况下有多少线程
coreSize 代表正在线程池中工作的线程数
关于这些Size的变量,源码中比这个多。而且不好理解,这里就简单的这样认为。
shutdown是标记线程池停止运行的标记
下面是整个线程池的最主要方法 execute,是执行任务的入口
这个方法我们看,当任务为空,抛出异常。如果当前线程池中的空闲线程小于核心线程数的话就增加线程进入addThread方法,否则就会直接加入阻塞队列去等待。
下面是addThread方法
我们看到,这里创建一个工人进行工作,然后把工人加入到工作集中。创建一个线程去执行工人的工作。线程启动。整个过程在并发锁的保护下进行。
下面是shutdown方法
首先把运行标志记为false,然后把工作集中的工人都停止手中工作 阻塞掉。然后阻塞完成,改变线程池的停止状态为真。
下面是线程池的内部类 worker的简易实现
这里我直接把工人获得的任务放入阻塞队列中,然后每次执行都从里面去拿。和源码的实现有点不一样(源码会先去执行每个工人自己拿到的任务,之后才去阻塞队列中拿取)
worker的run方法
在线程池正常运行的状态下,一直获取阻塞队列中的任务并且执行
下面是用于线程池停止时的方法
当线程池停止工作时,就对所有线程发出阻塞指令不再继续工作。
完整代码:
package com.Thread.ThreadPoolExecutor;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
public class MyThreadPoolExecutor {
private volatile boolean RUNNING = true;// 是否正在运行
private final ReentrantLock lock = new ReentrantLock();// 并发锁
private final HashSet<Worker> workers = new HashSet<>();// 不重复的工作集
private static BlockingQueue<Runnable> queue = null;// 任务阻塞队列
private final ArrayList<Thread> threads = new ArrayList<>();// 线程工厂
private volatile int poolsize;// 线程池的核心线程数
private volatile int coresize;// 当前线程池中的线程数
private volatile boolean shutdown = false;// 是否停止工作
public MyThreadPoolExecutor(int poolsize) {
// TODO Auto-generated constructor stub
this.poolsize = poolsize;
queue = new ArrayBlockingQueue<>(poolsize);
}
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
if (coresize < poolsize) {
addThread(command);
} else {
try {
queue.put(command);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private void addThread(Runnable task) {
lock.lock();
try {
coresize++;
Worker worker = new Worker(task);
workers.add(worker);
Thread thread = new Thread(worker);
threads.add(thread);
thread.start();
} finally {
// TODO: handle finally clause
lock.unlock();
}
}
public void shutdown() {
RUNNING = false;
if (!workers.isEmpty()) {
for (Worker worker : workers) {
worker.interruptIfIdle();
}
}
shutdown = true;
Thread.currentThread().interrupt();
}
private final class Worker implements Runnable {
public Worker(Runnable task) {
// TODO Auto-generated constructor stub
queue.offer(task);
}
public Runnable getTask() throws InterruptedException {
return queue.take();
}
@Override
public void run() {
// TODO Auto-generated method stub
while (true && RUNNING) {
if (shutdown) {
Thread.interrupted();
}
Runnable task = null;
try {
task = getTask();
task.run();
} catch (InterruptedException e) {
}
}
}
public void interruptIfIdle() {
for (Thread thread : threads) {
System.out.println(thread.getName() + " interrupt");
thread.interrupt();
}
}
}
public static void main(String[] args) {
}
}
class Main {
public static void main(String[] args) {
MyThreadPoolExecutor executor = new MyThreadPoolExecutor(3);
for (int i = 0; i < 10; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("线程" + Thread.currentThread().getName() + "在工作....");
}
});
}
executor.shutdown();
}
}