什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者消费者模式实现(Java)
阻塞队列是实现生产者消费者模式的关键,本文介绍两种自定义阻塞队列的实现以及JDK 1.5 以后新增的java.util.concurrent
包中提供的阻塞队列类。
首先,阻塞队列接口:
package com.bytebeats.concurrent.queue;
/**
* 阻塞队列接口
*
* @author Ricky Fung
* @create 2017-03-26 17:28
*/
public interface IBlockingQueue<T> {
void put(T data) throws InterruptedException;
T take() throws InterruptedException;
}
方式1
使用 Object.wait()/notifyAll() 来实现阻塞队列。
1、阻塞队列实现
package com.bytebeats.concurrent.queue;
import java.util.LinkedList;
/**
* 使用Object.wait()/notifyAll()实现的阻塞队列
*
* @author Zixi Wang
* @create 2017-11-01 16:18
*/
public class TraditionalBlockingQueue<T> implements IBlockingQueue<T> {
private int queueSize;
private final LinkedList<T> list = new LinkedList<T>();
private final Object lock = new Object();
public TraditionalBlockingQueue(){
this(10);
}
public TraditionalBlockingQueue(int queueSize) {
if(queueSize<1){
throw new IllegalArgumentException("queueSize must be positive number");
}
this.queueSize = queueSize;
}
@Override
public void put(T data) throws InterruptedException {
synchronized (lock){
while(list.size()>=queueSize) {
lock.wait();
}
list.addLast(data);
lock.notifyAll();
}
}
@Override
public T take() throws InterruptedException {
synchronized(lock){
while(list.size()<=0) {
lock.wait();
}
T data = list.removeFirst();
lock.notifyAll();
return data;
}
}
}
注意要点
判定 LinkedList大小为0或者大于等于queueSize时须使用while (condition) {},不能使用if(condition) {}。其中while(condition)循环,它又被叫做“自旋锁”。自旋锁以及wait()和notify()方法在线程通信这篇文章中有更加详细的介绍。为防止该线程没有收到notify()调用也从wait()中返回(也称作虚假唤醒),这个线程会重新去检查condition条件以决定当前是否可以安全地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行了。
在 take 方法取走一个元素后须调用lock.notifyAll();,如果使用lock.notify()方法在某些情况下会导致 生产者-消费者 同时处于阻塞状态。
方式2
通过Lock和Condition实现阻塞队列
package com.bytebeats.concurrent.queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 通过Lock和Condition实现阻塞队列
*
* @author Zixi Wang
* @create 2017-11-01 17:08
*/
public class ConditionBlockingQueue<T> implements IBlockingQueue<T> {
private final Object[] items;
int putptr, takeptr, count;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public ConditionBlockingQueue(){
this(10);
}
public ConditionBlockingQueue(int queueSize) {
if(queueSize<1){
throw new IllegalArgumentException("queueSize must be positive number");
}
items = new Object[queueSize];
}
@Override
public void put(T data) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[putptr] = data;
if (++putptr == items.length) {
putptr = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
@Override
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.wait();
}
T data = (T) items[takeptr];
if (++takeptr == items.length) {
takeptr = 0;
}
--count;
notFull.signal();
return data;
} finally {
lock.unlock();
}
}
}
方式3
JDK 1.5 以后新增的java.util.concurrent
包新增了java.util.concurrent. BlockingQueue
接口:
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
并提供了如下几种阻塞队列实现:
java.util.concurrent.ArrayBlockingQueue
java.util.concurrent.LinkedBlockingQueue
java.util.concurrent.SynchronousQueue
java.util.concurrent.PriorityBlockingQueue
实现生产者-消费者模型使用java.util.concurrent.ArrayBlockingQueue
或者java.util.concurrent.LinkedBlockingQueue即可。
package com.bytebeats.concurrent;
import com.bytebeats.concurrent.queue.IBlockingQueue;
import com.bytebeats.concurrent.util.Constant;
/**
*
*
* @author Zixi Wang
* @create 2017-11-01 16:16
*/
public class Producer implements Runnable {
private IBlockingQueue<String> queue;
private int consumerNum;
public Producer(IBlockingQueue<String> queue, int consumerNum) {
this.queue = queue;
this.consumerNum = consumerNum;
}
@Override
public void run() {
for(int i=0; i< 100; i++){
try {
queue.put("data_"+i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
for(int i=0; i<consumerNum; i++){ //结束符
try {
queue.put(Constant.ENDING_SYMBOL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("Producer over");
}
}
消费者
package com.bytebeats.concurrent;
import com.bytebeats.concurrent.queue.IBlockingQueue;
import com.bytebeats.concurrent.util.Constant;
import java.util.concurrent.TimeUnit;
/**
* 消费者
*
* @author Zixi Wang
* @create 2017-11-01 16:16
*/
public class Consumer implements Runnable {
private IBlockingQueue<String> queue;
public Consumer(IBlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
String data = null;
try {
data = queue.take();
System.out.println("Consumer "+Thread.currentThread().getName()+" consume:"+data);
if (Constant.ENDING_SYMBOL.equals(data)) {
break;
}
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("Consumer over");
}
}
我们用 一个生产者 两个消费者来做测试,如下:
package com.bytebeats.concurrent;
import com.bytebeats.concurrent.queue.ConditionBlockingQueue;
import com.bytebeats.concurrent.queue.IBlockingQueue;
/**
* ${DESCRIPTION}
*
* @author Zixi Wang
* @create 2017-11-01 16:21
*/
public class ProducerConsumerDemo {
public static void main(String[] args) {
//new ProducerConsumerDemo().testRun(new TraditionalBlockingQueue<String>());
new ProducerConsumerDemo().testRun(new ConditionBlockingQueue<String>());
}
public void testRun(IBlockingQueue<String> queue){
Thread producer = new Thread(new Producer(queue, 2));
producer.start();
Thread consumer1 = new Thread(new Consumer(queue));
consumer1.start();
Thread consumer2 = new Thread(new Consumer(queue));
consumer2.start();
}
}