生产者消费者问题

一、概念

生产者-消费者模式是多线程并发协作的经典模式。所谓的生产者-消费者问题,实际上包含两类的线程, 一种是生产者线程用于生产数据,另外一种的消费者线程用于消费数据。为了解耦生产者和消费者之间的关系,通常采用共享内存的方式(共享数据区域)。生产者只需要把生产的数据放到共享数据区域,而不需要关心消费者的行为。消费者只需要到共享数据区域取数据,而不需要关心生产者的行为。共享数据区域应该要具备以下线程并发协作的功能。

  • 如果共享数据区域满了,阻塞生产者继续生产数据放入其中。
  • 如果共享数据区域空了,阻塞消费者继续消费数据。

在JAVA中,实现生产者消费者问题,可以采用以下三种方式。

  • 使用Object的wait/notify的消息通知机制;
  • 使用Lock的Condition的await和signal的消息通知机制。
  • 使用BlockingQueue实现。

二、wait/notify的消息通知机制

wait/notifyAll实现生产者-消费者:

public class ProductorConsumer {


public static void main(String[] args) {

    LinkedList linkedList = new LinkedList();
    ExecutorService service = Executors.newFixedThreadPool(15);
    for (int i = 0; i < 5; i++) {
        service.submit(new Productor(linkedList, 8));
    }

    for (int i = 0; i < 10; i++) {
        service.submit(new Consumer(linkedList));
    }

}

static class Productor implements Runnable {

    private List<Integer> list;
    private int maxLength;

    public Productor(List list, int maxLength) {
        this.list = list;
        this.maxLength = maxLength;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (list) {
                try {
                    while (list.size() == maxLength) {
                        System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                        list.wait();
                        System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                    }
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                    list.add(i);
                    list.notifyAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}


static class Consumer implements Runnable {

    private List<Integer> list;

    public Consumer(List list) {
        this.list = list;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (list) {
                try {
                    while (list.isEmpty()) {
                        System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                        list.wait();
                        System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                    }
                    Integer element = list.remove(0);
                    System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                    list.notifyAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

}

三、await/signalAll

使用Lock中Condition的await/signalAll实现生产者-消费者

public class ProductorConsumer {

private static ReentrantLock lock = new ReentrantLock();
private static Condition full = lock.newCondition();
private static Condition empty = lock.newCondition();

public static void main(String[] args) {
    LinkedList linkedList = new LinkedList();
    ExecutorService service = Executors.newFixedThreadPool(15);
    for (int i = 0; i < 5; i++) {
        service.submit(new Productor(linkedList, 8, lock));
    }
    for (int i = 0; i < 10; i++) {
        service.submit(new Consumer(linkedList, lock));
    }

}

static class Productor implements Runnable {

    private List<Integer> list;
    private int maxLength;
    private Lock lock;

    public Productor(List list, int maxLength, Lock lock) {
        this.list = list;
        this.maxLength = maxLength;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                while (list.size() == maxLength) {
                    System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                    full.await();
                    System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                }
                Random random = new Random();
                int i = random.nextInt();
                System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                list.add(i);
                empty.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}


static class Consumer implements Runnable {

    private List<Integer> list;
    private Lock lock;

    public Consumer(List list, Lock lock) {
        this.list = list;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                while (list.isEmpty()) {
                    System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                    empty.await();
                    System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                }
                Integer element = list.remove(0);
                System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                full.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}

}

四、BlockingQueue

使用BlockingQueue实现生产者-消费者

public class ProductorConsumer {

    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(queue));
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(queue));
        }
    }


    static class Productor implements Runnable {

        private BlockingQueue queue;

        public Productor(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        private BlockingQueue queue;

        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer element = (Integer) queue.take();
                    System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

参考文章

https://juejin.im/post/5aeec675f265da0b7c072c56

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容