一个简单的线程池,应该具备以下能力:
1.能够有效的管理工作线程数量。(可以通过4个参数来管理,初始化线程数,最大线程数,核心线程数,维护工作线程的时间间隔)
2.能够管理提交的任务。(有一个队列来管理,已提交的任务,且当缓存的任务数量达到定义的上限时,应该设定一些拒绝策略告知调用者)
所以定义以下接口:
线程池接口定义
public interface ThreadPool {
/**
* 提交任务到线程池
* @param runnable
*/
void execute(Runnable runnable);
/**
* 关闭线程池
*/
void shutDown();
/**
* 线程池是否被关闭
* @return
*/
boolean isShutDown();
/**
* 得到初始化线程池数量
* @return
*/
int getInitSize();
/**
* 得到最大线程池数量
* @return
*/
int getMaxSize();
/**
* 得到核心线程池数量
* @return
*/
int getCoreSize();
/**
* 得到活跃线程池数量
* @return
*/
int getActiveSize();
/**
* 获取任务缓冲队列大小
* @return
*/
int getQueueListSize();
}
任务队列定义
package threadPool;
/**
* 任务队列,应该有限制大小的参数
*
* 提供offer方法,用于提交runnable任务
*
* 提供take 方法,用于取runnable任务
*
*/
public interface RunnableQueue {
/**
* 提交任务,提交的任务首先会进入缓冲队列
* @param runnable
*/
void offer(Runnable runnable);
/**
* 取任务,从队列中取
* @return
*/
Runnable take() throws InterruptedException;
/**
* 获取缓冲队列大小
* @return
*/
int getQueueSize();
}
创建线程的工厂定义:
package threadPool;
public interface ThreadFactory {
Thread createThread(Runnable runnable);
}
拒绝策略接口定义
package threadPool;
public interface DenyPolicy {
void reject(Runnable runnable, ThreadPool threadPool);
//该拒绝策略直接将任务抛弃,就不管
class DiscarDenyPolicy implements DenyPolicy{
@Override
public void reject(Runnable runnable, ThreadPool threadPool) {
// nothing doing something
}
}
//该拒绝策略,当任务满时,抛出异常
class AbortDenyPoliy implements DenyPolicy{
@Override
public void reject(Runnable runnable, ThreadPool threadPool) {
throw new RuntimeException("this runnable "+runnable+"will be abort");
}
}
//该拒绝策略,被抛弃的任务就在当前线程中运行
class RunnerDenyPoliy implements DenyPolicy{
@Override
public void reject(Runnable runnable, ThreadPool threadPool) {
if (!threadPool.isShutDown()){
runnable.run();
}
}
}
}
以下是接口的简单实现
任务队列的实现
package threadPool;
import java.util.LinkedList;
public class LinkedRunnableQueue implements RunnableQueue{
private int limitQueueSize;
private DenyPolicy denyPolicy;
private ThreadPool threadPool;
private LinkedList<Runnable> taskList = new LinkedList<Runnable>();
public LinkedRunnableQueue(int limitQueueSize,DenyPolicy denyPolicy,ThreadPool threadPool){
this.denyPolicy = denyPolicy;
this.limitQueueSize = limitQueueSize;
this.threadPool = threadPool;
}
@Override
public void offer(Runnable runnable) {
synchronized (taskList){
int size = taskList.size();
if (size >= limitQueueSize){
//当队列满了,就拒绝
denyPolicy.reject(runnable,threadPool);
}else {
//加入队尾
taskList.addLast(runnable);
//唤醒等待的线程
taskList.notifyAll();
}
}
}
@Override
public Runnable take() throws InterruptedException{
synchronized (taskList){
//若队列为空,则让线程等待
while (taskList.isEmpty()){
try {
taskList.wait();
} catch (InterruptedException e) {
throw e;
}
}
return taskList.removeFirst();
}
}
@Override
public int getQueueSize() {
return taskList.size();
}
}
线程工厂的简单实现
package threadPool;
import java.util.concurrent.atomic.AtomicInteger;
public class DefaultThreadFactory implements ThreadFactory{
private static AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
private static ThreadGroup threadGroup = new ThreadGroup("MyThread-Pool-"+GROUP_COUNTER.getAndDecrement());
private static AtomicInteger COUNTER = new AtomicInteger(0);
@Override
public Thread createThread(Runnable runnable) {
return new Thread(threadGroup,runnable,threadGroup.getName()+"-thread-"+COUNTER.getAndDecrement());
}
}
注入到线程池中的线程需要实现的逻辑单元,即Runnable的实现
package threadPool;
public class InternalTask implements Runnable{
private RunnableQueue runnableQueue;
private volatile boolean running = true;
public InternalTask(RunnableQueue runnableQueue){
this.runnableQueue = runnableQueue;
}
//当前任务为running 且没有被中断,则不断的从队列中取runnable 并执行
@Override
public void run() {
while (running && !Thread.currentThread().isInterrupted()){
try {
Runnable take = runnableQueue.take();
take.run();
} catch (Exception e) {
running = false;
e.printStackTrace();
}
}
}
//停止当前任务
public void stop(){
this.running = false;
}
}
线程池的实现:
package threadPool;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
public class BasicThreadPool implements ThreadPool{
//初始化线程数
private int initSize;
//最大线程数
private int maxSize;
//核心线程数
private int coreSize;
//当前活跃的线程数量
private int currentActiveSize;
//维护线程数量的时间间隔,秒级
private long keepAliveTime;
private TimeUnit timeUnit;
private ThreadFactory threadFactory;
private Thread thread;
//当前线程池的状态,是否被关闭
private volatile boolean isShutDown = false;
//任务队列
private RunnableQueue runnableQueue;
//工作线程队列
private Queue<ThreadTask> threadTasks = new ArrayDeque<ThreadTask>();
public BasicThreadPool(int initSize,int maxSize,int coreSize,int taskQueueLimit){
this(initSize,maxSize,coreSize,10,TimeUnit.SECONDS,new DefaultThreadFactory(),
new DenyPolicy.DiscarDenyPolicy(),taskQueueLimit);
}
public BasicThreadPool (int initSize,int maxSize,int coreSizeSize,
int keepAliveTime,TimeUnit timeUnit,
ThreadFactory threadFactory,DenyPolicy denyPolicy,
int taskQueueLimit){
this.initSize = initSize;
this.maxSize = maxSize;
this.coreSize = coreSizeSize;
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
this.threadFactory = threadFactory;
if (this.threadFactory == null)
this.threadFactory = new DefaultThreadFactory();
this.runnableQueue = new LinkedRunnableQueue(taskQueueLimit,denyPolicy,this);
//调用初始化方法
this.init();
}
private void init(){
//TODO 初始化线程池
//启动线程维护
this.thread = new Thread(()->{
//在线程池没有被中断,且未被打断的情况下维护
while (!isShutDown && !Thread.currentThread().isInterrupted()){
try {
timeUnit.sleep(keepAliveTime);
} catch (InterruptedException e) {
isShutDown = true; //线程被中断
break;
}
synchronized (this){
if (isShutDown)
break;
System.out.println("维护线程数量");
//当前任务队列还有需要处理的任务,且 活跃的线程数 小于 核心线程数,则继续扩容
if (runnableQueue.getQueueSize() > 0 && currentActiveSize < coreSize){
for (int i=initSize;i<coreSize;i++){
newThread();
}
}
if (runnableQueue.getQueueSize() > 0 && currentActiveSize < maxSize){
for (int i=coreSize;i<maxSize;i++){
newThread();
}
}
//当前没有在等待的任务,且活跃的线程数,大于核心线程数,则释放任务线程
if (runnableQueue.getQueueSize() == 0 && currentActiveSize > coreSize){
for (int i=coreSize;i<currentActiveSize;i++){
removeThread();
}
}
}
}
});
this.thread.start();
//根据初始化线程数量,进入初始化
for (int i=0;i<initSize;i++){
newThread();
}
}
@Override
public void execute(Runnable runnable) {
if (this.isShutDown)
throw new IllegalStateException("thread pool is destroyed");
this.runnableQueue.offer(runnable);
}
@Override
public void shutDown() {
synchronized (this){
if (isShutDown)
return;
isShutDown = true;
threadTasks.forEach(threadTask -> {
threadTask.internalTask.stop();
threadTask.thread.interrupt();
this.currentActiveSize--;
});
thread.interrupt();
}
}
@Override
public boolean isShutDown() {
return isShutDown;
}
@Override
public int getInitSize() {
if (isShutDown)
throw new IllegalStateException("thread pool is destroyed");
return this.initSize;
}
@Override
public int getMaxSize() {
if (isShutDown)
throw new IllegalStateException("thread pool is destroyed");
return this.maxSize;
}
@Override
public int getCoreSize() {
if (isShutDown)
throw new IllegalStateException("thread pool is destroyed");
return this.coreSize;
}
@Override
public int getActiveSize() {
synchronized (this){
return currentActiveSize;
}
}
@Override
public int getQueueListSize() {
if (isShutDown)
throw new IllegalStateException("thread pool is destroyed");
return this.runnableQueue.getQueueSize();
}
private void newThread(){
InternalTask internalTask = new InternalTask(this.runnableQueue);
Thread thread = this.threadFactory.createThread(internalTask);
ThreadTask threadTask = new ThreadTask(internalTask,thread);
threadTasks.offer(threadTask);
this.currentActiveSize ++;
thread.start();
}
private void removeThread(){
//从线程池中移除某一个线程
ThreadTask threadTask = threadTasks.remove();
threadTask.internalTask.stop();
this.currentActiveSize --;
}
//为thread与InternalTask的一个组合
private static class ThreadTask{
InternalTask internalTask;
Thread thread;
public ThreadTask(InternalTask task,Thread thread){
this.internalTask = task;
this.thread = thread;
}
}
}
一个简单的线程池,就这么实现啦。。。。。