多线程中wait/notify的一些理解,以及最经典的生产者和消费者

前言

说到wait,很容易想到一个常问的面试题,wait和sleep的区别。简单的理解就是,wait是object的方法,调用wait(),线程会进去等待状态,并且会释放出锁,非阻塞的,等待持有锁的线程调用notify(),才会继续执行。sleep是Thread的方法,sleep()会让当前线程暂停一段时间,但是不会释放锁,是阻塞的。

Wait/notify在线程中的通信

我们先看下面一段代码:
class Data:

        package com.notrace;
        import java.util.ArrayList;
        import java.util.List;
        public class Data {
    private List list=new ArrayList();
      public void add() {
        list.add("notrace");
    }
      public int getSize() {
    return list.size();
    }
  }

ThreadA:

  package com.notrace;     
  public class ThreadA extends Thread{
 private Data data;
 public ThreadA(Data data) {
    super();
    this.data=data;
}

@Override
public void run() {
    try {
        for (int i = 0; i < 10; i++) {
            data.add();
            System.out.println("添加了"+(i+1)+"个元素");
            Thread.sleep(1000);
            
        }
    } catch (Exception e) {
    }
}   
 }

ThreadB:

package com.notrace;
public class ThreadB extends Thread{

private Data data;
public ThreadB(Data data) {
    super();
    this.data=data;
}

@Override
public void run() {
    try {
        while (true) {
    System.out.print("");
            if(data.getSize()==6) {
                System.out.println("==6,线程B要退出了");
                throw new InterruptedException();
            }
        }

    } catch (Exception e) {
        e.printStackTrace();
    }
}

}

Test:

  package com.notrace;
public class Test {
public static  void  main(String []args) {
    Data data=new Data();
    ThreadA threadA=new ThreadA(data);
    threadA.setName("A");
    threadA.start();
    ThreadB threadB=new ThreadB(data);
    threadB.setName("B");
    threadB.start();
}
}

运行test,结果如下:
1.png

两个线程之间实现了通信,但是有一个弊端,就是线程B,要不停的循环,这样会很浪费资源,那么有没有什么好的办法去实现多个线程通信呢,答案是肯定的,就是wait/notify的机制。

wait/notify机制简单介绍

举个栗子,就说我们日常生活中的餐厅吧,厨师和服务员之间的交互,
厨师做完菜,放到传递台上,服务员来传递台上取,往桌上送。
1)厨师做完一道菜的时间是不确定的,所以把菜放到传递台上的时间也是不确定的。
2)服务员取菜的时间,取决于厨师,所以,厨师做完菜之前,服务员,就有了等待(wait)的状态。
3)服务员是怎么取到菜呢?这取决于厨师,简单点来说,厨师做完菜,放到传递台上,然后大喊一声:“红烧肉好了,来上菜。”然后服务员就过来取了,这个大喊一声,就相当于notify的过程。
4)这个情景中就有wait/notify的机制了,那么如何用代码来实现呢?

wait/notify机制简单实现

方法wait()的作用是使当前执行代码的线程进行等待,wait()方法是Object类的方法,该方法用来将当前线程置入“预执行队列”中,并且在wait()所在的代码行处停止执行,直到接到通知或被中断为止。在调用wait()之前,线程必须获得该对象的对象级别锁,即只能在同步方法或同步块中调用wait()方法。在执行wait()方法后,当前线程释放锁。 在从wait()返回前,线程与其他线程竞争重新获得锁。如果调用wait()时没有持有适当的锁,则抛出IllegalMonitorStateException,它是RuntimeException的一个子类,因此,不需要try-catch语句进行捕捉异常。
方法notify()也要在同步方法或同步块中调用,即在调用前,线程也必须获得该对象的对象级别锁。如果调用notify()时没有持有适当的锁,也会抛出IllegalMonitorStateException。该方法用来通知那些可能等待该对象的对象锁的其他线程,如果有多个线程等待,则由线程规划器随机挑选出其中一个呈wait状态的线程,对其发出通知notify,并使它等待获取该对象的对象锁。需要说明的是,在执行notify()方法后,当前线程不会马上释放该对象锁,呈wait状态的线程也并不能马上获取该对象锁,要等到执行notify()方法的线程将程序执行完,也就是退出synchronized代码块后,当前线程才会释放锁,而呈wait状态所在的线程才可以获取该对象锁。当第一个获得了该对象锁的wait线程运行完毕以后,它会释放掉该对象锁,此时如果该对象没有再次使用notify语句,则即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,还会继续阻塞在wait状态,直到这个对象发出一个notify或notifyAll。
用一句话来总结一下wait和notify:wait使线程停止运行,而notify使停止的线程继续运行。
代码测试如下:
ThreadA:

      package com.notrace;

public class ThreadA extends Thread {
private Object lock;

public ThreadA(Object lock) {
    super();
    this.lock = lock;
}

@Override
public void run() {
    try {
        synchronized (lock) {
            System.out.println("begin wait time" +       System.currentTimeMillis());
            lock.wait();
            System.out.println("end wait time" + System.currentTimeMillis());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}

ThreadB:

        package com.notrace;

public class ThreadB extends Thread {

private Object lock;

public ThreadB(Object lock) {
    super();
    this.lock = lock;
}

@Override
public void run() {

    try {

        synchronized (lock) {
            System.out.println("begin notify time:" + System.currentTimeMillis());
            lock.notify();
            System.out.println("end nofity time:" + System.currentTimeMillis());
        }

    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(e.toString());
    }
}

}

Test:

        package com.notrace;

public class Test {
public static void main(String[] args) {

    try {
        Object lock = new Object();
        ThreadA threadA = new ThreadA(lock);
        threadA.setName("A");
        threadA.start();

        Thread.sleep(3000);
        ThreadB threadB = new ThreadB(lock);
        threadB.setName("B");
        threadB.start();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

  }

运行结果如下图:


2.png

从打印结果来看,3S后线程别nofity唤醒.
我们再来实现下之前的上面那个例子:
ThreadA:

        package com.notrace;

    public class ThreadA extends Thread {
private Object lock;

public ThreadA(Object lock) {
    super();
    this.lock = lock;
}

@Override
public void run() {
    try {
        synchronized (lock) {
            if(Data.getSize()!=6)
            System.out.println("begin wait time" +       System.currentTimeMillis());
            lock.wait();
            System.out.println("end wait time" + System.currentTimeMillis());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

ThreadB:

        package com.notrace;
  public class ThreadB extends Thread{

    private Object  lock;
  public ThreadB(Object lock) {
    super();
    this.lock=lock;
}

@Override
public void run() {
    try {

    synchronized (lock) {
        for (int i = 0; i < 10; i++) {
            Data.add();
            if(Data.getSize()==6) {
                lock.notify();
                System.out.println("发出通知");
            }
            System.out.println("添加了"+(i+1)+"个元素");
            Thread.sleep(1000);
        }        
      
    }

} catch (Exception e) {
    e.printStackTrace();
    System.out.println(e.toString());
}
}

}

Data:

        package com.notrace;

  import java.util.ArrayList;
  import java.util.List;

  public class Data {
private static List list = new ArrayList();

public static void add() {
    list.add("notrace");
}

public static int getSize() {
    return list.size();
}
    }

Test:

      package com.notrace;

    public class Test {
public static void main(String[] args) {

    try {
        Object lock = new Object();
        ThreadA threadA = new ThreadA(lock);
        threadA.setName("A");
        threadA.start();

        Thread.sleep(50);
        ThreadB threadB = new ThreadB(lock);
        threadB.setName("B");
        threadB.start();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

}

运行结果如下:


3.png

日志说明了一个问题,notify()调用之后,并不会立即释放锁,而是会接着执行后面的代码,执行完了,才会释放锁。

synchronized可以将任何一个Object对象作为同步对象来看待,每个Object都实现了wait()和notify()。他们必须用被用在synchronized同步的区域内,wait()让区域内的线程进入等待状态,释放同步锁,notify()可以唤醒一个因调用了wait()而处于等待状态中的线程。被唤醒的线程继续执行wait()后面的代码。如果notify之后没有处于阻塞等待中的线程,就会被忽略。
关于释放锁,有以下几个结论,
1)执行完同步代码块,就会释放锁。
2)执行代码块的过程中,如果遇到异常而导致线程终止,也会释放锁。
3)调用wait()这个线程会释放锁。

另外还有一点,需要提一下。notify()只会唤醒一个线程,所以在多线程的时候,可能会出现一些问题,所以,尽量用notifyAll()唤醒所有等待的线程 比较好。下面来看看另一个经典问题。

生产者/消费者模式

一个生产者,一个消费者

废话不多说,直接看code.
生产者P:

            package com.notrace;

  //生产者
  public class P {
private String lock;
public P(String lock) {
    super();
    this.lock=lock;
}
public void setValue() {
    try {
        synchronized (lock) {
            if(!ValueObj.value.equals("")) {
                lock.wait();
            }
            String value=System.currentTimeMillis()+"";
            System.out.println("set的值:"+value);
            ValueObj.value=value;
            lock.notify();
        }
    } catch (Exception e) {
        // TODO: handle exception
    }
}
}

消费者C:

  package com.notrace;

    //消费者
    public class C {
private String lock;
public C(String lock) {
    super();
    this.lock=lock;
}
public void getValue() {
try {
    synchronized (lock) {
        if(ValueObj.value.equals("")) {
            lock.wait();
        }
        System.out.println("get的值:"+ValueObj.value);
        ValueObj.value="";
        lock.notify();
    }
} catch (Exception e) {
    // TODO: handle exception
}
}

}

线程P:

    package com.notrace;

public class ThreadP extends Thread{

private P p;
public ThreadP(P p) {
    super();
    this.p=p;
}
@Override
public void run() {
    while(true) {
        p.setValue();
    }
}
}

线程C:

            package com.notrace;

  public class ThreadC extends Thread{
private C c;
public ThreadC(C c) {
    super();
    this.c=c;
}
@Override
public void run() {
    while(true) {
        c.getValue();
    }
}
}

Run:

          package com.notrace;

public class Run {
public static void main(String []args) {
    String lock=new String("");
    P p=new P(lock);
    C c=new C(lock);
    ThreadP tP=new ThreadP(p);
    ThreadC tC=new ThreadC(c);
    tP.start();
    tC.start();
}

}

运行结果:


4.png

可以看出来,是生产者,消费者,交替运行的。

接下来变化一下:

多生产者,多消费者

P:

    package com.notrace;

  //生产者
  public class P {
private String lock;
public P(String lock) {
    super();
    this.lock=lock;
}
public void setValue() {
    try {
        synchronized (lock) {
            while(!ValueObj.value.equals("")) {
                System.out.println("生产者:"+Thread.currentThread().getName()+"watting*");
                lock.wait();
            }
            System.out.println("生产者:"+Thread.currentThread().getName()+"run*");
            String value =System.currentTimeMillis()+"";
            ValueObj.value=value;
            lock.notify();
        }
    } catch (Exception e) {
        // TODO: handle exception
    }
}
}

C:

  package com.notrace;

//消费者
  public class C {
private String lock;
public C(String lock) {
    super();
    this.lock=lock;
}
public void getValue() {
try {
    synchronized (lock) {
        while(ValueObj.value.equals("")) {
            System.out.println("消费者:"+Thread.currentThread().getName()+"waitting");
            lock.wait();
        }
        System.out.println("消费者:"+Thread.currentThread().getName()+"run");
        System.out.println("get的值:"+ValueObj.value);
        ValueObj.value="";
        lock.notify();
    }
} catch (Exception e) {
    // TODO: handle exception
}
}

}

threadP,threadC一样:

Run:

        package com.notrace;

  public class Run {
public static void main(String []args) {
    try {
        String lock=new String("");
        P p=new P(lock);
        C c=new C(lock);
        
        ThreadP[] pThreadPs=new ThreadP[2];
        ThreadC[]cThreadCs=new ThreadC[2];
        for(int i=0;i<2;i++) {
            pThreadPs[i]=new ThreadP(p);
            pThreadPs[i].setName("生产者"+(i+1));
            cThreadCs[i]=new ThreadC(c);
            cThreadCs[i].setName("消费者"+(i+1));
            pThreadPs[i].start();
            cThreadCs[i].start();
        }

        Thread.sleep(5000); 
    } catch (Exception e) {
        // TODO: handle exception
    }
    
    
}

  }

执行完:


5.png

两个都是等待,而且还没执行完,是不是很奇怪。。。产生这种情况是因为唤醒的早了点, 生产者1唤醒了生产者2,然而生产者2发现产品并未被消费,所以生产者2也是wait状态。另外,还有种原因,就是连续唤醒同类。解决这种问题,就需要将P和C中的notify改成notifyAll(),将生产者和消费者一起唤醒,就ok了。

咱接着变

一生产一消费:来操作栈

这个场景就是生产者向堆栈List中放数据,消费者从List中取数据。list最大容量是1:
p:

        package com.notrace;

  //生产者
  public class P {


private MyStack stack;
public P(MyStack stack) {
    super();
    this.stack=stack;
    
}
public void pushService() {
    stack.push();
}
}

C:

        package com.notrace;

  //消费者
public class C {

private MyStack stack;
public C(MyStack stack) {
    super();
    this.stack=stack;
}

public void popService() {
    System.out.println("pop="+stack.pop());
}
  }

MyStack:

        package com.notrace;

  import java.util.ArrayList;
  import java.util.List;

  public class MyStack {
  private List list=new ArrayList<>();
  synchronized public void push() {
try {
    if(list.size()==1) {
        this.wait();
    }
    list.add("notrace:"+Math.random());
    this.notify();
    System.out.println("push="+list.size());
} catch (Exception e) {
    // TODO: handle exception
}
  }

      synchronized public String pop() {
String value="";
try {
    if(list.size()==0) {
        System.out.println("pop操作"+Thread.currentThread().getName());
        this.wait();
    }
    value=""+list.get(0);
    list.remove(0);
    this.notify();
    System.out.println("pop"+list.size());
} catch (Exception e) {
    // TODO: handle exception
}
return value;
  }
}

ThreadP:

        package com.notrace;

  public class ThreadP extends Thread{

private P p;
public ThreadP(P p) {
    super();
    this.p=p;
}
@Override
public void run() {
    while(true) {
        System.out.print("");
        p.pushService();
//          p.setValue();
        
    }
}
}

ThreadC:

        package com.notrace;

public class ThreadC extends Thread{
private C c;
public ThreadC(C c) {
    super();
    this.c=c;
}
@Override
public void run() {
    while(true) {
        System.out.print("");
        c.popService();
  //            c.getValue();
    }
}
  }

Run:

        package com.notrace;

public class Run {
public static void main(String []args) {

    MyStack stack=new MyStack();
    P p=new P(stack);
    C c=new C(stack);
    ThreadP threadP=new ThreadP(p);
    ThreadC threadC=new ThreadC(c);
    threadP.start();
    threadC.start();
}
}

执行结果:


6.png

程序的结果是size不会大于1|。

一生产多消费:来操作栈

代码还是上面的那些,我们改一下Run:

        package com.notrace;

public class Run {
public static void main(String []args) {
    
    
    MyStack stack=new MyStack();
    P p=new P(stack);
    C c1=new C(stack);
    C c2=new C(stack);
    C c3=new C(stack);
    C c4=new C(stack);
    C c5=new C(stack);
    ThreadP threadP=new ThreadP(p);
    ThreadC threadC1=new ThreadC(c1);
    ThreadC threadC2=new ThreadC(c2);
    ThreadC threadC3=new ThreadC(c3);
    ThreadC threadC4=new ThreadC(c4);
    ThreadC threadC5=new ThreadC(c5);
    threadP.start();
    threadC1.start();
    threadC2.start();
    threadC3.start();
    threadC4.start();
    threadC5.start();
  }
  }

执行结果:
7.png

假死状态了,多次执行。还可能会出现indexoutofbounce,这是因为条件改变时并没有得到及时的响应,引起了list.remove(0)报错,解决办法是将MyStack中的if()判断改为while()。多消费一生产,多消费多生产,也是一样的。改一下run里面的代码就行了

好了,基本完事。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容