Java等待-通知机制与生产者消费者模型

一、什么是等待-通知机制

如果线程要求的条件不满足,则线程阻塞自己,进入等待状态;当线程要求的条件满足后,通知等待的线程重新执行。用简单的代码来理解:

synchronized(锁对象){
while (条件不满足){
            wait();
        }
        //条件满足时
        //do sthing
            notify();
}

这可以看成是多线程编程中的一种范式,需要注意这里必须使用while,因为执行wait()方法进入阻塞后会停留在这里,当被其他线程唤醒后,需要再一次判断条件是否满足,因为这时有可能资源已经别的线程先抢占了。 还有一点需要注意wait()和notify()必须在同步代码块中使用(就是已经获得锁了的情况下)。这是因为线程状态转换是有规定的,比如线程只能从running状态进入waiting状态。如果不是这样的情况就会报 java.lang.IllegalMonitorStateException()错误。

二、生产者消费者模型

1.理解有缓冲区的生产者消费者模型。

image.png

生产者消费者可以说是等待通知机制的典型应用了,直接看代码,使用wait(),notify()实现一个简单的生产者消费者模型

/**
 * @author xiaobenneng@hotmail.com
 * @date 2020/3/22
 */
public class ProducerComsumer {

    LinkedList<Integer> list = new LinkedList<>();//缓冲区

    public static void main(String[] args) throws InterruptedException {
        ProducerComsumer producerComsumer = new ProducerComsumer();
        Thread thread1 = new Thread(()->{
            while (true) {
                try {
                    producerComsumer.producer();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread thread2 = new Thread(()->{
            while (true) {
                try {
                    producerComsumer.comsumer();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        thread1.start();
        thread2.start();
    }
    public void producer() throws InterruptedException {
        Random random = new Random();
        synchronized (this) {
           //当容器满时就等待消费者消费数据,容器不满时就往里面放数据
            while (list.size() == 5) { 
               wait();
            }
            int i = random.nextInt(10);
            list.add(i);
            System.out.printf("当前线程%s生产了数据%d \n", Thread.currentThread().getName(), i);
            notifyAll();
        }
    }

    public void comsumer() throws InterruptedException {
        synchronized (this) {
            //当容器为空时,就等待生产者往里面放数据。容器不为空时就消费数据
            while (list.size() == 0) {
              wait();
            }
            Integer remove = list.removeFirst();
            System.out.printf("当前线程%s消费了%d \n", Thread.currentThread().getName(), remove);
            notifyAll();
        }
    }

}

如果对锁的机制还不太熟悉的话,理解这个例子需要注意的是wait(),notify()只能唤醒同一个对象锁中的等待线程。在这里这个锁对象是this。

使用Lock ,Condition实现生产者消费者模型

先看一下java.util.concurrent包中的阻塞队列是如何实现阻塞的(仅讨论take()和put()方法,这两个方法在条件不满足时会阻塞一直等待)。
几个关键的参数:

/** 这个队列中都使用这把锁 */
    private ReentrantLock lock;

    /** 存放元素的数组 */
    private Object[] items;

    /** 锁对应的数组元素为空时的条件 */
    private Condition notEmpty;

    /** 锁对应的数组元素为满的条件 */
    private Condition notFull;

    /** 数组中元素的数量 */
    int count;

这几个参数会在构造方法中初始化。

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

在生产者方面,调用存数据的方法,当数据满了时就调用notFull.await()(相当于Object.wait())进入等待,直到消费者消费了数据以后,通过调用notFull.signal()(相当于Object.notify())来唤醒生产者继续生产。下面抽取2个主要的方法:

 public void put(E e) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

在消费者方面,调用取数据的方法,当数据为空时就调用notEmpty.await()进入等待,直到生产者生产了数据以后,通过调用notEmpty.signal()来唤醒消费者者继续生产。下面看一下主要的两个方法:

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
 private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        notFull.signal();
        return x;
    }

可以看到这是很明显的等待-通知的形式了。
接下来可以模仿ArrayBlockingQueue这种方式实现生产者消费者模型。


/**
 * @author xiaobenneng@hotmail.com
 * @date 2020/3/22
 */
public class ProducerComsumer<E> {

    public static void main(String[] args) {
        ProducerComsumer<Integer> producerComsumer = new ProducerComsumer<Integer>(5);
        new Thread(()->{
            Random random = new Random();
            while (true) {
                int i = random.nextInt(10);
                try {
                    producerComsumer.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();

        new Thread(()->{
            while (true) {
                try {
                    Integer take = producerComsumer.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }

    private ReentrantLock lock;

    /** 存放元素的链表 */
    private LinkedList<E> items;

    /** 锁对应的数组元素为空时的条件 */
    private Condition notEmpty;

    /** 锁对应的数组元素为满的条件 */
    private Condition notFull;

    /** 链表中最大元素的数量 */
    int capacity;

    public ProducerComsumer(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new LinkedList<E>();
        this.capacity = capacity;
        lock = new ReentrantLock();
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    public void put(E e) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (capacity == items.size()) {
                notFull.await();
            }
            items.addLast(e);
            System.out.printf("当前线程%s生产数据%d,总数%d \n",Thread.currentThread().getName(), e,items.size());
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (items.size() == 0) {
                notEmpty.await();
            }

            E e = items.removeFirst();
            System.out.printf("当前线程%s消费数据%d,剩余总数%d \n",Thread.currentThread().getName(),e, items.size());
            notFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }

}

测试结果

当前线程Thread-0生产数据5,总数1 
当前线程Thread-0生产数据1,总数2 
当前线程Thread-0生产数据0,总数3 
当前线程Thread-0生产数据7,总数4 
当前线程Thread-0生产数据8,总数5 
当前线程Thread-1消费数据5,剩余总数4 
当前线程Thread-1消费数据1,剩余总数3 
当前线程Thread-1消费数据0,剩余总数2 
当前线程Thread-1消费数据7,剩余总数1 
当前线程Thread-1消费数据8,剩余总数0 

模拟一种场景,当生产力大于消费力时,通过在消费者端sleep一段时间模拟这个场景

public static void main(String[] args) {
        ProducerComsumer<Integer> producerComsumer = new ProducerComsumer<Integer>(5);
 Random random = new Random();
        new Thread(()->{
            while (true) {
                int i = random.nextInt(10);
                try {
                    producerComsumer.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
        new Thread(()->{
            while (true) {
                try {
                    Integer take = producerComsumer.take();
                    //处理数据花费时间
                    Thread.sleep(0,random.nextInt(500));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
        new Thread(()->{
            while (true) {
                try {
                    Integer take = producerComsumer.take();
                    //处理数据花费时间
                    Thread.sleep(0,random.nextInt(400));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }

运行结果

当前线程Thread-0生产数据6,总数4 
当前线程Thread-0生产数据7,总数5 
当前线程Thread-1消费数据0,剩余总数4 
当前线程Thread-2消费数据6,剩余总数3 
当前线程Thread-0生产数据5,总数4 
当前线程Thread-0生产数据9,总数5 
当前线程Thread-1消费数据5,剩余总数4 
当前线程Thread-2消费数据6,剩余总数3 
当前线程Thread-0生产数据7,总数4 
当前线程Thread-0生产数据4,总数5 

生产者生产数据的速度远远大过消费者消费数据速度,这体现了这种模式的优势,生产者不依赖消费者,生产的数据先放入缓存区,消费者再慢慢消费,处理结果使用异步返回。对于消费者来讲,因为消费者处理任务的能力是有限的,这时候也对服务器资源起到了保护的作用,比如可以避免过多的请求发送到mysql。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 原创文章&经验总结&从校招到A厂一路阳光一路沧桑 详情请戳www.codercc.com 生产者-消费者模式是一个...
    你听___阅读 45,087评论 9 60
  • 相关概念 面向对象的三个特征 封装,继承,多态.这个应该是人人皆知.有时候也会加上抽象. 多态的好处 允许不同类对...
    东经315度阅读 2,212评论 0 8
  • 本文出自 Eddy Wiki ,转载请注明出处:http://eddy.wiki/interview-java.h...
    eddy_wiki阅读 2,304评论 0 14
  • 面向对象的三个特征 封装,继承,多态.这个应该是人人皆知.有时候也会加上抽象. 多态的好处 允许不同类对象对同一消...
    Blizzard_liu阅读 1,838评论 0 6
  • 如果说亲子关系是每个父亲和儿子的鸿沟,那么在今天请跳过去看看。 那边的世界… 每个父亲和儿子的关系总是难以言说,我...
    征途_8371阅读 348评论 0 2

友情链接更多精彩内容