生产者与消费者封装

概要

生产者Producer 生产某个对象(共享资源),放在缓冲池中,然后消费者从缓冲池中取出这个对象。也就是生产者生产一个,消费者取出一个。这样进行循环。

封装理念

单纯的说生产者与消费我也不能说得很清楚,就拿生活中的事情来举个例子。某东上销售IPhone,首先商品有一定的库存,而库存怎么来的呢?就是IPhone的代理工厂生产的,也许一个,也许多个,这样就形成了我们的生产者。很多用户都去购买IPhone,这个肯定是我们的消费者了。如果没有货,消费者就需要等待代理工厂为某东供货。如果下架了该商品,那么代理工厂就停止生产,消费者买完了库存中的IPhone也就不能再购买了。

实战

  1. 生产线,管理生产者和消费者。包括了注册和移除等相关操作
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 生产线,管理生产者和消费者
 *
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public class PipeLine<T> {
    /**
     * 存储生产者与生产者线程
     */
    private Map<Producer<T>, Thread> producers;

    /**
     * 存储消费者与消费者线程
     */
    private Map<Consumer<T>, Thread> consumers;

    /**
     * 重入锁
     */
    private Lock lock;

    /**
     * 等待计数器
     */
    private CountDownLatch latch;

    /**
     * 标志产线是否正在运行了
     */
    private volatile boolean started;

    /**
     * 阻塞队列容量
     */
    private int capacity;

    /**
     * 阻塞队列
     */
    private BlockingQueue<T> queue;


    /**
     * @param capacity 阻塞队列容量
     */
    public PipeLine(int capacity) {
        this.queue = new ArrayBlockingQueue<T>(capacity);
        this.producers = new ConcurrentHashMap<>();
        this.consumers = new ConcurrentHashMap<>();
        this.lock = new ReentrantLock();
        this.capacity = capacity;
    }

    /**
     * 注册生产者{@link com.melon.other.pipeline.Producer}
     *
     * @param producer 生产者
     * @return 生产线
     */
    public PipeLine<T> registProducer(Producer<T> producer) {
        this.lock.lock();
        try {
            producer.bindPipeLine(this);
            this.producers.put(producer, new Thread(producer));
        } finally {
            this.lock.unlock();
        }
        System.out.println(String.format("Regist producer[%s],count %s", producer.getId(), this.producers.size()));
        return this;
    }

    /**
     * 移除生产者
     *
     * @param producer 生产者
     */
    void unregistProducer(Producer<T> producer) {
        lock.lock();
        try {
            producers.remove(producer);
            System.out.println(String.format("Unregist producer[%s], count %s", producer.getId(), producers.size()));
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }


    /**
     * 注册消费者{@link com.melon.other.pipeline.Consumer}
     *
     * @param consumer 消费者
     * @return 生产线
     */
    public PipeLine<T> registConsumer(Consumer<T> consumer) {
        this.lock.lock();
        try {
            consumer.bindPipeLine(this);
            this.consumers.put(consumer, new Thread(consumer));
        } finally {
            this.lock.unlock();
        }
        System.out.println(String.format("Regist consumer[%s],count %s", consumer.getId(), this.consumers.size()));
        return this;
    }

    /**
     * 移除消费者
     *
     * @param consumer 消费者
     */
    void unregistConsumer(Consumer<T> consumer) {
        lock.lock();
        try {
            consumers.remove(consumer);
            System.out.println(String.format("Unregist consumer[%s], count %s", consumer.getId(), consumers.size()));
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }

    /**
     * 生产者向产线添加产品
     *
     * @param t 产品
     * @throws InterruptedException 异常
     */
    void push(T t) throws InterruptedException {
        if (queue.size() == capacity)
            System.out.println("【PipeLine full】");
        queue.put(t);
    }

    /**
     * 消费者从队列中拿出一个产品
     *
     * @return 产品
     * @throws InterruptedException 异常
     */
    T poll() throws InterruptedException {
        return queue.poll(1, TimeUnit.SECONDS);
    }

    /**
     * 获取队列容量
     *
     * @return int
     */
    public int size() {
        return queue.size();
    }

    /**
     * 获取生产者数量
     *
     * @return int
     */
    public int getProducerCount() {
        lock.lock();
        try {
            if (!queue.isEmpty()) //如果还有产品就表示还有生产者
                return 1;
            return producers.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 启动产线
     *
     * @return 产线
     */
    public PipeLine<T> start() {
        lock.lock();
        try {
            if (started) {
                System.out.println("PipeLine has been started!");
                return this;
            }
            //计数器总共的个数是生产者和消费者的综合
            latch = new CountDownLatch(producers.size() + consumers.size());
            //循环启动生产者
            for(Thread producer:producers.values()){
                producer.start();
            }
            //循环启动消费者
            for(Thread consumer:consumers.values()){
                consumer.start();
            }
            //切换标识
            started = true;
            return this;
        } finally {
            lock.unlock();
        }
    }

    public void await() throws InterruptedException {
        latch.await();
    }

}

  1. 生产者
/**
 * 生产者
 *
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public abstract class Producer<T> implements Runnable {

    /**
     * 生产线{@link com.melon.other.pipeline.PipeLine}
     */
    private PipeLine<T> pipeLine;


    @Override
    public void run() {
        try {
            for (; ; ) {
                T t = this.produce();
                if (t == null) { //生产者返回空,就表示这个线程停止生产,需要在finally移除当前的生产者
                    break;
                }
                pipeLine.push(t);
                System.out.println(String.format("Producer[%s] produced,PipeLine size: %s", this.getId(), this.pipeLine.size()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pipeLine.unregistProducer(this);
        }
    }

    /**
     * 绑定生产线
     *
     * @param pipeLine 生产线
     */
    void bindPipeLine(PipeLine<T> pipeLine) {
        this.pipeLine = pipeLine;
    }

    /**
     * 获取当前生产者的线程id
     *
     * @return id - long
     */
    protected long getId() {
        return Thread.currentThread().getId();
    }

    /**
     * 生产产品
     *
     * @return T
     * @throws Exception 异常
     */
    protected abstract T produce() throws Exception;

}

  1. 消费者
/**
 * 消费者
 *
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public abstract class Consumer<T> implements Runnable {

    /**
     * 生产线{@link com.melon.other.pipeline.PipeLine}
     */
    private PipeLine<T> pipeLine;

    @Override
    public void run() {
        try {
            for (; ; ) {
                T t = pipeLine.poll(); //拿一个产品
                if (t == null && pipeLine.getProducerCount() == 0) { //如果产品没有拿到,并且没有生产者了,那么就要移除消费者
                    System.out.println(String.format("【Consumer[%s] hungry】", getId()));
                    break;
                }
                consume(t);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pipeLine.unregistConsumer(this);
        }
    }

    /**
     * 绑定生产线
     *
     * @param pipeLine 生产线
     */
    void bindPipeLine(PipeLine<T> pipeLine) {
        this.pipeLine = pipeLine;
    }

    /**
     * 获取当前消费者的线程id
     *
     * @return id - long
     */
    protected long getId() {
        return Thread.currentThread().getId();
    }

    /**
     * 消费
     *
     * @param t 产品
     * @throws Exception 异常
     */
    protected abstract void consume(T t) throws Exception;
}

  1. 测试
  
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public class PipeT {
    private static AtomicInteger num = new AtomicInteger();

    public static void main(String[] args) {
        new PipeLine<String>(10)
                .registProducer(new P())
                .registProducer(new P())
                .registConsumer(new C())
                .registConsumer(new C())
                .registConsumer(new C())
                .start();
    }

    public static class P extends Producer<String> {

        @Override
        protected String produce() {
            if (num.get() >= 50) {
                return null;
            }
            return getId() + " this " + num.getAndIncrement();
        }
    }

    public static class C extends Consumer<String> {

        @Override
        protected void consume(String s) {
            System.out.println(s + ">>" + getId());
        }
    }
}
  1. 运行结果
Regist producer[1],count 1
Regist producer[1],count 2
Regist consumer[1],count 1
Regist consumer[1],count 2
Regist consumer[1],count 3
Producer[10] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[10] produced,PipeLine size: 4
Producer[10] produced,PipeLine size: 5
Producer[10] produced,PipeLine size: 6
Producer[10] produced,PipeLine size: 7
Producer[10] produced,PipeLine size: 8
Producer[10] produced,PipeLine size: 9
Producer[10] produced,PipeLine size: 10
【PipeLine full】
10 this 0>>14
10 this 2>>14
10 this 3>>14
10 this 4>>14
10 this 5>>14
10 this 6>>14
10 this 7>>14
10 this 8>>14
10 this 9>>14
10 this 10>>14
Producer[10] produced,PipeLine size: 9
10 this 11>>14
10 this 1>>12
Producer[10] produced,PipeLine size: 1
10 this 12>>14
Producer[10] produced,PipeLine size: 1
10 this 13>>13
Producer[10] produced,PipeLine size: 1
10 this 14>>12
Producer[10] produced,PipeLine size: 1
10 this 15>>14
Producer[10] produced,PipeLine size: 1
10 this 16>>13
Producer[10] produced,PipeLine size: 1
10 this 17>>12
Producer[10] produced,PipeLine size: 1
10 this 18>>14
Producer[10] produced,PipeLine size: 1
10 this 19>>13
11 this 20>>12
Producer[11] produced,PipeLine size: 1
11 this 21>>14
Producer[11] produced,PipeLine size: 1
11 this 22>>13
Producer[11] produced,PipeLine size: 1
11 this 23>>12
Producer[11] produced,PipeLine size: 1
11 this 24>>14
Producer[11] produced,PipeLine size: 0
11 this 25>>13
Producer[11] produced,PipeLine size: 1
11 this 26>>12
Producer[11] produced,PipeLine size: 1
11 this 27>>14
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
11 this 28>>13
10 this 29>>13
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[11] produced,PipeLine size: 4
10 this 30>>14
11 this 31>>12
10 this 32>>12
Producer[10] produced,PipeLine size: 5
11 this 35>>14
10 this 36>>14
10 this 34>>13
Producer[10] produced,PipeLine size: 1
11 this 33>>12
10 this 37>>14
Producer[11] produced,PipeLine size: 0
Producer[10] produced,PipeLine size: 1
11 this 38>>13
Producer[11] produced,PipeLine size: 1
10 this 39>>14
11 this 40>>12
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[11] produced,PipeLine size: 4
Producer[10] produced,PipeLine size: 5
Producer[11] produced,PipeLine size: 6
Producer[10] produced,PipeLine size: 7
Producer[11] produced,PipeLine size: 8
Producer[10] produced,PipeLine size: 9
10 this 41>>13
11 this 44>>13
10 this 45>>13
11 this 46>>13
10 this 47>>13
11 this 48>>13
10 this 49>>13
10 this 43>>12
11 this 42>>14
Unregist producer[11], count 1
Unregist producer[10], count 0
【Consumer[13] hungry】
【Consumer[12] hungry】
Unregist consumer[13], count 2
【Consumer[14] hungry】
Unregist consumer[12], count 1
Unregist consumer[14], count 0
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,273评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,349评论 3 398
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,709评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,520评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,515评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,158评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,755评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,660评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,203评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,287评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,427评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,122评论 5 349
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,801评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,272评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,393评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,808评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,440评论 2 359

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,109评论 1 32
  • 简介 在实际的软件开发过程中,经常会碰到如下场景: 某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模...
    RadioWaves阅读 6,335评论 2 16
  • 用快乐去奔跑,用心灵去倾听,用思维去发展,用努力去奋斗,用目标去衡量,用大爱去生活。
    大道至简活出精彩阅读 160评论 0 1
  • 李王的这本全书真的是爆炸咯,搞得我最烦的居然是初等数学,各种求导,还得相加,看到就爆炸了,静下心慢慢算也得算好久的...
    烧鸭阅读 223评论 0 0
  • “明天就是正式上班的日子,不知道会有多少人,今天晚上难以安眠。放假之前挤压的工作,会从明天开始来得更凶猛一些...
    蓝天碧海30阅读 662评论 0 0