java线程池——自己实现简单线程池

java中我们常用到各种池,比如线程池、数据库连接池等,各种池其目的之一就是为了提高资源的利用率。很多时候初学者都是直接使用java提供的api,这样很方便。为了更好地使用提供的api,还是需要更深入的了解它的原理。下面代码清单是一个简单的自己实现的线程池

public interface ThreadPool {
    void execute(Runnable runnable);
    void shutdown();
    void addWorkers(int num);
    void removeWorker(int num);
    int getJobSize();
}
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;

public class MyPoolThread implements ThreadPool {

    private static final int MAX_WORKER_NUMBERS = 30;
    private static final int MIN_WORKER_NUMBERS = 1;
    private static final int DEFAULT_WORKER_NUMBERS = 10;

    private final LinkedList<Worker> workers = new LinkedList<>();
    private final LinkedList<Runnable> jobs = new LinkedList<>();

    private AtomicLong nextId = new AtomicLong(0);
    private int workerNum;

    public MyPoolThread(int num) {
        int workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : 
num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initializeWorkers(workerNum);
    }

    public MyPoolThread() {
        this(DEFAULT_WORKER_NUMBERS);
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (jobs) {
            jobs.add(runnable);
            jobs.notify();
        }
    }

    @Override
    public void shutdown() {
        synchronized (workers) {
            removeWorker(workerNum);
        }
    }

    @Override
    public void addWorkers(int num) {
        synchronized (workers) {
            if (num > 0 && num + workers.size() <= MAX_WORKER_NUMBERS) {
                initializeWorkers(num);
            } else if (num <= 0) {
                throw new IllegalArgumentException("num = " + num + "不能小于 1");
            } else {
                throw new IllegalArgumentException("工作者数量已到达最大值");
            }
        }
    }

    private void initializeWorkers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + nextId.getAndAdd(1));
            thread.start();
        }
        workerNum += num;
    }

    @Override
    public void removeWorker(int num) {
        synchronized (workers) {
            checkNum(num, 1, workerNum);
            for (int i = 0; i < num; i++) {
                Worker worker = workers.removeLast();
                worker.cancel();
                worker.thread.interrupt();
                worker = null;
            }
            workerNum -= num;
        }
    }

    private void checkNum(int num, int min, int max) {
        if (num > max || num < min) {
            throw new IllegalArgumentException("num = " + 
num + "应该大于等于" + min + "并且小于等于" + max);
        }
    }


    @Override
    public int getJobSize() {
        return jobs.size();
    }

    class Worker implements Runnable {

        private boolean running = true;
        private Thread thread;

        boolean cancel() {
            if (!running) {
                throw new RuntimeException("worker is cancel");
            }
            running = false;
            return true;
        }

        @Override
        public void run() {
            thread = Thread.currentThread();
            while (running) {
                Runnable job;
                synchronized (jobs) {
                    if (jobs.size() == 0) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            setEx(thread, e);
                        }
                    }
                    job = jobs.remove();
                }
                try {
                    executeJob(job);
                } catch (Throwable e) {
                    setEx(thread, e);
                }
            }
        }

        private void executeJob(Runnable job) {
            job.run();
        }
    }

    private void setEx(Thread thread, Throwable e) {
//  异常处理
    }
}
下面是主方法
public static void main(String[] args) {
        MyPoolThread myPoolThread = new MyPoolThread();
        for (int i = 0; i < 50; i++) {
            int j = i;
            myPoolThread.execute(() -> {
                System.out.println(j+":我是消息");
            });
        }
        myPoolThread.shutdown();
    }

可以看到这里线程池还是有许多的问题了,不过作为了解其基本原理,已经够了。这个线程池在初始化的时候就初始化了一定数目的线程。并且启动了它们,通过synchronized锁的notify/wait机制实现一旦有任务进来,就会有工作线程去执行。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 【JAVA 线程】 线程 进程:是一个正在执行中的程序。每一个进程执行都有一个执行顺序。该顺序是一个执行路径,或者...
    Rtia阅读 2,786评论 2 20
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,878评论 1 19
  • 由于时间仓促,有些地方未写完,后面会继续补充.如有不妥之处,欢迎及时与我沟通. 如果你也是在学习java,给你们推...
    分不清java阅读 2,851评论 0 15
  • 相关概念 面向对象的三个特征 封装,继承,多态.这个应该是人人皆知.有时候也会加上抽象. 多态的好处 允许不同类对...
    东经315度阅读 2,010评论 0 8
  • 一连工作了7天在这段时间里没有看书,迷上了两个动漫,一个是国产一个是日漫。 小的时候看的最多的是国产动漫,那时每到...
    昱锦CC阅读 471评论 0 0