Java中的wait/notify/notifyAll可用来实现线程间通信,是Object类的方法,这三个方法都是native方法,是平台相关的,常用来实现生产者/消费者模式。先来我们来看下相关定义:
- wait() :调用该方法的线程进入WATTING状态,只有等待另外线程的通知或中断才会返回,调用wait()方法后,会释放对象的锁。
- wait(long):超时等待最多long毫秒,如果没有通知就超时返回。
- notify() : 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是该线程获取到了对象的锁。
- notifyAll():通知所有等待在该对象上的线程。
实例代码
class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}
public void run() {
synchronized (this) {
try {
Thread.sleep(1000); // 使当前线阻塞 1 s,确保主程序的 t1.wait(); 执行之后再执行 notify()
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" call notify()");
// 唤醒当前的wait线程
this.notify();
}
}
}
public class WaitTest {
public static void main(String[] args) {
ThreadA t1 = new ThreadA("t1");
synchronized(t1) {
try {
// 启动“线程t1”
System.out.println(Thread.currentThread().getName()+" start t1");
t1.start();
// 主线程等待t1通过notify()唤醒。
System.out.println(Thread.currentThread().getName()+" wait()");
t1.wait(); // 不是使t1线程等待,而是当前执行wait的线程等待
System.out.println(Thread.currentThread().getName()+" continue");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
LinkedBlockingQueue 实现
public class Consumer extends Thread {
private Storage storage;
public Consumer(Storage storage) {
this.storage = storage;
}
@Override
public void run() {
while (true) {
try {
Storage.Goods goods = storage.goods.take();
System.out.printf(String.valueOf(storage.goods.size())+"\\n");
Thread.sleep(300);
System.out.println("消费" + " " + goods.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Producer extends Thread {
private Storage storage;
private int i = 0;
public Producer(Storage storage) {
this.storage = storage;
}
@Override
public void run() {
while (true) {
try {
storage.goods.put(new Storage.Goods("苹果"+ i));
Thread.sleep(100);
i++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Storage {
public static int MAX_COUNT = 5; //容量为5
public BlockingQueue<Goods> goods = new LinkedBlockingQueue<Goods>(MAX_COUNT);
public Storage() {
}
public static class Goods {
int id;
String name;
public Goods(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
public class TestThread {
public static Object obj = new Object();
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
Storage storage = new Storage();
Consumer consumer1 = new Consumer(storage);
Consumer consumer2 = new Consumer(storage);
Producer producer = new Producer(storage);
service.submit(consumer1);
service.submit(consumer2);
service.submit(producer);
}
}