多线程开发

在讲AQS之前,我们先熟悉一下,多线程开发中要注意的问题,这也是AQS致力于要解决的问题

在多线程开发的过程中会涉及到两类问题:

  • 多个线程之间资源竞争的问题
  • 线程之间的通信问题

java最原始的解决方案:

  • 使用synchronized关键字给资源加锁,以解决资源竞争问题
  • 使用Object.wait,notify,以解决线程之间的通信问题 调用之前必须先获得锁

下面使用最经典的生产者和消费者模式,来解释这两个问题:

需求:
1.多个生产者同时生产,多个消费者同时消费
2.一共可以生产5000个商品,生产完成之后,生产者自动停止
3.一共可以消费5000个商品,消费完成之后,消费者自动停止
4.存储商品的容器大小为100

商品:

public class Product {

    /**
     * 生产者名称
     */
    private String producer;

    /**
     * 序列号
     */
    private Integer serialNumber;

    public Product(String producer, Integer serialNumber) {
        this.producer = producer;
        this.serialNumber = serialNumber;
    }

    @Override
    public String toString() {
        return "生产者:" + producer + ", 序列号:" + serialNumber;
    }
}

生产者:
当容器满的时候,调用lock.wait,当前线程会阻塞,直到被其他线程唤醒,才会继续执行;
当生产了一件之后,调用lock.notifyAll,唤醒其他等待的线程

public class Producer {

    /**
     * 生产的数量
     */
    private AtomicInteger produceNum = new AtomicInteger(0);

    private final Object lock;

    private List<Product> products;

    public Producer(Object lock, List<Product> products) {
        this.lock = lock;
        this.products = products;
    }

    /**
     *
     * @return 生产了几个
     * @throws Exception
     */
    public int produce() throws Exception {
        synchronized(lock)  {

            // 如果已经生产了5000个,则直接终端当前线程,并通知其他线程
            if(produceNum.get() >= 5000) {
                lock.notifyAll();
                throw new InterruptedException(Thread.currentThread().getName() + "已经生产了5000件,无需再生产了");
            }

            // 如果容器中的数量大于或者等于100,等待消费者消费
            if(products.size() >= 100) {
                System.out.println("#####################生产者开始等待#####################");
                lock.wait();
                return 0;
            }

            // 生产一件,并通知所有的等待者
            produceNum.getAndIncrement();
            Product product = new Product(Thread.currentThread().getName(), produceNum.get());
            products.add(product);
            System.out.println("++++++++++++++++++生产[" + product.toString() + "]++++++++++++++++++,size=" + products.size());
            lock.notifyAll();
            return 1;
        }
    }
}

消费者:
当容器为空时,调用lock.wait,当前线程会阻塞,直到被其他线程唤醒,才会继续执行;
当消费完一件后,需要调用lock.notifyAll去通知其他等待的线程

public class Consumer {

    /**
     * 生产的数量
     */
    private AtomicInteger consumeNum = new AtomicInteger(0);

    private final Object lock;

    private List<Product> products;

    public Consumer(Object lock, List<Product> products) {
        this.lock = lock;
        this.products = products;
    }

    public int consumer() throws Exception {
        synchronized(lock)  {

            // 如果已经消费了5000个,则直接终端当前线程,并通知其他线程
            if(consumeNum.get() >= 5000) {
                throw new InterruptedException(Thread.currentThread().getName() + "已经消费了5000件,无需再消费了");
            }

            // 如果容器中的没有产品了
            if(products.size() <= 0) {
                System.out.println("********************消费者开始等待********************");
                lock.wait();
                return 0;
            }

            // 消费一件,并通知所有的等待者
            consumeNum.getAndIncrement();
            Product product = products.remove(0);
            System.out.println("----------------------消费[" + product.toString() + "]----------------------,size=" + products.size());
            lock.notifyAll();

            return 1;
        }
    }
}

执行逻辑:

@Test
    public void producerAndConsumerTest() {
        // 用于装产品的容器
        List<Product> products = Lists.newArrayList();

        Object lock = new Object();

        final Producer producer = new Producer(lock, products);

        final StringBuilder msg = new StringBuilder();

        Thread p1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (true) {
                    try {
                        i += producer.produce();
                    } catch (Exception e) {
                        msg.append("*********************p1共生产了" + i + "件产品\n");
                        System.out.println(e.getMessage());
                        break;
                    }
                }

            }
        }, "p1");

        Thread p2 = new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (true) {
                    try {
                        i += producer.produce();
                    } catch (Exception e) {
                        msg.append("*********************p2共生产了" + i + "件产品\n");
                        System.out.println(e.getMessage());
                        break;
                    }
                }
            }
        }, "p2");

        final Consumer consumer = new Consumer(lock, products);

        Thread c1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (true) {
                    try {
                        i += consumer.consumer();
                    } catch (Exception e) {
                        msg.append("*********************c1共消费了" + i + "件产品\n");
                        System.out.println(e.getMessage());
                        break;
                    }
                }
            }
        }, "c1");

        Thread c2 = new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (true) {
                    try {
                        i += consumer.consumer();
                    } catch (Exception e) {
                        msg.append("*********************c2共消费了" + i + "件产品\n");
                        System.out.println(e.getMessage());
                        break;
                    }
                }
            }
        }, "c2");

        p1.start();
        p2.start();
        c1.start();
        c2.start();

        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            // ignore
        }

        System.out.println(msg.toString());

    }

运行结果截图:


图片.png

其实使用java原生的方法,存在一个小的问题,当生产者生产了一件商品之后,需要调用notifyAll方法去唤醒所有的等待线程,我们无法对等待的线程做一个分组,好在AQS解决了这个问题,以ReentrantLock为例,来解释一下AQS如何解决这个问题的

使用ReentrantLock实现生产者和消费者模型

未完待续。。。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容