1.condition的作用是什么?
Condition 必须配合 Lock 使用:线程在持有锁的前提下调用 await() 阻塞等待,由其他线程调用 signal()/signalAll() 将其唤醒,从而达到线程间通信的目的(典型场景是生产者-消费者模式)
2.尝试用condition去实现一个阻塞队列
随便写的 注重思想
来个思路:维持两个等待队列,一个等着生产,一个等着消费
放入时:队列满了就阻塞生产者;放入成功后唤醒等待中的消费者
获取时:队列空了就阻塞消费者;取出成功后唤醒等待中的生产者
首先,面向接口编程,来个接口
/**
 * Minimal blocking-queue contract: one operation to insert and one to remove.
 *
 * <p>Both operations declare {@link InterruptedException}, so implementations
 * are allowed to wait and to abort that wait when the calling thread is
 * interrupted.
 *
 * @param <E> type of the elements held by the queue
 */
public interface MyBlockQueue<E> {

    /**
     * Inserts the given element into the queue.
     *
     * @param e the element to insert
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    void put(E e) throws InterruptedException;

    /**
     * Removes an element from the queue and returns it.
     *
     * @return the removed element
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    E take() throws InterruptedException;
}
来个实现
/**
 * A bounded FIFO blocking queue backed by a fixed-size array and guarded by a
 * single {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>Producers wait on {@code notFull} while the array is full; consumers wait
 * on {@code notEmpty} while it is empty. Each successful {@code put} signals
 * one waiting consumer and each successful {@code take} signals one waiting
 * producer.
 *
 * <p>Live elements always occupy indices {@code [0, count)}; the head of the
 * queue is index 0 and every unused slot is {@code null}.
 *
 * @param <E> type of the elements held by the queue
 */
public class MyArrayBlockQueue<E> implements MyBlockQueue<E>, Serializable {
    private static final long serialVersionUID = 8683452581122892189L;

    /** Capacity used by the no-argument constructor. */
    private static final int DEFAULT_CAPACITY = 10;

    // NOTE(review): this field is transient and the class defines no
    // readObject/writeObject, so a deserialized instance would come back with a
    // null array. Confirm whether serialization is actually required.
    final transient E[] elementData;

    /** Guards {@link #elementData} and {@link #count}. */
    final Lock lock;

    /** Signalled after an element has been added. */
    final Condition notEmpty;

    /** Signalled after an element has been removed. */
    final Condition notFull;

    /** Number of elements currently stored. */
    int count;

    /**
     * Creates a queue with the given fixed capacity.
     *
     * @param var1 the capacity; must be at least 1, because a zero-capacity
     *             queue could never accept an element
     * @throws IllegalArgumentException if {@code var1} is not positive
     */
    @SuppressWarnings("unchecked") // the array only ever holds E, and never escapes this class
    public MyArrayBlockQueue(int var1) {
        if (var1 <= 0) {
            throw new IllegalArgumentException("Illegal Capacity: " + var1);
        }
        this.elementData = (E[]) new Object[var1];
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.notFull = lock.newCondition();
    }

    /**
     * Creates a queue with a default capacity of {@value #DEFAULT_CAPACITY}.
     */
    public MyArrayBlockQueue() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Appends the element at the tail, waiting while the queue is full.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting for free space
     */
    @Override
    public void put(E e) throws InterruptedException {
        // Acquire outside the try block so that unlock() only runs when the lock is held.
        lock.lock();
        try {
            // Loop rather than "if": the condition must be re-checked after every wake-up.
            while (count == elementData.length) {
                notFull.await();
            }
            elementData[count++] = e;
            notEmpty.signal();
            System.out.println("put:" + toString());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head element, waiting while the queue is empty.
     *
     * @return the element that was at the head of the queue
     * @throws InterruptedException if interrupted while waiting for an element
     */
    @Override
    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            E head = elementData[0];
            removeHead();
            notFull.signal();
            System.out.println("take:" + toString());
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Drops the element at index 0 by shifting the remaining live elements one
     * slot to the left. Must be called with the lock held and {@code count > 0}.
     */
    private void removeHead() {
        System.arraycopy(elementData, 1, elementData, 0, count - 1);
        // Clear the vacated tail slot so no stale reference is kept or printed.
        elementData[--count] = null;
    }

    /**
     * Returns a snapshot of the backing array, including unused {@code null} slots.
     *
     * @return a string of the form {@code MyArrayBlockQueue{elementData=[...]}}
     */
    @Override
    public String toString() {
        // The lock is reentrant, so this is safe when called from put()/take().
        lock.lock();
        try {
            return "MyArrayBlockQueue{" +
                    "elementData=" + Arrays.toString(elementData) +
                    '}';
        } finally {
            lock.unlock();
        }
    }
}
生产者
/**
 * Producer task: repeatedly puts a short random string into the shared queue,
 * pausing one second between insertions, until its thread is interrupted.
 */
public class PutThread implements Runnable {
    private final MyBlockQueue<String> queue;

    /**
     * @param queue the queue this producer writes to
     */
    public PutThread(MyBlockQueue<String> queue) {
        this.queue = queue;
    }

    /**
     * Produces elements until the running thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // First five characters of a random UUID serve as the payload.
                queue.put(UUID.randomUUID().toString().substring(0, 5));
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Treat interruption as a stop request: restore the flag for the
            // owner of this thread and leave the loop instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
消费者
/**
 * Consumer task: repeatedly takes an element from the shared queue, pausing
 * one second between removals, until its thread is interrupted.
 */
public class TakeThread implements Runnable {
    private final MyBlockQueue<String> queue;

    /**
     * @param queue the queue this consumer reads from
     */
    public TakeThread(MyBlockQueue<String> queue) {
        this.queue = queue;
    }

    /**
     * Consumes elements until the running thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                queue.take();
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Treat interruption as a stop request: restore the flag for the
            // owner of this thread and leave the loop instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
来个测试
/**
 * Demo entry point: starts five producer threads and five consumer threads
 * that all share one queue constructed with the argument 5.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        MyBlockQueue<String> queue = new MyArrayBlockQueue<>(5);

        final int pairs = 5;
        int started = 0;
        while (started < pairs) {
            // Each round launches one producer first, then one consumer.
            Thread producer = new Thread(new PutThread(queue));
            producer.start();

            Thread consumer = new Thread(new TakeThread(queue));
            consumer.start();

            started++;
        }
    }
}
来个效果图