概要
生产者Producer 生产某个对象(共享资源),放在缓冲池中,然后消费者从缓冲池中取出这个对象。也就是生产者生产一个,消费者取出一个。这样进行循环。
封装理念
单纯的说生产者与消费我也不能说得很清楚,就拿生活中的事情来举个例子。某东上销售IPhone,首先商品有一定的库存,而库存怎么来的呢?就是IPhone的代理工厂生产的,也许一个,也许多个,这样就形成了我们的生产者。很多用户都去购买IPhone,这个肯定是我们的消费者了。如果没有货,消费者就需要等待代理工厂为某东供货。如果下架了该商品,那么代理工厂就停止生产,消费者买完了库存中的IPhone也就不能再购买了。
实战
- 生产线,管理生产者和消费者。包括了注册和移除等相关操作
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 生产线,管理生产者和消费者
*
* @author melon
* @version 1.0
* @since JDK1.8
*/
public class PipeLine<T> {
/**
* 存储生产者与生产者线程
*/
private Map<Producer<T>, Thread> producers;
/**
* 存储消费者与消费者线程
*/
private Map<Consumer<T>, Thread> consumers;
/**
* 重入锁
*/
private Lock lock;
/**
* 等待计数器
*/
private CountDownLatch latch;
/**
* 标志产线是否正在运行了
*/
private volatile boolean started;
/**
* 阻塞队列容量
*/
private int capacity;
/**
* 阻塞队列
*/
private BlockingQueue<T> queue;
/**
* @param capacity 阻塞队列容量
*/
public PipeLine(int capacity) {
this.queue = new ArrayBlockingQueue<T>(capacity);
this.producers = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.capacity = capacity;
}
/**
* 注册生产者{@link com.melon.other.pipeline.Producer}
*
* @param producer 生产者
* @return 生产线
*/
public PipeLine<T> registProducer(Producer<T> producer) {
this.lock.lock();
try {
producer.bindPipeLine(this);
this.producers.put(producer, new Thread(producer));
} finally {
this.lock.unlock();
}
System.out.println(String.format("Regist producer[%s],count %s", producer.getId(), this.producers.size()));
return this;
}
/**
* 移除生产者
*
* @param producer 生产者
*/
void unregistProducer(Producer<T> producer) {
lock.lock();
try {
producers.remove(producer);
System.out.println(String.format("Unregist producer[%s], count %s", producer.getId(), producers.size()));
} finally {
latch.countDown();
lock.unlock();
}
}
/**
* 注册消费者{@link com.melon.other.pipeline.Consumer}
*
* @param consumer 消费者
* @return 生产线
*/
public PipeLine<T> registConsumer(Consumer<T> consumer) {
this.lock.lock();
try {
consumer.bindPipeLine(this);
this.consumers.put(consumer, new Thread(consumer));
} finally {
this.lock.unlock();
}
System.out.println(String.format("Regist consumer[%s],count %s", consumer.getId(), this.consumers.size()));
return this;
}
/**
* 移除消费者
*
* @param consumer 消费者
*/
void unregistConsumer(Consumer<T> consumer) {
lock.lock();
try {
consumers.remove(consumer);
System.out.println(String.format("Unregist consumer[%s], count %s", consumer.getId(), consumers.size()));
} finally {
latch.countDown();
lock.unlock();
}
}
/**
* 生产者向产线添加产品
*
* @param t 产品
* @throws InterruptedException 异常
*/
void push(T t) throws InterruptedException {
if (queue.size() == capacity)
System.out.println("【PipeLine full】");
queue.put(t);
}
/**
* 消费者从队列中拿出一个产品
*
* @return 产品
* @throws InterruptedException 异常
*/
T poll() throws InterruptedException {
return queue.poll(1, TimeUnit.SECONDS);
}
/**
* 获取队列容量
*
* @return int
*/
public int size() {
return queue.size();
}
/**
* 获取生产者数量
*
* @return int
*/
public int getProducerCount() {
lock.lock();
try {
if (!queue.isEmpty()) //如果还有产品就表示还有生产者
return 1;
return producers.size();
} finally {
lock.unlock();
}
}
/**
* 启动产线
*
* @return 产线
*/
public PipeLine<T> start() {
lock.lock();
try {
if (started) {
System.out.println("PipeLine has been started!");
return this;
}
//计数器总共的个数是生产者和消费者的综合
latch = new CountDownLatch(producers.size() + consumers.size());
//循环启动生产者
for(Thread producer:producers.values()){
producer.start();
}
//循环启动消费者
for(Thread consumer:consumers.values()){
consumer.start();
}
//切换标识
started = true;
return this;
} finally {
lock.unlock();
}
}
public void await() throws InterruptedException {
latch.await();
}
}
- 生产者
/**
* 生产者
*
* @author melon
* @version 1.0
* @since JDK1.8
*/
public abstract class Producer<T> implements Runnable {
/**
* 生产线{@link com.melon.other.pipeline.PipeLine}
*/
private PipeLine<T> pipeLine;
@Override
public void run() {
try {
for (; ; ) {
T t = this.produce();
if (t == null) { //生产者返回空,就表示这个线程停止生产,需要在finally移除当前的生产者
break;
}
pipeLine.push(t);
System.out.println(String.format("Producer[%s] produced,PipeLine size: %s", this.getId(), this.pipeLine.size()));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pipeLine.unregistProducer(this);
}
}
/**
* 绑定生产线
*
* @param pipeLine 生产线
*/
void bindPipeLine(PipeLine<T> pipeLine) {
this.pipeLine = pipeLine;
}
/**
* 获取当前生产者的线程id
*
* @return id - long
*/
protected long getId() {
return Thread.currentThread().getId();
}
/**
* 生产产品
*
* @return T
* @throws Exception 异常
*/
protected abstract T produce() throws Exception;
}
- 消费者
/**
* 消费者
*
* @author melon
* @version 1.0
* @since JDK1.8
*/
public abstract class Consumer<T> implements Runnable {
/**
* 生产线{@link com.melon.other.pipeline.PipeLine}
*/
private PipeLine<T> pipeLine;
@Override
public void run() {
try {
for (; ; ) {
T t = pipeLine.poll(); //拿一个产品
if (t == null && pipeLine.getProducerCount() == 0) { //如果产品没有拿到,并且没有生产者了,那么就要移除消费者
System.out.println(String.format("【Consumer[%s] hungry】", getId()));
break;
}
consume(t);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pipeLine.unregistConsumer(this);
}
}
/**
* 绑定生产线
*
* @param pipeLine 生产线
*/
void bindPipeLine(PipeLine<T> pipeLine) {
this.pipeLine = pipeLine;
}
/**
* 获取当前消费者的线程id
*
* @return id - long
*/
protected long getId() {
return Thread.currentThread().getId();
}
/**
* 消费
*
* @param t 产品
* @throws Exception 异常
*/
protected abstract void consume(T t) throws Exception;
}
- 测试
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author melon
* @version 1.0
* @since JDK1.8
*/
public class PipeT {
private static AtomicInteger num = new AtomicInteger();
public static void main(String[] args) {
new PipeLine<String>(10)
.registProducer(new P())
.registProducer(new P())
.registConsumer(new C())
.registConsumer(new C())
.registConsumer(new C())
.start();
}
public static class P extends Producer<String> {
@Override
protected String produce() {
if (num.get() >= 50) {
return null;
}
return getId() + " this " + num.getAndIncrement();
}
}
public static class C extends Consumer<String> {
@Override
protected void consume(String s) {
System.out.println(s + ">>" + getId());
}
}
}
- 运行结果
Regist producer[1],count 1
Regist producer[1],count 2
Regist consumer[1],count 1
Regist consumer[1],count 2
Regist consumer[1],count 3
Producer[10] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[10] produced,PipeLine size: 4
Producer[10] produced,PipeLine size: 5
Producer[10] produced,PipeLine size: 6
Producer[10] produced,PipeLine size: 7
Producer[10] produced,PipeLine size: 8
Producer[10] produced,PipeLine size: 9
Producer[10] produced,PipeLine size: 10
【PipeLine full】
10 this 0>>14
10 this 2>>14
10 this 3>>14
10 this 4>>14
10 this 5>>14
10 this 6>>14
10 this 7>>14
10 this 8>>14
10 this 9>>14
10 this 10>>14
Producer[10] produced,PipeLine size: 9
10 this 11>>14
10 this 1>>12
Producer[10] produced,PipeLine size: 1
10 this 12>>14
Producer[10] produced,PipeLine size: 1
10 this 13>>13
Producer[10] produced,PipeLine size: 1
10 this 14>>12
Producer[10] produced,PipeLine size: 1
10 this 15>>14
Producer[10] produced,PipeLine size: 1
10 this 16>>13
Producer[10] produced,PipeLine size: 1
10 this 17>>12
Producer[10] produced,PipeLine size: 1
10 this 18>>14
Producer[10] produced,PipeLine size: 1
10 this 19>>13
11 this 20>>12
Producer[11] produced,PipeLine size: 1
11 this 21>>14
Producer[11] produced,PipeLine size: 1
11 this 22>>13
Producer[11] produced,PipeLine size: 1
11 this 23>>12
Producer[11] produced,PipeLine size: 1
11 this 24>>14
Producer[11] produced,PipeLine size: 0
11 this 25>>13
Producer[11] produced,PipeLine size: 1
11 this 26>>12
Producer[11] produced,PipeLine size: 1
11 this 27>>14
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
11 this 28>>13
10 this 29>>13
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[11] produced,PipeLine size: 4
10 this 30>>14
11 this 31>>12
10 this 32>>12
Producer[10] produced,PipeLine size: 5
11 this 35>>14
10 this 36>>14
10 this 34>>13
Producer[10] produced,PipeLine size: 1
11 this 33>>12
10 this 37>>14
Producer[11] produced,PipeLine size: 0
Producer[10] produced,PipeLine size: 1
11 this 38>>13
Producer[11] produced,PipeLine size: 1
10 this 39>>14
11 this 40>>12
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[11] produced,PipeLine size: 4
Producer[10] produced,PipeLine size: 5
Producer[11] produced,PipeLine size: 6
Producer[10] produced,PipeLine size: 7
Producer[11] produced,PipeLine size: 8
Producer[10] produced,PipeLine size: 9
10 this 41>>13
11 this 44>>13
10 this 45>>13
11 this 46>>13
10 this 47>>13
11 this 48>>13
10 this 49>>13
10 this 43>>12
11 this 42>>14
Unregist producer[11], count 1
Unregist producer[10], count 0
【Consumer[13] hungry】
【Consumer[12] hungry】
Unregist consumer[13], count 2
【Consumer[14] hungry】
Unregist consumer[12], count 1
Unregist consumer[14], count 0