生产者-消费者是十分经典的关于异步协作的问题。也有这样一种设计模式,其思想就是将用于存放消息(产物)的仓库独立出来,由这个仓库来保证线程安全。
因此,我写了一个简单的生产者消费者模式的实现,具体代码及注释如下
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author sgyh
*/
public class Repository<E> {
private Queue<E> queue;
private int maxSize;
private int size;
private ReentrantLock lock;
private Condition full;
private Condition empty;
/**
* 初始化,校验参数最大值,初始化锁,和condition,这里使用非公平锁
* @param maxSize
*/
public Repository(int maxSize) {
this.queue = new LinkedList<>();
if (maxSize <= 0) {
throw new IllegalArgumentException("The param maxSize must be greater than zero");
}
this.maxSize = maxSize;
this.lock = new ReentrantLock();
this.full = lock.newCondition();
this.empty = lock.newCondition();
}
/**
* 存取操作,上锁,检查是否满了,满则等待,直至被唤醒后,进行添加和size增加操作
* 操作完成后检查是否有消费者处于等待,有则唤醒(没这个判断直接唤醒也是可以的)
* @param e
* @throws InterruptedException
*/
public void put(E e) throws InterruptedException {
lock.lock();
while (true) {
if (size == maxSize) {
full.await();
} else {
queue.add(e);
size++;
System.out.println("存入,剩余" + size);
if (lock.hasWaiters(empty)) {
empty.signalAll();
}
lock.unlock();
break;
}
}
}
public int size() {
return size;
}
/**
* 取操作,检验size,是否可取,不可取则等待,直至被唤醒,进行取操作,并size--,
* 判断是否有生产者处于等待,有则唤醒(同上,无此判断直接唤醒也可)
* @return
*/
public E poll() throws InterruptedException {
E e;
lock.lock();
while (true) {
if (size == 0) {
empty.await();
} else {
e = queue.poll();
size--;
System.out.println("取出,剩余" + size);
if (lock.hasWaiters(full)) {
full.signalAll();
}
lock.unlock();
break;
}
}
return e;
}
/**
* maxSize 修改,修改时需上锁
* @param maxSize
*/
public void resize(int maxSize) {
lock.lock();
this.maxSize = maxSize;
if (lock.hasWaiters(full)) {
full.signalAll();
}
lock.unlock();
}
static class Product {
public Product() {
super();
}
}
static class Consumer<E> {
int i = 0;
void consume(Repository<E> repository) {
while (i++ < 10) {
try {
repository.poll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Producer<E> {
int i = 0;
void produce(E e, Repository<E> repository) {
while (i++ < 10) {
try {
repository.put(e);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
public static void main(String[] args) {
final Repository<Product> repository = new Repository<>(100);
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
new Producer<Product>().produce(new Product(), repository);
}).start();
new Thread(() -> {
new Consumer<Product>().consume(repository);
}).start();
}
}
}