wait()和notifyall()
ublic class ShareDataV1 {
public static AtomicInteger atomicInteger = new AtomicInteger();
public volatile boolean flag = true;
public static final int MAX_COUNT = 10;
public static final List<Integer> pool = new ArrayList<>();
public void produce() {
// 判断,干活,通知
while (flag) {
// 每隔 1000 毫秒生产一个商品
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
synchronized (pool) {
//池子满了,生产者停止生产
//TODO 判断
while (pool.size() == MAX_COUNT) {
try {
System.out.println("pool is full, wating...");
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活
pool.add(atomicInteger.incrementAndGet());
System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());
//通知
pool.notifyAll();
}
}
}
public void consumue() {
// 判断,干活,通知
while (flag) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
synchronized (pool) {
//池子空了,消费者停止消费
while (pool.size() == 0) {
try {
System.out.println("pool is empty, wating...");
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活
int temp = pool.get(0);
pool.remove(0);
System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());
//通知
pool.notifyAll();
}
}
}
public void stop() {
flag = false;
}
}
lock,condition,signal
public class ShareDataV2 {
public static AtomicInteger atomicInteger = new AtomicInteger();
public volatile boolean flag = true;
public static final int MAX_COUNT = 10;
public static final List<Integer> pool = new ArrayList<>();
private Lock lock = new ReentrantLock();
//也可以一个condition然后signalall
private Condition produce_condition = lock.newCondition();
private Condition consumue_condition = lock.newCondition();
public void produce() {
// 判断,干活,通知
while (flag){
lock.lock();
try {
Thread.sleep(100);
//池子满了,生产者停止生产
while (pool.size() == MAX_COUNT) {
//等待,不能生产
produce_condition.await();
}
//干活
pool.add(atomicInteger.incrementAndGet());
System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());
consumue_condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public void consumue() {
// 判断,干活,通知
while (flag) {
lock.lock();
try {
Thread.sleep(1000);
//池子空了,消费者停止消费
while (pool.size() == 0) {
//等待,不能消费
System.out.println("pool is empty, wating...");
consumue_condition.await();
}
//干活
int temp = pool.get(0);
pool.remove(0);
System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());
produce_condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public void stop() {
flag = false;
}
}
阻塞队列
public class ShareDataV3 {
private static final int MAX_CAPACITY = 10; //阻塞队列容量
private static BlockingQueue<Integer> blockingQueue= new ArrayBlockingQueue<>(MAX_CAPACITY); //阻塞队列
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
public void produce() throws InterruptedException {
while (FLAG){
boolean retvalue = blockingQueue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
if (retvalue==true){
System.out.println(Thread.currentThread().getName()+"\t 插入队列"+ atomicInteger.get()+"成功"+"资源队列大小= " + blockingQueue.size());
}else {
System.out.println(Thread.currentThread().getName()+"\t 插入队列"+ atomicInteger.get()+"失败"+"资源队列大小= " + blockingQueue.size());
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+"FLAG变为flase,生产停止");
}
public void consume() throws InterruptedException {
Integer result = null;
while (true){
result = blockingQueue.poll(2, TimeUnit.SECONDS);
if (null==result){
System.out.println("超过两秒没有取道数据,消费者即将退出");
return;
}
System.out.println(Thread.currentThread().getName()+"\t 消费"+ result+"成功"+"\t\t"+"资源队列大小= " + blockingQueue.size());
Thread.sleep(1500);
}
}
public void stop() {
this.FLAG = false;
}
}
三个的统一调用
public class ProducerConsumer_V1 {
public static void main(String[] args) {
ShareDataV1 shareDataV1 = new ShareDataV1();
new Thread(() -> {
shareDataV1.produce();
}, "AAA").start();
new Thread(() -> {
shareDataV1.consumue();
}, "BBB").start();
new Thread(() -> {
shareDataV1.produce();
}, "CCC").start();
new Thread(() -> {
shareDataV1.consumue();
}, "DDD").start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
shareDataV1.stop();
}
}