为了更好的理解ExecutorService,实现一个简单线程池

一个简单的线程池,应该具备以下能力:
1.能够有效的管理工作线程数量。(可以通过4个参数来管理,初始化线程数,最大线程数,核心线程数,维护工作线程的时间间隔)
2.能够管理提交的任务。(有一个队列来管理,已提交的任务,且当缓存的任务数量达到定义的上限时,应该设定一些拒绝策略告知调用者)

所以定义以下接口:
线程池接口定义

public interface ThreadPool {

    /**
     * 提交任务到线程池
     * @param runnable
     */
    void execute(Runnable runnable);

    /**
     * 关闭线程池
     */
    void shutDown();

    /**
     * 线程池是否被关闭
     * @return
     */
    boolean isShutDown();

    /**
     * 得到初始化线程池数量
     * @return
     */
    int getInitSize();

    /**
     * 得到最大线程池数量
     * @return
     */
    int getMaxSize();

    /**
     * 得到核心线程池数量
     * @return
     */
    int getCoreSize();

    /**
     * 得到活跃线程池数量
     * @return
     */
    int getActiveSize();

    /**
     * 获取任务缓冲队列大小
     * @return
     */
    int getQueueListSize();
}

任务队列定义

package threadPool;

/**
 * 任务队列,应该有限制大小的参数
 *
 * 提供offer方法,用于提交runnable任务
 *
 * 提供take 方法,用于取runnable任务
 *
 */
public interface RunnableQueue {

    /**
     * 提交任务,提交的任务首先会进入缓冲队列
     * @param runnable
     */
    void offer(Runnable runnable);

    /**
     * 取任务,从队列中取
     * @return
     */
    Runnable take() throws InterruptedException;

    /**
     * 获取缓冲队列大小
     * @return
     */
    int getQueueSize();
}

创建线程的工厂定义:

package threadPool;

public interface ThreadFactory {
    Thread createThread(Runnable runnable);
}

拒绝策略接口定义

package threadPool;

public interface DenyPolicy {
    void reject(Runnable runnable, ThreadPool threadPool);

    //该拒绝策略直接将任务抛弃,就不管
    class DiscarDenyPolicy implements DenyPolicy{
        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            // nothing doing something
        }
    }

    //该拒绝策略,当任务满时,抛出异常
    class AbortDenyPoliy implements DenyPolicy{
        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            throw new RuntimeException("this runnable "+runnable+"will be abort");
        }
    }

    //该拒绝策略,被抛弃的任务就在当前线程中运行
    class RunnerDenyPoliy implements DenyPolicy{
        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            if (!threadPool.isShutDown()){
                runnable.run();
            }
        }
    }
}

以下是接口的简单实现
任务队列的实现

package threadPool;

import java.util.LinkedList;

public class LinkedRunnableQueue implements RunnableQueue{

    private int limitQueueSize;

    private DenyPolicy denyPolicy;

    private ThreadPool threadPool;

    private LinkedList<Runnable> taskList = new LinkedList<Runnable>();

    public LinkedRunnableQueue(int limitQueueSize,DenyPolicy denyPolicy,ThreadPool threadPool){
        this.denyPolicy = denyPolicy;
        this.limitQueueSize = limitQueueSize;
        this.threadPool = threadPool;
    }

    @Override
    public void offer(Runnable runnable) {
        synchronized (taskList){
            int size = taskList.size();
            if (size >= limitQueueSize){
                //当队列满了,就拒绝
                denyPolicy.reject(runnable,threadPool);
            }else {
                //加入队尾
                taskList.addLast(runnable);
                //唤醒等待的线程
                taskList.notifyAll();
            }
        }
    }

    @Override
    public Runnable take() throws InterruptedException{
        synchronized (taskList){
            //若队列为空,则让线程等待
            while (taskList.isEmpty()){
                try {
                    taskList.wait();
                } catch (InterruptedException e) {
                    throw e;
                }
            }
            return taskList.removeFirst();
        }
    }

    @Override
    public int getQueueSize() {
        return taskList.size();
    }
}

线程工厂的简单实现

package threadPool;

import java.util.concurrent.atomic.AtomicInteger;

public class DefaultThreadFactory implements ThreadFactory{

    private static AtomicInteger GROUP_COUNTER = new AtomicInteger(1);

    private static ThreadGroup threadGroup = new ThreadGroup("MyThread-Pool-"+GROUP_COUNTER.getAndDecrement());

    private static AtomicInteger COUNTER = new AtomicInteger(0);

    @Override
    public Thread createThread(Runnable runnable) {
        return new Thread(threadGroup,runnable,threadGroup.getName()+"-thread-"+COUNTER.getAndDecrement());
    }
}

注入到线程池中的线程需要实现的逻辑单元,即Runnable的实现

package threadPool;

public class InternalTask implements Runnable{

    private RunnableQueue runnableQueue;

    private volatile boolean running = true;

    public InternalTask(RunnableQueue runnableQueue){
        this.runnableQueue = runnableQueue;
    }

    //当前任务为running 且没有被中断,则不断的从队列中取runnable 并执行
    @Override
    public void run() {
        while (running && !Thread.currentThread().isInterrupted()){
            try {
                Runnable take = runnableQueue.take();
                take.run();
            } catch (Exception e) {
                running = false;
                e.printStackTrace();
            }
        }
    }

    //停止当前任务
    public void stop(){
        this.running = false;
    }
}

线程池的实现:

package threadPool;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

public class BasicThreadPool implements ThreadPool{

    //初始化线程数
    private int initSize;

    //最大线程数
    private int maxSize;

    //核心线程数
    private int coreSize;

    //当前活跃的线程数量
    private int currentActiveSize;

    //维护线程数量的时间间隔,秒级
    private long keepAliveTime;

    private TimeUnit timeUnit;

    private ThreadFactory threadFactory;

    private Thread thread;

    //当前线程池的状态,是否被关闭
    private volatile boolean isShutDown = false;

    //任务队列
    private RunnableQueue runnableQueue;

    //工作线程队列
    private Queue<ThreadTask> threadTasks = new ArrayDeque<ThreadTask>();

    public BasicThreadPool(int initSize,int maxSize,int coreSize,int taskQueueLimit){
        this(initSize,maxSize,coreSize,10,TimeUnit.SECONDS,new DefaultThreadFactory(),
                new DenyPolicy.DiscarDenyPolicy(),taskQueueLimit);
    }

    public BasicThreadPool (int initSize,int maxSize,int coreSizeSize,
                            int keepAliveTime,TimeUnit timeUnit,
                            ThreadFactory threadFactory,DenyPolicy denyPolicy,
                            int taskQueueLimit){
        this.initSize = initSize;
        this.maxSize = maxSize;
        this.coreSize = coreSizeSize;
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;
        this.threadFactory = threadFactory;
        if (this.threadFactory == null)
            this.threadFactory = new DefaultThreadFactory();

        this.runnableQueue = new LinkedRunnableQueue(taskQueueLimit,denyPolicy,this);

        //调用初始化方法
        this.init();
    }

    private void init(){
        //TODO 初始化线程池
        //启动线程维护
        this.thread = new Thread(()->{
            //在线程池没有被中断,且未被打断的情况下维护
            while (!isShutDown && !Thread.currentThread().isInterrupted()){
                try {
                    timeUnit.sleep(keepAliveTime);
                } catch (InterruptedException e) {
                    isShutDown = true; //线程被中断
                    break;
                }
                synchronized (this){
                    if (isShutDown)
                        break;

                    System.out.println("维护线程数量");
                    //当前任务队列还有需要处理的任务,且 活跃的线程数 小于 核心线程数,则继续扩容
                    if (runnableQueue.getQueueSize() > 0 && currentActiveSize < coreSize){
                        for (int i=initSize;i<coreSize;i++){
                            newThread();
                        }
                    }


                    if (runnableQueue.getQueueSize() > 0 && currentActiveSize < maxSize){
                        for (int i=coreSize;i<maxSize;i++){
                            newThread();
                        }
                    }

                    //当前没有在等待的任务,且活跃的线程数,大于核心线程数,则释放任务线程
                    if (runnableQueue.getQueueSize() == 0 && currentActiveSize > coreSize){
                        for (int i=coreSize;i<currentActiveSize;i++){
                            removeThread();
                        }
                    }
                }
            }
        });
        this.thread.start();

        //根据初始化线程数量,进入初始化
        for (int i=0;i<initSize;i++){
            newThread();
        }
    }

    @Override
    public void execute(Runnable runnable) {
        if (this.isShutDown)
            throw new IllegalStateException("thread pool is destroyed");
        this.runnableQueue.offer(runnable);
    }

    @Override
    public void shutDown() {
        synchronized (this){
            if (isShutDown)
                return;

            isShutDown = true;
            threadTasks.forEach(threadTask -> {
                threadTask.internalTask.stop();
                threadTask.thread.interrupt();
                this.currentActiveSize--;
            });

            thread.interrupt();
        }
    }

    @Override
    public boolean isShutDown() {
        return isShutDown;
    }

    @Override
    public int getInitSize() {
        if (isShutDown)
            throw new IllegalStateException("thread pool is destroyed");
        return this.initSize;
    }

    @Override
    public int getMaxSize() {
        if (isShutDown)
            throw new IllegalStateException("thread pool is destroyed");
        return this.maxSize;
    }

    @Override
    public int getCoreSize() {
        if (isShutDown)
            throw new IllegalStateException("thread pool is destroyed");
        return this.coreSize;
    }

    @Override
    public int getActiveSize() {
        synchronized (this){
            return currentActiveSize;
        }
    }

    @Override
    public int getQueueListSize() {
        if (isShutDown)
            throw new IllegalStateException("thread pool is destroyed");
        return this.runnableQueue.getQueueSize();
    }

    private void newThread(){
        InternalTask internalTask = new InternalTask(this.runnableQueue);
        Thread thread = this.threadFactory.createThread(internalTask);
        ThreadTask threadTask = new ThreadTask(internalTask,thread);
        threadTasks.offer(threadTask);
        this.currentActiveSize ++;
        thread.start();
    }

    private void removeThread(){
        //从线程池中移除某一个线程
        ThreadTask threadTask = threadTasks.remove();
        threadTask.internalTask.stop();
        this.currentActiveSize --;
    }

    //为thread与InternalTask的一个组合
    private static class ThreadTask{
        InternalTask internalTask;
        Thread thread;

        public ThreadTask(InternalTask task,Thread thread){
            this.internalTask = task;
            this.thread = thread;
        }
    }
}

一个简单的线程池,就这么实现啦。。。。。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容