JAVA-阻塞队列-生产者消费者模型

阻塞队列基本

image.png

阻塞队列模型
当队列时空 消费者阻塞
队列满了 生产者阻塞

使用阻塞队列,不需要管线程的唤醒和阻塞

BlockingQueue 在Collection底下


image.png
image.png
public class BlockingQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println( blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));

        System.out.println(blockingQueue.add("d"));
        
    }
}

加了四个字符串 然后报了异常


image.png

超时控制
如果用了Put,take 会一直等着 拖着

如果队列满了 Put会满了

如果队列是空take会一直等着

offer(e,time,unit) 限定时间的put
poll()限定时间的

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//        System.out.println(blockingQueue.add("a"));
//        System.out.println( blockingQueue.add("b"));
//        System.out.println(blockingQueue.add("c"));
//hu
//        System.out.println(blockingQueue.add("d"));

        System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
    }
}

题目一

image.png

一个初始值为0的变量,两个线程对其交替操作,一个加一,一个减一,来5轮

class ShareData{
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    public void increase() throws Exception{
        //1 判断
        lock.lock();
        System.out.println(Thread.currentThread().getName()+"has lock");
       while (number !=0){ //等待,不生产
            System.out.println(Thread.currentThread().getName()+"wait");
            condition.await();
           System.out.println(Thread.currentThread().getName()+"wait end");

        }

        number++;
        System.out.println(Thread.currentThread().getName()+number);
        //通知唤醒
        condition.signalAll();
        lock.unlock();

    }

    public void decrease() throws Exception{
        //1 判断
        lock.lock();
        System.out.println(Thread.currentThread().getName()+"has lock");

        while (number ==0){ //等待,不生产
            System.out.println(Thread.currentThread().getName()+"wait");
            condition.await();
            System.out.println(Thread.currentThread().getName()+"wait end");

        }

        number--;
        System.out.println(Thread.currentThread().getName()+number);
        //通知唤醒

        condition.signalAll();
        lock.unlock();
    }
}

//初始,两个变量,一个+1,一个-1 来五轮
// 线程操作资源类
//判断干活 通知
//防止虚假唤醒
public class Pro_Consumer_Traditional {
    public static void main(String[] args) {
        ShareData s=new ShareData();
        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    s.increase();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "AA").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    s.decrease();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "BB").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    s.increase();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "CC").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                try {
                    s.decrease();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "DD").start();
    }

}

注意wait 条件不能用if 判断
因为如果有两个increase线程都在wait
有一个decrease线程把他们唤醒了
那么这两个线程都会抢占lock
然后都有可能会add number

如果用while的话
他们被唤醒后
只有一个线程可以获得锁,然后结束while
另一个线程还会继续wait

package JavaBsic.BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// volatile cas automaticInteger
class MyResource{
    private volatile boolean Flag=true;
    private AtomicInteger atomicInteger = new AtomicInteger();
    BlockingQueue<String> queue = null;

    //高手一定传接口,不传类
    public MyResource(BlockingQueue<String> queue) {
        this.queue = queue;
        System.out.println(queue.getClass().getName());
    }

    public void myProd() throws Exception{
        String data=null;
        boolean returnValue;
        while(Flag){
            data=atomicInteger.incrementAndGet()+"";
            returnValue=queue.offer(data,2, TimeUnit.SECONDS);
            if (returnValue)
                System.out.println(Thread.currentThread().getName()+"插入成功"+data);
            else
                System.out.println(Thread.currentThread().getName()+"插入失败"+data);
            Thread.sleep(300);

        }
        System.out.println("生产结束");
    }
    public void stop(){
        this.Flag=false;
    }

    public void consume() throws Exception{
        String data=null;
        String returnValue;
        while(Flag){
            data=atomicInteger.incrementAndGet()+"";
            returnValue=queue.poll(2, TimeUnit.SECONDS);
            if(null==returnValue || returnValue.equalsIgnoreCase("")){
                Flag=false;
                System.out.println("超时");
                return;
            }

            System.out.println(Thread.currentThread().getName()+"消费队列成功"+data);


        }
        System.out.println("消费结束");
    }


}

public class ProdDonsumer_Block {
    public static void main(String[] args) throws InterruptedException {
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
        new Thread(()->{
            try {
                myResource.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"consumer").start();



        new Thread(()->{
            try {
                myResource.consume();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"consum").start();

        Thread.sleep(2000);
        myResource.stop();
    }

}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容