线程之间的通信

前言

最近在看并发编程艺术这本书,对看书的一些笔记及个人工作中的总结。

timg.jpeg

前言

每个线程单独的运行,没有多大的价值。线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体,线程间的通信就成为整体的必用方式之一。当线程存在通信指挥,系统间的交互性会更强大,在提高cpu利用率的同时还会使开发对线程任务在处理的过程中进行有效的把控和监督。

等待/通知机制

方法名词 描述
notify() 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是改线程获取到了对象的锁
notifyAll() 通知所有等待在该对象上的线程
wait() 调用该方法的线程进入waiting状态,只有等待另外线程的通知或被中断才会返回,需要注意,调用wait()方法后,会释放对象的锁。
wait(long) 超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回
wait(long,int) 对于超时时间更细粒度的控制,可以达到纳秒

先看一个demo:

/**
 * wait notfiy 方法,wait释放锁,notfiy不释放锁
 * 为什么先让t2线程先执行?
 *   因为t2执行了,那么lock.wait()释放锁,t1进入执行,,然后执行到第5次的时候,发出通知,执行lock.notify();但是lock.notify();并没有释放锁,所以继续执行,执行10次之后释放锁,
 *   t2执行,弊端就是没有实时通知,解决方案看第三个列子
 *
 * 如果t1先执行,t2后执行,那么执行10次之后list长度等于10,释放锁,执行t2,然后wait等待,下面的代码就不执行了
 *
 */
public class ListAdd2 {
    private volatile static List list = new ArrayList();

    public void add(){
        list.add("bjsxt");
    }
    public int size(){
        return list.size();
    }

    public static void main(String[] args) {

        final ListAdd2 list2 = new ListAdd2();

        // 1 实例化出来一个lock
        //当使用wait 和 notify 的时候 , 一定要配合着synchronized关键字去使用
        final Object lock = new Object();


        Thread t1 = new Thread(() -> {
            try {
                synchronized (lock) {
                    for(int i = 0; i <10; i++){
                        list2.add();
                        System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
                        Thread.sleep(500);
                        if(list2.size() == 5){
                            System.out.println("已经发出通知..");
                            lock.notify();
                        }
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }, "t1");

        Thread t2 = new Thread(() -> {
            synchronized (lock) {
                if(list2.size() != 5){
                    try {
                        //System.out.println("t2进入...");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
                throw new RuntimeException();
            }
        }, "t2");

        t2.start();
        t1.start();


    }

}

执行结果:

当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
已经发出通知..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t2收到通知线程停止..
Exception in thread "t2" java.lang.RuntimeException
    at com.zhihao.miao.day04.ListAdd2.lambda$main$1(ListAdd2.java:64)
    at java.lang.Thread.run(Thread.java:745)

总结
1.使用wait()、notify()和notifyAll()时需要先对调用对象加锁。
2.调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。 释放锁。
3.notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。notify()或notifyAll()不释放锁。
4.notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
5.从wait()方法返回的前提是获得了调用对象的锁。
6.wait和notify必须配合synchronized关键字使用

上面的demo有个弊端就是当list.size==5,执行了lock.notify();但是并没有释放锁,在执行完list遍历之后释放了锁,而我们的需求是list.size等于5的时候就执行等待的操作,怎样解决这样的场景呢?

/**
 *CountDownLatch解决了实时性,当我们在list.size()等于5的时候就唤醒countDownLatch.await()的线程,也就保存了实时性
 *而使用notify却不能释放锁,等待循环结束释放锁,
 */
public class ListAdd3 {
    private volatile static List list = new ArrayList();

    public void add(){
        list.add("bjsxt");
    }
    public int size(){
        return list.size();
    }

    public static void main(String[] args) {

        final ListAdd2 list2 = new ListAdd2();

        final CountDownLatch countDownLatch = new CountDownLatch(1); //这边的参数为几,下面的countDown()就要执行几次才能唤醒

        Thread t1 = new Thread(() -> {
            try {
                for(int i = 0; i <10; i++){
                    list2.add();
                    System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
                    Thread.sleep(500);
                    if(list2.size() == 5){
                        System.out.println("已经发出通知..");
                        countDownLatch.countDown();  //释放锁
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }, "t1");

        Thread t2 = new Thread(() -> {
            if(list2.size() != 5){
                try {
                    countDownLatch.await();//等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
            throw new RuntimeException();
        }, "t2");

        t2.start();
        t1.start();

    }
}

执行结果:

当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
已经发出通知..
Exception in thread "t2" java.lang.RuntimeException
当前线程:t1添加了一个元素..
    at com.zhihao.miao.day04.ListAdd3.lambda$main$1(ListAdd3.java:53)
当前线程:t2收到通知线程停止..
    at java.lang.Thread.run(Thread.java:745)
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..

实现了我们的业务逻辑

Thread.join()的使用

如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。线程Thread除了提供join()方法之外,还提供了join(long millis)和join(long millis,int nanos)两个具备超时特性的方法。这两个超时方法表示,如果线程thread在给定的超时时间里没有终止,那么将会从该超时方法中返回。

public class Join {
    public static void main(String[] args) throws Exception {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }

        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate.");
    }

    static class Domino implements Runnable {
        private Thread thread;

        public Domino(Thread thread) {
            this.thread = thread;
        }

        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread().getName() + " terminate.");
        }
    }
}
main terminate.
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.

执行结果很明显,因为main线程先启动,调用了join表名正在执行完的线程执行结果才执行下面的线程。

查看了join的源码可知:

 public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

当线程终止时,会调用线程自身的notifyAll()方法,会通知所有等待在该线程对象上的线程。

Condition的实现分析

1.等待队列

等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。

2.等待
调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

3.通知
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。

public class TestCondition {

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void method1(){
        try {
            lock.lock();
            System.out.println("当前线程:" + Thread.currentThread().getName() + "进入等待状态..");
            Thread.sleep(3000);
            System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁..");
            condition.await();  // Object wait
            System.out.println("当前线程:" + Thread.currentThread().getName() +"继续执行...");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void method2(){
        try {
            lock.lock();
            System.out.println("当前线程:" + Thread.currentThread().getName() + "进入..");
            Thread.sleep(3000);
            System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒..");
            condition.signal();     //Object notify
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {

        final TestCondition uc = new TestCondition();
        Thread t1 = new Thread(() -> uc.method1(), "t1");
        Thread t2 = new Thread(() -> uc.method2(), "t2");

        t1.start();   //t1先执行,然后阻塞,然后t2线程执行await释放锁,
        t2.start();
    }

}
当前线程:t1进入等待状态..
当前线程:t1释放锁..
当前线程:t2进入..
当前线程:t2发出唤醒..
当前线程:t1继续执行...

测试signal或者signalAll方法是否也和notify一样不释放锁?

public class TestCondition2 {
    private volatile static List list = new ArrayList();

    public void add(){
        list.add("miaozhihao");
    }
    public int size(){
        return list.size();
    }

    public static void main(String[] args) {

        final ListAdd2 list2 = new ListAdd2();

        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        Thread t1 = new Thread(() -> {
            try {
                    lock.lock();
                    for(int i = 0; i <10; i++){
                        list2.add();
                        System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
                        Thread.sleep(500);
                        if(list2.size() == 5){
                            System.out.println("已经发出通知..");
                            condition.signal();
                        }
                    }
                    lock.unlock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }, "t1");

        Thread t2 = new Thread(() -> {
                lock.lock();
                if(list2.size() != 5){
                    try {
                        System.out.println("线程进入await方法");
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
                lock.unlock();
                throw new RuntimeException();
            }
        , "t2");

        t2.start();
        t1.start();
    }
}

执行结果:

线程进入await方法
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
已经发出通知..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t1添加了一个元素..
当前线程:t2收到通知线程停止..
Exception in thread "t2" java.lang.RuntimeException
    at com.zhihao.miao.day10.TestCondition2.lambda$main$1(TestCondition2.java:63)
    at java.lang.Thread.run(Thread.java:745)

由此可见答案是肯定的,即signal合signalAll也是不释放锁的。

多Condition的使用

public class TestManyCondition {

    private ReentrantLock lock = new ReentrantLock();
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();

    public void m1(){
        try {
            lock.lock();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m1等待..");
            c1.await();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m1继续..");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void m2(){
        try {
            lock.lock();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m2等待..");
            c1.await();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m2继续..");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void m3(){
        try {
            lock.lock();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m3等待..");
            c2.await();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m3继续..");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void m4(){
        try {
            lock.lock();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
            c1.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void m5(){
        try {
            lock.lock();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
            c2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {


        final TestManyCondition umc = new TestManyCondition();
        Thread t1 = new Thread(() -> umc.m1(),"t1");
        Thread t2 = new Thread(() -> umc.m2(),"t2");
        Thread t3 = new Thread(() -> umc.m3(),"t3");
        Thread t4 = new Thread(() -> umc.m4(),"t4");
        Thread t5 = new Thread(() -> umc.m5(),"t5");

        t1.start(); // c1
        t2.start(); // c1
        t3.start(); // c2


        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t4.start(); // 唤醒了c1
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t5.start(); // 唤醒了c2

    }
}
当前线程:t1进入方法m1等待..
当前线程:t2进入方法m2等待..
当前线程:t3进入方法m3等待..
当前线程:t4唤醒..
当前线程:t1方法m1继续..
当前线程:t2方法m2继续..
当前线程:t5唤醒..
当前线程:t3方法m3继续..

t4唤醒了t1和t2,而t5唤醒了t3,实际工作中根据业务的不同来选择使用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容