阻塞队列
特性
- 队列是空的时候,从队列获取元素的操作会被阻塞
- 队列是满的时候,往队列添加元素的操作会被阻塞
阻塞队列示例
实现
public class BlockingQueue {
private List queue = new LinkedList();
private int limit = 10;
public BlockingQueue(int limit){
this.limit = limit;
}
public synchronized void enqueue(Object item)
throws InterruptedException {
// 队列满,阻塞其他线程
while(this.queue.size() == this.limit) {
wait();
}
// 队列空,唤醒阻塞线程开始enqueue
if(this.queue.size() == 0) {
notifyAll();
}
this.queue.add(item);
}
public synchronized Object dequeue()
throws InterruptedException{
// 队列空,阻塞其他线程
while(this.queue.size() == 0){
wait();
}
// 队列满,唤醒线程开始dequeue
if(this.queue.size() == this.limit){
notifyAll();
}
return this.queue.remove(0);
}
}
线程池
定义
把并发任务传递给一个线程池,来替代为每个并发执行的任务启动一个新线程。如果线程池里面有空闲线程,任务就会分配给线程进行执行。
任务在线程池内部被插入一个阻塞队列(Blocking Queue),线程池里的线程会去取这个队列里的任务。
简单实现
public class ThreadPool {
private BlockingQueue taskQueue = null;
private List<PoolThread> threads = new ArrayList<PoolThread>();
private boolean isStopped = false;
public ThreadPool(int noOfThreads, int maxNoOfTasks){
// 初始化阻塞队列和线程池中的线程
taskQueue = new BlockingQueue(maxNoOfTasks);
for(int i=0; i<noOfThreads; i++){
threads.add(new PoolThread(taskQueue));
}
// 启动线程
for(PoolThread thread : threads){
thread.start();
}
}
public synchronized void execute(Runnable task) throws Exception{
if(this.isStopped) throw
new IllegalStateException("ThreadPool is stopped");
// 将任务插入阻塞队列
this.taskQueue.enqueue(task);
}
public synchronized void stop(){
this.isStopped = true;
for(PoolThread thread : threads){
thread.doStop();
}
}
}
public class PoolThread extends Thread {
private BlockingQueue taskQueue = null;
private boolean isStopped = false;
public PoolThread(BlockingQueue queue){
taskQueue = queue;
}
public void run(){
while(!isStopped()){
try{
Runnable runnable = (Runnable) taskQueue.dequeue();
runnable.run();
} catch(Exception e){
//log or otherwise report exception,
//but keep pool thread alive.
}
}
}
public synchronized void doStop(){
isStopped = true;
this.interrupt(); //break pool thread out of dequeue() call.
}
public synchronized boolean isStopped(){
return isStopped;
}
}