9.阻塞队列和线程池

阻塞队列

特性

  1. 队列是空的时候,从队列获取元素的操作会被阻塞
  2. 队列是满的时候,往队列添加元素的操作会被阻塞
阻塞队列示例

实现

public class BlockingQueue {

  private List queue = new LinkedList();
  private int  limit = 10;

  public BlockingQueue(int limit){
    this.limit = limit;
  }

  public synchronized void enqueue(Object item)
  throws InterruptedException  {
    // 队列满,阻塞其他线程
    while(this.queue.size() == this.limit) {
      wait();
    }
    // 队列空,唤醒阻塞线程开始enqueue
    if(this.queue.size() == 0) {
      notifyAll();
    }
    this.queue.add(item);
  }

  public synchronized Object dequeue()
  throws InterruptedException{
    // 队列空,阻塞其他线程
    while(this.queue.size() == 0){
      wait();
    }
    // 队列满,唤醒线程开始dequeue
    if(this.queue.size() == this.limit){
      notifyAll();
    }
    return this.queue.remove(0);
  }

}
    

线程池

定义

把并发任务传递给一个线程池,来替代为每个并发执行的任务启动一个新线程。如果线程池里面有空闲线程,任务就会分配给线程进行执行。

任务在线程池内部被插入一个阻塞队列(Blocking Queue),线程池里的线程会去取这个队列里的任务。

简单实现

public class ThreadPool {

    private BlockingQueue taskQueue = null;
    private List<PoolThread> threads = new ArrayList<PoolThread>();
    private boolean isStopped = false;

    public ThreadPool(int noOfThreads, int maxNoOfTasks){
        // 初始化阻塞队列和线程池中的线程
        taskQueue = new BlockingQueue(maxNoOfTasks);
        for(int i=0; i<noOfThreads; i++){
            threads.add(new PoolThread(taskQueue));
        }
        // 启动线程
        for(PoolThread thread : threads){
            thread.start();
        }
    }

    public synchronized void  execute(Runnable task) throws Exception{
        if(this.isStopped) throw
            new IllegalStateException("ThreadPool is stopped");
        // 将任务插入阻塞队列
        this.taskQueue.enqueue(task);
    }

    public synchronized void stop(){
        this.isStopped = true;
        for(PoolThread thread : threads){
           thread.doStop();
        }
    }

}


public class PoolThread extends Thread {

    private BlockingQueue taskQueue = null;
    private boolean isStopped = false;

    public PoolThread(BlockingQueue queue){
        taskQueue = queue;
    }

    public void run(){
        while(!isStopped()){
            try{
                Runnable runnable = (Runnable) taskQueue.dequeue();
                runnable.run();
            } catch(Exception e){
                //log or otherwise report exception,
                //but keep pool thread alive.
            }
        }
    }

    public synchronized void doStop(){
        isStopped = true;
        this.interrupt(); //break pool thread out of dequeue() call.
    }

    public synchronized boolean isStopped(){
        return isStopped;
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容