BlockingQueue实现生产者消费者模式

  • 提供put方法和take方法自动实现等待

put方法源码

  • 如果空间不足够的时候就会等待
 /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

take方法源码

  • 移走队头元素,队空时等待
/**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
package thread;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer3 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        ProducerConsumer3.Producer p1 = new ProducerConsumer3.Producer(queue);
        ProducerConsumer3.Consumer c1 = new ProducerConsumer3.Consumer(queue);
        p1.start();
        c1.start();
        p1.join();
        c1.join();
    }

    static class Producer extends Thread {
        BlockingQueue<Integer> queue;

        Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true) {
                int tmp = new Random().nextInt();
                try {
                    queue.put(tmp);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Producing " + tmp);
            }

        }

    }

    static class Consumer extends Thread {
        BlockingQueue<Integer> queue;

        Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    System.out.println("Consuming " + queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。