引言
生产者与消费者问题是典型的多线程同步问题。该问题包含生产者与消费者两个角色,二者共享一个公共队列:生产者向队列中放入生产的物品,消费者从队列中取出物品进行消费。
代码实现时,生产者与消费者属于两种不同类型的线程,公共队列由多个线程共享。当公共队列满时,生产者线程需要阻塞等待消费者消费;当公共队列空时,消费者线程需要阻塞等待生产者生产新物品。
wait()和notifyAll()
/**
 * Producer/consumer demo built on intrinsic locks ({@code synchronized},
 * {@code wait()} and {@code notifyAll()}).
 *
 * <p>Several producer threads append random digits to a bounded shared queue
 * while a consumer thread removes them. The queue object itself is used as the
 * monitor: producers wait on it while the queue is full, consumers wait on it
 * while the queue is empty, and every successful add/remove wakes all waiters.
 *
 * <p>The total number of items consumed equals the total number produced, so
 * every thread finishes and the JVM exits.
 */
public class Demo1 {
    /** Maximum number of items the shared queue may hold. */
    private static final int CAPACITY = 10;
    /** Number of producer threads started by {@link #main(String[])}. */
    private static final int PRODUCER_COUNT = 4;
    /** Number of consumer threads started by {@link #main(String[])}. */
    private static final int CONSUMER_COUNT = 1;
    /** Number of items each producer adds before it finishes. */
    private static final int ITEMS_PER_PRODUCER = 10;
    /**
     * Number of items each consumer removes before it finishes. Derived from the
     * producer totals so that nothing is left over and no producer stays blocked
     * on a full queue; the producer total must be divisible by CONSUMER_COUNT.
     */
    private static final int ITEMS_PER_CONSUMER =
            PRODUCER_COUNT * ITEMS_PER_PRODUCER / CONSUMER_COUNT;
    /** Pause before each produce/consume step, in milliseconds. */
    private static final long PAUSE_MILLIS = 1000;

    /** Shared buffer; also the monitor every worker synchronizes on, hence final. */
    private static final Queue<Integer> queue = new LinkedList<>();
    /** Single shared generator instead of a new Random per produced item. */
    private static final Random random = new Random();

    /**
     * Starts the producer threads followed by the consumer threads.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Demo1 test1 = new Demo1();
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            new Thread(test1.new Producer()).start();
        }
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            new Thread(test1.new Consumer()).start();
        }
    }

    /**
     * Adds ITEMS_PER_PRODUCER random digits (0-9) to the queue, pausing before
     * each one and waiting while the queue is full. Stops early, with the
     * interrupt flag restored, if the thread is interrupted.
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
                    Thread.sleep(PAUSE_MILLIS);
                    synchronized (queue) {
                        // Loop, not if: the condition must be re-checked after every wake-up.
                        while (queue.size() >= CAPACITY) {
                            System.out.println(Thread.currentThread().getName() + "生产者等待(队列已满)");
                            queue.wait();
                        }
                        // nextInt(10) yields 0..9; nextInt() % 10 could also be negative.
                        Integer number = random.nextInt(10);
                        queue.add(number);
                        System.out.println(Thread.currentThread().getName() + "生产者生产了一个数字" + number);
                        queue.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop; keep the flag for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes ITEMS_PER_CONSUMER items from the queue, pausing before each one
     * and waiting while the queue is empty. Stops early, with the interrupt flag
     * restored, if the thread is interrupted.
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < ITEMS_PER_CONSUMER; i++) {
                    Thread.sleep(PAUSE_MILLIS);
                    synchronized (queue) {
                        // Loop, not if: the condition must be re-checked after every wake-up.
                        while (queue.isEmpty()) {
                            System.out.println(Thread.currentThread().getName() + "消费者等待(队列已空)");
                            queue.wait();
                        }
                        System.out.println(Thread.currentThread().getName() + "消费者消费了一个数字" + queue.poll());
                        queue.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop; keep the flag for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }
}
可重入锁ReentrantLock
/**
 * Producer/consumer demo built on an explicit {@link ReentrantLock} with two
 * {@link Condition}s.
 *
 * <p>Producers wait on {@code notFull} while the queue is at capacity and signal
 * {@code notEmpty} after adding an item; consumers wait on {@code notEmpty}
 * while the queue is empty and signal {@code notFull} after removing an item.
 *
 * <p>The queue is an instance field so that it is always guarded by the lock of
 * the same {@code Demo2} instance. The total number of items consumed equals the
 * total number produced, so every thread finishes and the JVM exits.
 */
public class Demo2 {
    /** Maximum number of items the shared queue may hold. */
    private static final int CAPACITY = 10;
    /** Number of producer threads started by {@link #main(String[])}. */
    private static final int PRODUCER_COUNT = 4;
    /** Number of consumer threads started by {@link #main(String[])}. */
    private static final int CONSUMER_COUNT = 2;
    /** Number of items each producer adds before it finishes. */
    private static final int ITEMS_PER_PRODUCER = 10;
    /**
     * Number of items each consumer removes before it finishes. Derived from the
     * producer totals so that nothing is left over and no producer stays blocked
     * on a full queue; the producer total must be divisible by CONSUMER_COUNT.
     */
    private static final int ITEMS_PER_CONSUMER =
            PRODUCER_COUNT * ITEMS_PER_PRODUCER / CONSUMER_COUNT;
    /** Pause before each produce/consume step, in milliseconds. */
    private static final long PAUSE_MILLIS = 1000;

    /** Shared buffer; only accessed while {@link #lock} is held. */
    private final Queue<Integer> queue = new LinkedList<>();
    /** Single shared generator instead of a new Random per produced item. */
    private final Random random = new Random();
    private final Lock lock = new ReentrantLock();
    /** Signalled by consumers after they free a slot. */
    private final Condition notFull = lock.newCondition();
    /** Signalled by producers after they add an item. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Starts the producer threads followed by the consumer threads.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Demo2 demo2 = new Demo2();
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            new Thread(demo2.new Producer()).start();
        }
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            new Thread(demo2.new Consumer()).start();
        }
    }

    /**
     * Adds ITEMS_PER_PRODUCER random digits (0-9) to the queue, pausing before
     * each one and waiting on {@code notFull} while the queue is full. Stops
     * early, with the interrupt flag restored, if the thread is interrupted.
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
                    Thread.sleep(PAUSE_MILLIS);
                    // Acquire the lock
                    lock.lock();
                    try {
                        // Loop, not if: the condition must be re-checked after every wake-up.
                        while (queue.size() >= CAPACITY) {
                            System.out.println(Thread.currentThread().getName() + "生产者等待(队列已满)");
                            notFull.await();
                        }
                        // nextInt(10) yields 0..9; nextInt() % 10 could also be negative.
                        Integer number = random.nextInt(10);
                        queue.add(number);
                        System.out.println(Thread.currentThread().getName() + "生产者生产了一个数字" + number);
                        // Wake up a consumer
                        notEmpty.signal();
                    } finally {
                        // Release the lock, including when await() is interrupted
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop; keep the flag for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes ITEMS_PER_CONSUMER items from the queue, pausing before each one
     * and waiting on {@code notEmpty} while the queue is empty. Stops early, with
     * the interrupt flag restored, if the thread is interrupted.
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < ITEMS_PER_CONSUMER; i++) {
                    Thread.sleep(PAUSE_MILLIS);
                    lock.lock();
                    try {
                        // Loop, not if: the condition must be re-checked after every wake-up.
                        while (queue.isEmpty()) {
                            System.out.println(Thread.currentThread().getName() + "消费者等待(队列已空)");
                            notEmpty.await();
                        }
                        System.out.println(Thread.currentThread().getName() + "消费者消费了一个数字" + queue.poll());
                        // Wake up a producer
                        notFull.signal();
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop; keep the flag for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }
}
阻塞队列BlockingQueue
/**
 * Producer/consumer demo built on a bounded {@link BlockingQueue}.
 *
 * <p>All waiting is delegated to the queue: {@code put} blocks while the queue
 * is full and {@code take} blocks while it is empty, so the workers need no
 * explicit locking.
 *
 * <p>The total number of items consumed equals the total number produced, so
 * every thread finishes and the JVM exits.
 */
public class Demo3 {
    /** Maximum number of items the shared queue may hold. */
    private static final int CAPACITY = 10;
    /** Number of producer threads started by {@link #main(String[])}. */
    private static final int PRODUCER_COUNT = 4;
    /** Number of consumer threads started by {@link #main(String[])}. */
    private static final int CONSUMER_COUNT = 2;
    /** Number of items each producer adds before it finishes. */
    private static final int ITEMS_PER_PRODUCER = 10;
    /**
     * Number of items each consumer removes before it finishes. Derived from the
     * producer totals so that nothing is left over and no producer stays blocked
     * on a full queue; the producer total must be divisible by CONSUMER_COUNT.
     */
    private static final int ITEMS_PER_CONSUMER =
            PRODUCER_COUNT * ITEMS_PER_PRODUCER / CONSUMER_COUNT;
    /** Pause before each produce/consume step, in milliseconds. */
    private static final long PAUSE_MILLIS = 1000;

    /** Shared bounded buffer; typed so that only integers can be enqueued. */
    final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(CAPACITY);

    /**
     * Starts the producer threads followed by the consumer threads.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Demo3 demo3 = new Demo3();
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            new Thread(demo3.new Producer()).start();
        }
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            new Thread(demo3.new Consumer()).start();
        }
    }

    /**
     * Puts its loop index (0 .. ITEMS_PER_PRODUCER - 1) into the queue, pausing
     * before each put. Stops early, with the interrupt flag restored, if the
     * thread is interrupted while sleeping or while blocked in {@code put}.
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
                    Thread.sleep(PAUSE_MILLIS);
                    blockingQueue.put(i);
                    System.out.println(Thread.currentThread().getName() + "生产者生产了数字" + i);
                }
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop; keep the flag for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Takes ITEMS_PER_CONSUMER items from the queue, pausing before each take.
     * Stops early, with the interrupt flag restored, if the thread is interrupted
     * while sleeping or while blocked in {@code take}.
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < ITEMS_PER_CONSUMER; i++) {
                    Thread.sleep(PAUSE_MILLIS);
                    System.out.println(Thread.currentThread().getName()
                            + "消费者消费了数字" + blockingQueue.take());
                }
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop; keep the flag for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }
}