哈喽,大家好,我们都知道线程的重要性,其中线程间通信可以使得线程更加的灵活,所以我们这次来聊聊线程间是如何通信的。
等待/通知机制
等待/通知机制简单来说就是当一个线程在等待的时候,由其他地方通知它不用等待了,然后该线程就继续执行下面的代码。举一个生活中的例子,等待/通知机制就好比我们网购,我们在网上买了东西并付钱过后我们就进入了等待的状态,需要等待商家的发货,等待快递员的送货。当快递员送货完成后,他们会打电话通知我们,我们买的东西已经送到,然后我们就知道下去拿快递。这就是一个简单的等待/通知机制。
在Java中等待/通知机制我们可以用wait方法和notify/notifyAll方法来实现。下面我们先用一个简单的例子来看下他们是如何使用的:
public class TestTool {
private final Object object = new Object();
public void user() {
synchronized (object) {
try {
System.out.println("用户已经购买了商品,开始等待");
object.wait();
System.out.println("用户拿到了商品");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void process() {
synchronized (object) {
try {
System.out.println("商品开始运送");
Thread.sleep(5000);
object.notify();
System.out.println("商品运送完成");
Thread.sleep(1000);
System.out.println("通知用户来拿");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public class ConmuMain {
public static void main(String[] args) {
try {
TestTool testTool = new TestTool();
ThreadA threadA = new ThreadA(testTool);
ThreadB threadB = new ThreadB(testTool);
threadA.start();
Thread.sleep(1000);
threadB.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public static class ThreadA extends Thread {
private TestTool testTool;
public ThreadA(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
super.run();
testTool.user();
}
}
public static class ThreadB extends Thread {
private TestTool testTool;
public ThreadB(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
super.run();
testTool.process();
}
}
}
结果为:
用户已经购买了商品,开始等待
商品开始运送
商品运送完成
通知用户来拿
用户拿到了商品
我们根据结果来分析一下过程。
- 线程A中先调用user方法,拿到锁后又继续调用了wait方法进入了等待。
- 过了1秒,调用了线程B的process方法,我们可以看见user方法和process方法都是以object为对象监视器加了锁的,在调用wait方法后代码进去了process说明了线程A中已经释放了锁,所以线程B的方法才能正常运行,我们得出结论在调用wait方法后会立刻释放当前的锁。
- 经过了5秒,线程B中调用了notify方法,但这个时候线程A并没有立马执行,而是继续执行线程B后面的方法。
- 等线程B执行完成后线程A才开始执行wait下面的代码,我们可以发现在调用notify方法后线程B继续执行,而且这个时候还没有释放锁,等到线程B执行完成并销毁后,才释放锁,这个时候线程A获取锁,然后继续执行wait后面的代码。
这就是wait,notify的整个流程。
notify是随机通知一个等待的线程
我们发现在调用nofity过后会通知一个线程停止等待,那么这个通知的线程是有一定规律还是随机的呢,这个我们用代码来看一下:
public class TestTool {
private final Object object = new Object();
public void inWait() {
synchronized (object) {
try {
System.out.println(Thread.currentThread().getName() + "进入等待状态");
object.wait();
System.out.println(Thread.currentThread().getName() + "解除等待状态");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void inNotify() {
synchronized (object) {
try {
System.out.println("解除一个线程");
object.notify();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public class ConmuMain {
public static void main(String[] args) {
try {
TestTool testTool = new TestTool();
for (int i = 0; i < 5; i++) {
ThreadA threadA = new ThreadA(testTool);
threadA.start();
}
Thread.sleep(1000);
for (int i = 0; i < 2; i++) {
ThreadB threadB = new ThreadB(testTool);
threadB.start();
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static class ThreadA extends Thread {
private TestTool testTool;
public ThreadA(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
super.run();
testTool.inWait();
}
}
public static class ThreadB extends Thread {
private TestTool testTool;
public ThreadB(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
super.run();
testTool.inNotify();
}
}
}
结果为:
Thread-0进入等待状态
Thread-2进入等待状态
Thread-1进入等待状态
Thread-3进入等待状态
Thread-4进入等待状态
解除一个线程
Thread-0解除等待状态
解除一个线程
Thread-2解除等待状态
我们可以看到这次通知的是线程0和2,并且我在多次运行后,发现通知的线程也不一样,没有规律,所以notify是随机通知一个正在等待的线程。
notifyAll
notifyAll方法就是通知所有的等待线程,我们修改下上面的代码来看看:
public void inNotify() {
synchronized (object) {
try {
System.out.println("解除所有线程");
object.notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
try {
TestTool testTool = new TestTool();
for (int i = 0; i < 5; i++) {
ThreadA threadA = new ThreadA(testTool);
threadA.start();
}
Thread.sleep(1000);
ThreadB threadB = new ThreadB(testTool);
threadB.start();
} catch (Exception e) {
e.printStackTrace();
}
}
结果为:
Thread-0进入等待状态
Thread-1进入等待状态
Thread-2进入等待状态
Thread-3进入等待状态
Thread-4进入等待状态
解除所有线程
Thread-4解除等待状态
Thread-3解除等待状态
Thread-2解除等待状态
Thread-1解除等待状态
Thread-0解除等待状态
在调用notifyAll方法过后,所有等待线程都结束了等待。
wait(long timeout)
这个方式是在等待多少秒后,如果没有其他线程通知这个线程,那么这个线程就自己取消等待,下面我们来看个例子:
public class TestTool {
private final Object object = new Object();
public void inWait() {
synchronized (object) {
try {
System.out.println(Thread.currentThread().getName() + "进入等待状态");
object.wait(2000);
System.out.println(Thread.currentThread().getName() + "解除等待状态");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ConmuMain {
public static void main(String[] args) {
try {
TestTool testTool = new TestTool();
ThreadA threadA = new ThreadA(testTool);
threadA.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public static class ThreadA extends Thread {
private TestTool testTool;
public ThreadA(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
super.run();
testTool.inWait();
}
}
}
结果为:
Thread-0进入等待状态
Thread-0解除等待状态
我们并没有用其他线程来通知该线程可以取消等待,但它还是自己取消了。
一对一生产消费模式
我们来举个例子,一个生产者,一个消费者,生产者每次只生产一个产品,等待消费者来消费,消费者每次只消费一个产品,下面我们来看看代码:
public class TestTool {
private List<Integer> list = new ArrayList<>();
private final Object object = new Object();
public void creatOne() {
synchronized (object) {
try {
if (list.size() == 1) {
object.wait();
}
list.add(1);
System.out.println("生产者生产一个产品:" + list.size());
object.notify();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void consumeOne() {
synchronized (object) {
try {
if (list.isEmpty()) {
object.wait();
}
list.remove(0);
System.out.println("消费者消费一个产品:" + list.size());
object.notify();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public class ConmuMain {
public static void main(String[] args) {
try {
TestTool testTool = new TestTool();
ThreadA threadA = new ThreadA(testTool);
ThreadB threadB = new ThreadB(testTool);
threadA.start();
threadB.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public static class ThreadA extends Thread {
private TestTool testTool;
public ThreadA(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
super.run();
while (true) {
testTool.creatOne();
}
}
}
public static class ThreadB extends Thread {
private TestTool testTool;
public ThreadB(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
super.run();
while (true) {
testTool.consumeOne();
}
}
}
}
结果为:
消费者消费一个产品:0
生产者生产一个产品:1
消费者消费一个产品:0
生产者生产一个产品:1
消费者消费一个产品:0
生产者生产一个产品:1
消费者消费一个产品:0
生产者生产一个产品:1
消费者消费一个产品:0
生产者生产一个产品:1
消费者消费一个产品:0
生产者生产一个产品:1
消费者消费一个产品:0
生产者生产一个产品:1
消费者消费一个产品:0
生产者生产一个产品:1
消费者消费一个产品:0
生产者生产一个产品:1
......
我们可以看到其中生产者和消费者都是按照预先设定好的模式来运行的,生产者生产的产品最大就是1。
一个生产者多个消费者
上面的写法在一个生产者一个消费者的情况下没有任何问题,那么在一个生产者多个消费者的情况下是否可以呢,我们下面来看看代码:
public class TestTool {
private List<Integer> list = new ArrayList<>();
private final Object object = new Object();
public void creatOne() {
synchronized (object) {
try {
if (list.size() == 1) {
object.wait();
}
list.add(1);
System.out.println("生产者生产一个产品:" + list.size());
object.notify();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void consumeOne() {
synchronized (object) {
try {
if (list.isEmpty()) {
object.wait();
System.out.println(Thread.currentThread().getName() + "取消等待");
}
list.remove(0);
System.out.println(Thread.currentThread().getName() + "消费一个产品:" + list.size());
object.notify();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public class ConmuMain {
public static void main(String[] args) {
try {
TestTool testTool = new TestTool();
ThreadA threadA = new ThreadA(testTool);
ThreadB threadB1 = new ThreadB(testTool);
ThreadB threadB2 = new ThreadB(testTool);
ThreadB threadB3 = new ThreadB(testTool);
ThreadB threadB4 = new ThreadB(testTool);
ThreadB threadB5 = new ThreadB(testTool);
threadA.start();
threadB1.start();
threadB2.start();
threadB3.start();
threadB4.start();
threadB5.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public static class ThreadA extends Thread {
private TestTool testTool;
public ThreadA(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
super.run();
while (true) {
testTool.creatOne();
}
}
}
public static class ThreadB extends Thread {
private TestTool testTool;
public ThreadB(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
super.run();
while (true) {
testTool.consumeOne();
}
}
}
}
结果为:
生产者生产一个产品:1
Thread-2消费一个产品:0
生产者生产一个产品:1
Thread-1取消等待
Thread-1消费一个产品:0
Thread-2取消等待
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.remove(ArrayList.java:492)
at conmuDemo.TestTool.consumeOne(TestTool.java:33)
at conmuDemo.ConmuMain$ThreadB.run(ConmuMain.java:54)
结果报错了,我们通过日志可以发现在线程1消费了一个产品后生产者并没有生产产品,线程2又开始消费产品,这就导致了报错。我们修改下代码:
public class TestTool {
private List<Integer> list = new ArrayList<>();
private final Object object = new Object();
public void creatOne() {
synchronized (object) {
try {
while (list.size() == 1) {
object.wait();
}
list.add(1);
System.out.println("生产者生产一个产品:" + list.size());
object.notify();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void consumeOne() {
synchronized (object) {
try {
while (list.isEmpty()) {
object.wait();
System.out.println(Thread.currentThread().getName() + "取消等待");
}
list.remove(0);
System.out.println(Thread.currentThread().getName() + "消费一个产品:" + list.size());
object.notify();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
我们将原来的if判断变成了while判断,这样在取消等待过后会再次判断一次list的size,避免了上面的错误,我们运行下,结果为:
生产者生产一个产品:1
Thread-2消费一个产品:0
生产者生产一个产品:1
Thread-3消费一个产品:0
Thread-1取消等待
生产者生产一个产品:1
Thread-2消费一个产品:0
Thread-1取消等待
Thread-3取消等待
我们发现这里并没有报错了,但是出现了假死情况,我们再次修改代码:
public class TestTool {
private List<Integer> list = new ArrayList<>();
private final Object object = new Object();
public void creatOne() {
synchronized (object) {
try {
while (list.size() == 1) {
object.wait();
}
list.add(1);
System.out.println("生产者生产一个产品:" + list.size());
object.notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void consumeOne() {
synchronized (object) {
try {
while (list.isEmpty()) {
object.wait();
System.out.println(Thread.currentThread().getName() + "取消等待");
}
list.remove(0);
System.out.println(Thread.currentThread().getName() + "消费一个产品:" + list.size());
object.notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
我们将notify方法修改为notifyAll方法,来看看运行结果:
生产者生产一个产品:1
Thread-3取消等待
Thread-3消费一个产品:0
Thread-1取消等待
生产者生产一个产品:1
Thread-2取消等待
Thread-2消费一个产品:0
Thread-5取消等待
Thread-4取消等待
生产者生产一个产品:1
Thread-1取消等待
Thread-1消费一个产品:0
Thread-3取消等待
生产者生产一个产品:1
Thread-4取消等待
Thread-4消费一个产品:0
Thread-5取消等待
Thread-2取消等待
生产者生产一个产品:1
Thread-3取消等待
Thread-3消费一个产品:0
Thread-1取消等待
生产者生产一个产品:1
Thread-2取消等待
Thread-2消费一个产品:0
当修改了notify方法过后程序就可以完美运行了。
join
当我们使用主线程启动一个子线程的时候,如果子线程中有大量的耗时操作,那么子线程会晚于主线程结束。我们如果想让主线程在子线程执行完成后再执行的话,就可以用到join方法。
下面我们用简单的代码来看下join是如何使用的:
public class TestTool {
public void test() {
try {
System.out.println("test start");
Thread.sleep(5000);
System.out.println("test finish");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ConmuMain {
public static void main(String[] args) {
try {
TestTool testTool = new TestTool();
ThreadA threadA = new ThreadA(testTool);
threadA.start();
threadA.join();
System.out.println("main finish");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static class ThreadA extends Thread {
private TestTool testTool;
public ThreadA(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
testTool.test();
}
}
}
结果为:
test start
test finish
main finish
在使用join过后主线程在子线程执行完成后才继续执行,join内部是使用wait方法来实现的。
join与interrupt
如果在线程中使用join方法,那么该线程在调用interrupt方法后会报错,下面我们来看看代码:
public class ConmuMain {
public static void main(String[] args) {
try {
ThreadB threadB = new ThreadB();
threadB.start();
Thread.sleep(2000);
threadB.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static class ThreadA extends Thread {
@Override
public void run() {
try {
while (true) {
System.out.println("while");
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static class ThreadB extends Thread {
@Override
public void run() {
try {
ThreadA threadA = new ThreadA();
threadA.start();
threadA.join();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("ThreadB exception");
}
}
}
}
结果为:
while
while
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1252)
at java.lang.Thread.join(Thread.java:1326)
at conmuDemo.ConmuMain$ThreadB.run(ConmuMain.java:38)
ThreadB exception
我们在调用interrupt方法后,线程B出现了报错,这里我们进入join源码看看:
public final void join() throws InterruptedException
我们发现造成报错的原因是因为join方法抛出了InterruptedException。
join(long millis)
join(long millis)方法是在限定时间内,如果线程还是堵塞状态,就取消堵塞,执行后面的代码,我们来看个例子:
public class TestTool {
public void test() {
try {
System.out.println("test start");
Thread.sleep(5000);
System.out.println("test finish");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ConmuMain {
public static void main(String[] args) {
try {
TestTool testTool = new TestTool();
ThreadA threadA = new ThreadA(testTool);
threadA.start();
threadA.join(2000);
System.out.println("main finish");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static class ThreadA extends Thread {
private TestTool testTool;
public ThreadA(TestTool testTool) {
this.testTool = testTool;
}
@Override
public void run() {
testTool.test();
}
}
}
结果为:
test start
main finish
test finish
我们看到主线程先与子线程执行完。
线程间的通信基本上就讲完了,如果上文有错误的地方欢迎大家指出。