生产者与消费者封装

概要

生产者Producer 生产某个对象(共享资源),放在缓冲池中,然后消费者从缓冲池中取出这个对象。也就是生产者生产一个,消费者取出一个。这样进行循环。

封装理念

单纯的说生产者与消费我也不能说得很清楚,就拿生活中的事情来举个例子。某东上销售IPhone,首先商品有一定的库存,而库存怎么来的呢?就是IPhone的代理工厂生产的,也许一个,也许多个,这样就形成了我们的生产者。很多用户都去购买IPhone,这个肯定是我们的消费者了。如果没有货,消费者就需要等待代理工厂为某东供货。如果下架了该商品,那么代理工厂就停止生产,消费者买完了库存中的IPhone也就不能再购买了。

实战

  1. 生产线,管理生产者和消费者。包括了注册和移除等相关操作
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 生产线,管理生产者和消费者
 *
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public class PipeLine<T> {
    /**
     * 存储生产者与生产者线程
     */
    private Map<Producer<T>, Thread> producers;

    /**
     * 存储消费者与消费者线程
     */
    private Map<Consumer<T>, Thread> consumers;

    /**
     * 重入锁
     */
    private Lock lock;

    /**
     * 等待计数器
     */
    private CountDownLatch latch;

    /**
     * 标志产线是否正在运行了
     */
    private volatile boolean started;

    /**
     * 阻塞队列容量
     */
    private int capacity;

    /**
     * 阻塞队列
     */
    private BlockingQueue<T> queue;


    /**
     * @param capacity 阻塞队列容量
     */
    public PipeLine(int capacity) {
        this.queue = new ArrayBlockingQueue<T>(capacity);
        this.producers = new ConcurrentHashMap<>();
        this.consumers = new ConcurrentHashMap<>();
        this.lock = new ReentrantLock();
        this.capacity = capacity;
    }

    /**
     * 注册生产者{@link com.melon.other.pipeline.Producer}
     *
     * @param producer 生产者
     * @return 生产线
     */
    public PipeLine<T> registProducer(Producer<T> producer) {
        this.lock.lock();
        try {
            producer.bindPipeLine(this);
            this.producers.put(producer, new Thread(producer));
        } finally {
            this.lock.unlock();
        }
        System.out.println(String.format("Regist producer[%s],count %s", producer.getId(), this.producers.size()));
        return this;
    }

    /**
     * 移除生产者
     *
     * @param producer 生产者
     */
    void unregistProducer(Producer<T> producer) {
        lock.lock();
        try {
            producers.remove(producer);
            System.out.println(String.format("Unregist producer[%s], count %s", producer.getId(), producers.size()));
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }


    /**
     * 注册消费者{@link com.melon.other.pipeline.Consumer}
     *
     * @param consumer 消费者
     * @return 生产线
     */
    public PipeLine<T> registConsumer(Consumer<T> consumer) {
        this.lock.lock();
        try {
            consumer.bindPipeLine(this);
            this.consumers.put(consumer, new Thread(consumer));
        } finally {
            this.lock.unlock();
        }
        System.out.println(String.format("Regist consumer[%s],count %s", consumer.getId(), this.consumers.size()));
        return this;
    }

    /**
     * 移除消费者
     *
     * @param consumer 消费者
     */
    void unregistConsumer(Consumer<T> consumer) {
        lock.lock();
        try {
            consumers.remove(consumer);
            System.out.println(String.format("Unregist consumer[%s], count %s", consumer.getId(), consumers.size()));
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }

    /**
     * 生产者向产线添加产品
     *
     * @param t 产品
     * @throws InterruptedException 异常
     */
    void push(T t) throws InterruptedException {
        if (queue.size() == capacity)
            System.out.println("【PipeLine full】");
        queue.put(t);
    }

    /**
     * 消费者从队列中拿出一个产品
     *
     * @return 产品
     * @throws InterruptedException 异常
     */
    T poll() throws InterruptedException {
        return queue.poll(1, TimeUnit.SECONDS);
    }

    /**
     * 获取队列容量
     *
     * @return int
     */
    public int size() {
        return queue.size();
    }

    /**
     * 获取生产者数量
     *
     * @return int
     */
    public int getProducerCount() {
        lock.lock();
        try {
            if (!queue.isEmpty()) //如果还有产品就表示还有生产者
                return 1;
            return producers.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 启动产线
     *
     * @return 产线
     */
    public PipeLine<T> start() {
        lock.lock();
        try {
            if (started) {
                System.out.println("PipeLine has been started!");
                return this;
            }
            //计数器总共的个数是生产者和消费者的综合
            latch = new CountDownLatch(producers.size() + consumers.size());
            //循环启动生产者
            for(Thread producer:producers.values()){
                producer.start();
            }
            //循环启动消费者
            for(Thread consumer:consumers.values()){
                consumer.start();
            }
            //切换标识
            started = true;
            return this;
        } finally {
            lock.unlock();
        }
    }

    public void await() throws InterruptedException {
        latch.await();
    }

}

  1. 生产者
/**
 * 生产者
 *
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public abstract class Producer<T> implements Runnable {

    /**
     * 生产线{@link com.melon.other.pipeline.PipeLine}
     */
    private PipeLine<T> pipeLine;


    @Override
    public void run() {
        try {
            for (; ; ) {
                T t = this.produce();
                if (t == null) { //生产者返回空,就表示这个线程停止生产,需要在finally移除当前的生产者
                    break;
                }
                pipeLine.push(t);
                System.out.println(String.format("Producer[%s] produced,PipeLine size: %s", this.getId(), this.pipeLine.size()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pipeLine.unregistProducer(this);
        }
    }

    /**
     * 绑定生产线
     *
     * @param pipeLine 生产线
     */
    void bindPipeLine(PipeLine<T> pipeLine) {
        this.pipeLine = pipeLine;
    }

    /**
     * 获取当前生产者的线程id
     *
     * @return id - long
     */
    protected long getId() {
        return Thread.currentThread().getId();
    }

    /**
     * 生产产品
     *
     * @return T
     * @throws Exception 异常
     */
    protected abstract T produce() throws Exception;

}

  1. 消费者
/**
 * 消费者
 *
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public abstract class Consumer<T> implements Runnable {

    /**
     * 生产线{@link com.melon.other.pipeline.PipeLine}
     */
    private PipeLine<T> pipeLine;

    @Override
    public void run() {
        try {
            for (; ; ) {
                T t = pipeLine.poll(); //拿一个产品
                if (t == null && pipeLine.getProducerCount() == 0) { //如果产品没有拿到,并且没有生产者了,那么就要移除消费者
                    System.out.println(String.format("【Consumer[%s] hungry】", getId()));
                    break;
                }
                consume(t);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pipeLine.unregistConsumer(this);
        }
    }

    /**
     * 绑定生产线
     *
     * @param pipeLine 生产线
     */
    void bindPipeLine(PipeLine<T> pipeLine) {
        this.pipeLine = pipeLine;
    }

    /**
     * 获取当前消费者的线程id
     *
     * @return id - long
     */
    protected long getId() {
        return Thread.currentThread().getId();
    }

    /**
     * 消费
     *
     * @param t 产品
     * @throws Exception 异常
     */
    protected abstract void consume(T t) throws Exception;
}

  1. 测试
  
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public class PipeT {
    private static AtomicInteger num = new AtomicInteger();

    public static void main(String[] args) {
        new PipeLine<String>(10)
                .registProducer(new P())
                .registProducer(new P())
                .registConsumer(new C())
                .registConsumer(new C())
                .registConsumer(new C())
                .start();
    }

    public static class P extends Producer<String> {

        @Override
        protected String produce() {
            if (num.get() >= 50) {
                return null;
            }
            return getId() + " this " + num.getAndIncrement();
        }
    }

    public static class C extends Consumer<String> {

        @Override
        protected void consume(String s) {
            System.out.println(s + ">>" + getId());
        }
    }
}
  1. 运行结果
Regist producer[1],count 1
Regist producer[1],count 2
Regist consumer[1],count 1
Regist consumer[1],count 2
Regist consumer[1],count 3
Producer[10] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[10] produced,PipeLine size: 4
Producer[10] produced,PipeLine size: 5
Producer[10] produced,PipeLine size: 6
Producer[10] produced,PipeLine size: 7
Producer[10] produced,PipeLine size: 8
Producer[10] produced,PipeLine size: 9
Producer[10] produced,PipeLine size: 10
【PipeLine full】
10 this 0>>14
10 this 2>>14
10 this 3>>14
10 this 4>>14
10 this 5>>14
10 this 6>>14
10 this 7>>14
10 this 8>>14
10 this 9>>14
10 this 10>>14
Producer[10] produced,PipeLine size: 9
10 this 11>>14
10 this 1>>12
Producer[10] produced,PipeLine size: 1
10 this 12>>14
Producer[10] produced,PipeLine size: 1
10 this 13>>13
Producer[10] produced,PipeLine size: 1
10 this 14>>12
Producer[10] produced,PipeLine size: 1
10 this 15>>14
Producer[10] produced,PipeLine size: 1
10 this 16>>13
Producer[10] produced,PipeLine size: 1
10 this 17>>12
Producer[10] produced,PipeLine size: 1
10 this 18>>14
Producer[10] produced,PipeLine size: 1
10 this 19>>13
11 this 20>>12
Producer[11] produced,PipeLine size: 1
11 this 21>>14
Producer[11] produced,PipeLine size: 1
11 this 22>>13
Producer[11] produced,PipeLine size: 1
11 this 23>>12
Producer[11] produced,PipeLine size: 1
11 this 24>>14
Producer[11] produced,PipeLine size: 0
11 this 25>>13
Producer[11] produced,PipeLine size: 1
11 this 26>>12
Producer[11] produced,PipeLine size: 1
11 this 27>>14
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
11 this 28>>13
10 this 29>>13
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[11] produced,PipeLine size: 4
10 this 30>>14
11 this 31>>12
10 this 32>>12
Producer[10] produced,PipeLine size: 5
11 this 35>>14
10 this 36>>14
10 this 34>>13
Producer[10] produced,PipeLine size: 1
11 this 33>>12
10 this 37>>14
Producer[11] produced,PipeLine size: 0
Producer[10] produced,PipeLine size: 1
11 this 38>>13
Producer[11] produced,PipeLine size: 1
10 this 39>>14
11 this 40>>12
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[11] produced,PipeLine size: 4
Producer[10] produced,PipeLine size: 5
Producer[11] produced,PipeLine size: 6
Producer[10] produced,PipeLine size: 7
Producer[11] produced,PipeLine size: 8
Producer[10] produced,PipeLine size: 9
10 this 41>>13
11 this 44>>13
10 this 45>>13
11 this 46>>13
10 this 47>>13
11 this 48>>13
10 this 49>>13
10 this 43>>12
11 this 42>>14
Unregist producer[11], count 1
Unregist producer[10], count 0
【Consumer[13] hungry】
【Consumer[12] hungry】
Unregist consumer[13], count 2
【Consumer[14] hungry】
Unregist consumer[12], count 1
Unregist consumer[14], count 0
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,148评论 1 32
  • 简介 在实际的软件开发过程中,经常会碰到如下场景: 某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模...
    RadioWaves阅读 6,387评论 2 16
  • 用快乐去奔跑,用心灵去倾听,用思维去发展,用努力去奋斗,用目标去衡量,用大爱去生活。
    大道至简活出精彩阅读 166评论 0 1
  • 李王的这本全书真的是爆炸咯,搞得我最烦的居然是初等数学,各种求导,还得相加,看到就爆炸了,静下心慢慢算也得算好久的...
    烧鸭阅读 294评论 0 0
  • “明天就是正式上班的日子,不知道会有多少人,今天晚上难以安眠。放假之前挤压的工作,会从明天开始来得更凶猛一些...
    蓝天碧海30阅读 688评论 0 0