Java并发编程基础-线程间通信

线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,这将会带来巨大的价值。

volatile和synchronized关键字

Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。

以上引用自《Java并发编程的艺术》,同时也说明了我们在 Java并发机制的底层实现原理-volatile 中所提到的那个问题,为什么另一个线程会无法跳出循环。而volatile关键字用来修饰字段或成员变量,就是告知程序该变量在任何时候都需要从共享内存中获取,并且对他的改变必须同步刷新回共享内存,所以能保证对所有线程访问的可见性。

关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。

详情参考之前写的另一篇文章 Java并发机制的底层实现原理-synchronized

等待/通知机制

一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个 过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模 式隔离了“做什么”(what)和“怎么做”(How),在功能层面上实现了解耦,体系结构上具备了良 好的伸缩性,但是在Java语言中如何实现类似的功能呢?

最简单的办法是写一个循环,检查是否当前需要执行对应操作,直到满足条件执行,退出循环。

while (value != desire) {
     Thread.sleep(1000);
}
doSomeThing();

上面这段伪代码在条件不满足的时候睡眠一段时间,这样做的目的是防止过快的“无效”尝试,这种方式看似可以实现所需的功能,但是会存在如下问题:

  • 难以确保及时性,比如无法及时发现条件已经变化,也就是及时性难以保证
  • 难以降低开销,如果将睡眠时间缩短,比如睡眠1毫秒,这样能更迅速的发现条件变化,但是如果条件长时间没有变化,则又会加剧消耗更多的处理器资源,造成无端的浪费

如何解决以上两个问题?Java通过内置的等待/通知机制能够很好的解决这个矛盾并且实现上述需求。
下表为等待/通知机制的相关方法:

方法名称 方法描述
notify() 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提
是该线程获取到了对象的锁
notifyAll() 通知所有等待在该对象上的线程
wait() 调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被
中断才会返回,需要注意,调用wait()方法后,会释放对象的锁
wait(long) 超时等待一段时间,这里的参数时间是毫秒也就是等待长达n毫秒,
如果没有通知就超时返回
wait(long, int) 对于超时时间更细力度的控制,可以控制到纳秒

等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

如下所示代码中,首先启动了waitThread线程,该线程获取到了lock对象的锁,随后调用了wait方法释放锁,等待其他线程通知唤醒自身,随后等待一毫秒启动notifyThread线程,因为在一定情况下两个线程同时启动是有可能后启动的线程先抢到CPU资源而先执行。由于waitThread释放了锁,所以在notifyThread内一定可以获取到锁,随后notifyThread调用notifyAll方法唤醒等待的waitThread,此时waitThread不能立即获取到锁,需要等待notifyThread释放锁时候才可以获取到锁,即在调用notifyAll方法5毫秒之后waitThread重新获取到锁并且结束循环,执行最后的代码,程序结束。

public class WaitNotify {

    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        Thread.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {
        @Override
        public void run() {
            // 加锁,拥有lock对象的锁
            synchronized (lock) {
                // 当条件不满足时,继续wait,同时释放了lock的锁
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " flag is true. wait at " + System.currentTimeMillis());
                        lock.wait();
                    } catch (InterruptedException ex) {
                    }
                }
                // 条件满足时,跳出循环,执行最后的代码
                System.out.println(Thread.currentThread().getName() + " flag is false. running at " + System.currentTimeMillis());
            }
        }
    }

    static class Notify implements Runnable {
        @Override
        public void run() {
            // 加锁,拥有lock对象的锁
            synchronized (lock) {
                // 获取lock对象的锁,然后进行通知,通知时不会释放lock的锁,
                // 知道当前线程释放了lock后,WaitThread才能从wait方法中返回
                System.out.println(Thread.currentThread().getName() + " hold lock. notify at " + System.currentTimeMillis());
                lock.notifyAll();
                flag = false;
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                }
            }
        }
    }
    
}

细节如下:

  • 使用wait()notify()notifyAll()时需要先对调用对象加锁
  • 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的 等待队列
  • notify()notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()notifAll()的线程释放锁之后,等待线程才有机会从wait()返回
  • notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED
  • wait()方法返回的前提是获得了调用对象的锁

总结

从上述WaitNotify实例中可以提炼出等待/通知的经典范式,该范式分为两部分,分别针对等待方(消费者)和通知方(生产者)
等待方遵循如下原则:

  • 获取对象的锁
  • 如果条件不满足,那么调用wait()方法等待,被通知后仍要检查条件
  • 条件满足则执行对应的逻辑

对应伪代码如下:

synchronized(对象) {
    while(条件不满足) {
        对象.wait();
    }
}

通知方遵循如下原则:

  • 获得对象的锁
  • 改变条件
  • 通知所有等待在对象上的线程

对应的伪代码如下:

synchronized(对象) {
    改变条件
    对象.notifyAll();
}

管道输入/输出流

管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了如下4种具体实现:PipedOutputStreamPipedInputStreamPipedReaderPipedWriter,前两种面向字节,而后两种面向字符。

以下代码使用PipedReaderPipedWriter为例,从控制台接收用户输入再通过输入流传递到另一线程中的输出流,从而实现了线程间以流的方式的数据传输:

public class Piped {

    public static void main(String[] args) throws Exception {
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader(); // 将输出流和输入流进行连接,否则在使用时会抛出IOException
        out.connect(in);
        Thread printThread = new Thread(new Print(in), "PrintThread");
        printThread.start();
        int receive = 0;
        try {
            // 从控制台接收输入
            while ((receive = System.in.read()) != -1) {
                out.write(receive);
            }
        } finally {
            out.close();
        }
    }

    static class Print implements Runnable {
        private PipedReader in;

        public Print(PipedReader in) {
            this.in = in;
        }

        public void run() {
            int receive = 0;
            try {
                while ((receive = in.read()) != -1) {
                    System.out.print((char) receive);
                }
            } catch (IOException ex) {
            }
        }
    }
}

注意对于Piped类型的流,必须先要进行绑定,也就是调用connect()方法,如果没有将输入/输 出流绑定起来,对于该流的访问将会抛出异常。

Thread.join()的使用

如果A线程执行了Thread.join(),那么表示当前线程需要等待A线程执行完毕返回之后才会继续执行其后的代码,除了该方法以外,Thread还提供了了join(long millis)join(long millis,int nanos)方法,这两个方法具有超时性质,如果在规定等待时间内A线程未能执行结束,那么程序将从该方法中返回,继续执行其后的代码。

以下代码为Thread.join()方法的示例:

public class Join {

    public static void main(String[] args) throws InterruptedException {
        Runnable runnableA = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " run at " + System.currentTimeMillis());
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                }
            }
        };
        Runnable runnableB = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " run at " + System.currentTimeMillis());
                    Thread.sleep(10);
                    System.out.println(Thread.currentThread().getName() + " end at " + System.currentTimeMillis());
                } catch (InterruptedException e) {
                }
            }
        };

        Thread threadA = new Thread(runnableA, "Thread-A");
        Thread threadB = new Thread(runnableB, "Thread-B");
        threadA.start();
        threadA.join();
        System.out.println(threadA.getName() + " end at " + System.currentTimeMillis());
        threadB.start();
        threadB.join(5);
        System.out.println(threadB.getName() + " returned at " + System.currentTimeMillis());
    }

}

该方法执行结果如下:

Thread-A run at 1593665701058
Thread-A end at 1593665701064
Thread-B run at 1593665701064
Thread-B returned at 1593665701070
Thread-B end at 1593665701075

可以看到A线程从执行到结束经过了6毫秒左右,而从线程B执行到线程B返回也只经过了6毫秒左右,然而实际B线程执行到结束经过了11毫秒左右。这就说明B线程由于等待超时已经从join()方法中返回。

上面的例子实际执行时间会比我们程序设置的等待时间多出1-2毫秒,这是由于线程等待和唤醒之间的切换所消耗的时间。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352