Java实现一个阻塞队列

环境

  • jdk8

阻塞队列

实现一个阻塞队列,阻塞过程使用ReentrantLock锁和Condition来控制。

循环队列是如何实现的,以及实现的原理,判断栈满栈空的条件为何这样写,参考
Java语言实现一个循环队列

阻塞队列内部构造及初始化

使用ReetrantLock,进行多线程的锁的控制,以及使用Condition进行实现队列中操作等待和唤醒。

    static class BlockQueue<T> {
        private Object queue[];
        private int front;
        private int rear;
        private int maxSize;

        final private Lock lock = new ReentrantLock();
        Condition full = lock.newCondition();
        Condition empty = lock.newCondition();

        public BlockQueue(int maxSize) {
            this.front = 0;
            this.rear = 0;
            this.maxSize = maxSize;
            this.queue = new Object[maxSize];
        }
         /**
         * 阻塞 入队方法在这
         * @param element
         */
         
        /**
         * 阻塞出队方法在这
         */
    }

入队方法

判断队列是否full,如果满了之后,则进行等待出队之后empty.signl()的唤醒。
此处采用while(),而不是用if进行判断,是因为,在阻塞队列在多线程的情况下,经由signl会同时唤醒多个线程,而此时只有一个可以被消费,所以需要再次判断。


        /**
         * 阻塞 入队
         * @param element
         */
        public void put(T element) throws InterruptedException {
            lock.lock();
            try{
                while ( (rear + 1) % maxSize == front ) {
                    System.out.println("Queue is full");
                    full.await();
                }
                queue[rear] = element;
                rear = (rear + 1) % maxSize;
                empty.signal();
            } finally {
                lock.unlock();
            }
        }

出队方法

       /**
         * 阻塞出队
         */
        public T take() throws InterruptedException{
            lock.lock();
            try{
                while( rear == front ){
                    System.out.println("Queue is empty");
                    empty.await();
                }
                Object element = queue[front];
                queue[front] = null;
                front = (front+1)%maxSize;
                full.signal();
                return (T) element;
            }finally {
                lock.unlock();
            }
        }

测试用例

    public static void main(String[] args) throws InterruptedException {
        BlockQueue<Integer> queue = new BlockQueue<Integer>(4);
        queue.put(5);

        new Thread(() -> {
            try {
                System.out.println("添加");
                queue.put(11);
                queue.put(12);
                queue.put(13);
                queue.put(14);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                System.out.println("取出");
                queue.take();
                Thread.sleep(1);
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

    }

完整代码


/**
 * 实现一个阻塞队列
 */
public class BlockQueueDemo{

    public static void main(String[] args) throws InterruptedException {
        BlockQueue<Integer> queue = new BlockQueue<Integer>(4);
        queue.put(5);

        new Thread(() -> {
            try {
                System.out.println("添加");
                queue.put(11);
                queue.put(12);
                queue.put(13);
                queue.put(14);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                System.out.println("取出");
                queue.take();
                Thread.sleep(1);
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

    }

    static class BlockQueue<T> {
        private Object queue[];
        private int front;
        private int rear;
        private int maxSize;

        final private Lock lock = new ReentrantLock();
        Condition full = lock.newCondition();
        Condition empty = lock.newCondition();

        public BlockQueue(int maxSize) {
            this.front = 0;
            this.rear = 0;
            this.maxSize = maxSize;
            this.queue = new Object[maxSize];
        }

        /**
         * 阻塞入队
         * @param element
         */
        public void put(T element) throws InterruptedException {
            lock.lock();
            try{
                while ( (rear + 1) % maxSize == front ) {
                    System.out.println("Queue is full");
                    full.await();
                }
                queue[rear] = element;
                rear = (rear + 1) % maxSize;
                empty.signal();
            } finally {
                lock.unlock();
            }
        }

        /**
         * 阻塞出队
         */
        public T take() throws InterruptedException{
            lock.lock();
            try{
                while( rear == front ){
                    System.out.println("Queue is empty");
                    empty.await();
                }
                Object element = queue[front];
                queue[front] = null;
                front = (front+1)%maxSize;
                full.signal();
                return (T) element;
            }finally {
                lock.unlock();
            }
        }
    }

参考资料


如果有描述不清楚或未涉及之处,可直接留言评论,或发送邮件询问 weiycmail@163.com

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。