生产者消费者问题

生产者-消费者是十分经典的关于异步协作的问题。也有这样一种设计模式,其思想就是将用于存放消息(产物)的仓库独立出来,由这个仓库来保证线程安全。
因此,我写了一个简单的生产者消费者模式的实现,具体代码及注释如下

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author sgyh
 */
public class Repository<E> {

    private Queue<E> queue;
    private int maxSize;
    private int size;
    private ReentrantLock lock;
    private Condition full;
    private Condition empty;

    /**
     * 初始化,校验参数最大值,初始化锁,和condition,这里使用非公平锁
     * @param maxSize
     */
    public Repository(int maxSize) {
        this.queue = new LinkedList<>();
        if (maxSize <= 0) {
            throw new IllegalArgumentException("The param maxSize must be greater than zero");
        }
        this.maxSize = maxSize;
        this.lock = new ReentrantLock();
        this.full = lock.newCondition();
        this.empty = lock.newCondition();
    }

    /**
     * 存取操作,上锁,检查是否满了,满则等待,直至被唤醒后,进行添加和size增加操作
     * 操作完成后检查是否有消费者处于等待,有则唤醒(没这个判断直接唤醒也是可以的)
     * @param e
     * @throws InterruptedException
     */
    public void put(E e) throws InterruptedException {
        lock.lock();
        while (true) {
            if (size == maxSize) {
                full.await();
            } else {
                queue.add(e);
                size++;
                System.out.println("存入,剩余" + size);
                if (lock.hasWaiters(empty)) {
                    empty.signalAll();
                }
                lock.unlock();
                break;
            }
        }
    }

    public int size() {
        return size;
    }

    /**
     * 取操作,检验size,是否可取,不可取则等待,直至被唤醒,进行取操作,并size--,
     * 判断是否有生产者处于等待,有则唤醒(同上,无此判断直接唤醒也可)
     * @return
     */
    public E poll() throws InterruptedException {
        E e;
        lock.lock();
        while (true) {
            if (size == 0) {
                empty.await();
            } else {
                e = queue.poll();
                size--;
                System.out.println("取出,剩余" + size);
                if (lock.hasWaiters(full)) {
                    full.signalAll();
                }
                lock.unlock();
                break;
            }
        }
        return e;
    }

    /**
     * maxSize 修改,修改时需上锁
     * @param maxSize
     */
    public void resize(int maxSize) {
        lock.lock();
        this.maxSize = maxSize;
        if (lock.hasWaiters(full)) {
            full.signalAll();
        }
        lock.unlock();
    }

    static class Product {

        public Product() {
            super();
        }

    }

    static class Consumer<E> {
        int i = 0;
        void consume(Repository<E> repository) {
            while (i++ < 10) {
                try {
                    repository.poll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }

    }

    static class Producer<E> {
        int i = 0;

        void produce(E e, Repository<E> repository) {
            while (i++ < 10) {
                try {
                    repository.put(e);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) {
        final Repository<Product> repository = new Repository<>(100);
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                new Producer<Product>().produce(new Product(), repository);
            }).start();
            new Thread(() -> {
                new Consumer<Product>().consume(repository);
            }).start();
        }
    }

}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容