阻塞队列基本
image.png
阻塞队列模型
当队列时空 消费者阻塞
队列满了 生产者阻塞
使用阻塞队列,不需要管线程的唤醒和阻塞
BlockingQueue 在Collection底下
image.png
image.png
public class BlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println( blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.add("d"));
}
}
加了四个字符串 然后报了异常
image.png
超时控制
如果用了Put,take 会一直等着 拖着
如果队列满了 Put会满了
如果队列是空take会一直等着
offer(e,time,unit) 限定时间的put
poll()限定时间的
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// System.out.println(blockingQueue.add("a"));
// System.out.println( blockingQueue.add("b"));
// System.out.println(blockingQueue.add("c"));
//hu
// System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
}
}
题目一
image.png
一个初始值为0的变量,两个线程对其交替操作,一个加一,一个减一,来5轮
class ShareData{
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increase() throws Exception{
//1 判断
lock.lock();
System.out.println(Thread.currentThread().getName()+"has lock");
while (number !=0){ //等待,不生产
System.out.println(Thread.currentThread().getName()+"wait");
condition.await();
System.out.println(Thread.currentThread().getName()+"wait end");
}
number++;
System.out.println(Thread.currentThread().getName()+number);
//通知唤醒
condition.signalAll();
lock.unlock();
}
public void decrease() throws Exception{
//1 判断
lock.lock();
System.out.println(Thread.currentThread().getName()+"has lock");
while (number ==0){ //等待,不生产
System.out.println(Thread.currentThread().getName()+"wait");
condition.await();
System.out.println(Thread.currentThread().getName()+"wait end");
}
number--;
System.out.println(Thread.currentThread().getName()+number);
//通知唤醒
condition.signalAll();
lock.unlock();
}
}
//初始,两个变量,一个+1,一个-1 来五轮
// 线程操作资源类
//判断干活 通知
//防止虚假唤醒
public class Pro_Consumer_Traditional {
public static void main(String[] args) {
ShareData s=new ShareData();
new Thread(()->{
for (int i = 0; i < 5; i++) {
try {
s.increase();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
try {
s.decrease();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "BB").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
try {
s.increase();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "CC").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
try {
s.decrease();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "DD").start();
}
}
注意wait 条件不能用if 判断
因为如果有两个increase线程都在wait
有一个decrease线程把他们唤醒了
那么这两个线程都会抢占lock
然后都有可能会add number
如果用while的话
他们被唤醒后
只有一个线程可以获得锁,然后结束while
另一个线程还会继续wait
package JavaBsic.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
// volatile cas automaticInteger
class MyResource{
private volatile boolean Flag=true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> queue = null;
//高手一定传接口,不传类
public MyResource(BlockingQueue<String> queue) {
this.queue = queue;
System.out.println(queue.getClass().getName());
}
public void myProd() throws Exception{
String data=null;
boolean returnValue;
while(Flag){
data=atomicInteger.incrementAndGet()+"";
returnValue=queue.offer(data,2, TimeUnit.SECONDS);
if (returnValue)
System.out.println(Thread.currentThread().getName()+"插入成功"+data);
else
System.out.println(Thread.currentThread().getName()+"插入失败"+data);
Thread.sleep(300);
}
System.out.println("生产结束");
}
public void stop(){
this.Flag=false;
}
public void consume() throws Exception{
String data=null;
String returnValue;
while(Flag){
data=atomicInteger.incrementAndGet()+"";
returnValue=queue.poll(2, TimeUnit.SECONDS);
if(null==returnValue || returnValue.equalsIgnoreCase("")){
Flag=false;
System.out.println("超时");
return;
}
System.out.println(Thread.currentThread().getName()+"消费队列成功"+data);
}
System.out.println("消费结束");
}
}
public class ProdDonsumer_Block {
public static void main(String[] args) throws InterruptedException {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(()->{
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
},"consumer").start();
new Thread(()->{
try {
myResource.consume();
} catch (Exception e) {
e.printStackTrace();
}
},"consum").start();
Thread.sleep(2000);
myResource.stop();
}
}