实现一个自定义的有界阻塞队列

实现一个自定义的有界阻塞队列. 当队列为空时,阻塞直到有可取的元素被唤醒;当队列满时,阻塞直到有空间存放元素被唤醒.

分析:
 1)为实现有界: 采用数组进行存储元素模拟队列,为了提高空间的利用率,使用循环队列
 2)为实现阻塞和唤醒,构造同步机制,使用内置锁(synchronized)或者显式锁(Lock)

1,使用内置锁(synchronized)

class BlockingQueueWithSynchronized<T> {
    private int tail, head, count;
    private T[] elems;

    BlockingQueueWithSynchronized(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity需要大于0");
        }
        elems = (T[]) new Object[capacity];
    }

    public synchronized T take() throws InterruptedException {
        while (isEmpty()) {
            wait();
        }
        T t = elems[head];
        elems[head] = null;
        head++;
        if (head == elems.length) {
            head = 0;
        }
        count--;
        notifyAll();
        return t;
    }

    public synchronized void put(T t) throws InterruptedException {
        while (isFull()) {
            wait();
        }

        elems[tail] = t;
        tail++;
        if (tail == elems.length) {
            tail = 0;
        }
        count++;
        notifyAll();
    }

    public synchronized boolean isFull() {
        return elems.length == count;
    }

    public synchronized boolean isEmpty() {
        return count == 0;
    }
}

在这里每次取(take)和每次存(put)都会进行唤醒操作;但是执行唤醒操作后,被唤醒的线程可能再次被阻塞.
例如,当队列为空,进行取操作被阻塞,当放入元素后唤醒所有的线程(notifyAll),但是只有一个可用元素, 所有被唤醒的取元素线程都将进行竞争,只有一个线程能最终获取获取元素,其余取元素线程会再次被阻塞挂起.
那么将会问为何不使用notify呢?这样就可以避免唤醒所有的线程,但是问题来了, notify只能唤醒一个线程,如果唤醒的是存放元素的线程,那么取元素的线程就丧失了一次获取的机会.
所以采用synchronize可以会造成唤醒的"粒度"过大.
下面采用显式锁的方式来解决这个问题

2,显式锁(Lock)

class BlockingQueueWithLock<T> {
    private int tail, head, count;
    private T[] elems;

    private final Lock lock = new ReentrantLock();
    // 不满 ,可继续放
    private final Condition notFull = lock.newCondition();
    // 不空 ,可继续取
    private final Condition notEmpty = lock.newCondition();

    BlockingQueueWithLock(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity需要大于0");
        }
        elems = (T[]) new Object[capacity];
    }
    
    public T take() throws InterruptedException{
        try{
            lock.lock();
            while(isEmpty()){
                notEmpty.await();
            }
            
            T t = elems[head];
                        //不让队列在继续引用该元素,GC可以回收
            elems[head] = null;
            head++;
            if(head == elems.length){
                head = 0;
            }
            count--;
            notFull.signal();
            return t;
        }finally{
            lock.unlock();
        }
    }
    
    public void put(T t) throws InterruptedException{
        try{
            lock.lock();
            while(isFull()){
                notFull.await();
            }
            elems[tail] = t;
            tail++;
            if(tail == elems.length){
                tail = 0;
            }
            count++;
            notEmpty.signal();
        
        }finally{
            lock.unlock();
        }
    }

    public boolean isFull() {
        try {
            lock.lock();
            return elems.length == count;
        } finally {
            lock.unlock();
        }
    }

    public boolean isEmpty() {
        try {
            lock.lock();
            return count == 0;
        } finally {
            lock.unlock();
        }
    }
}

对于上面的实现方式,唤醒线程的时候使用signal为什么不是signalAll呢?
例如,当队列满的时候取走一个元素,然后notFull.signal唤醒等待存放的线程存放元素,如果使用signalAll那么将唤醒所有阻塞的线程竞争这个一个存放的位置, 只有一个胜出,其余的将会继续被阻塞.
如果使用signal那么只会唤醒一个线程,这样减少了线程阻塞挂起和线程恢复的开销

参考:
<<java编发编程实战>>

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容