Java多线程(4)-- 协作之线程通信

当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。

 

join():

在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。

对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join()方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。

public classJoinExample {

    private class A extends Thread {

        @Override

        public void run() {

            System.out.println("A");

        }

    }


    private class B extends Thread {

        private A a;

        B(A a) {

            this.a = a;

        }


        @Override

        public void run() {

            try {

                a.join();

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            System.out.println("B");

        }

    }


    public void test() {

        A a = new A();

        B b = new B(a);

        b.start();

        a.start();

    }

}


public static void main(String[] args) {

    JoinExample example = new JoinExample();

    example.test();

}

A

B


探讨一下Java中线程协作的最常见的两种方式:利用Object.wait()、Object.notify()和使用Condition。

waitnotify/notifyAll

Object.wait()、Object.notify()方式可见wait与notify/notifyAll一节内容。

https://www.jianshu.com/writer#/notebooks/38812463/notes/51069661

调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。

它们都属于 Object 的一部分,而不属于 Thread。

只能用在同步方法或者同步控制块中使用,否则会在运行时抛出IllegalMonitorStateException。

使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。

public class WaitNotifyExample {

    public synchronized voidbefore() {

       System.out.println("before");

        notifyAll();

    }


    public synchronized voidafter() {

        try {

            wait();

        } catch(InterruptedException e) {

           e.printStackTrace();

        }

        System.out.println("after");

    }

}


public static void main(String[] args) {

    ExecutorServiceexecutorService = Executors.newCachedThreadPool();

    WaitNotifyExampleexample = new WaitNotifyExample();

   executorService.execute(() -> example.after());

    executorService.execute(()-> example.before());

}

before

after


wait() 和 sleep() 的区别

[if !supportLists]·      [endif]wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;

[if !supportLists]·      [endif]wait() 会释放锁,sleep() 不会。


await() signal() signalAll()

      Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作可以指定等待的条件,因此更加灵活、安全和高效。因此通常来说比较推荐使用Condition,在阻塞队列那一篇博文中就讲述到了,阻塞队列实际上是使用了Condition来模拟线程间协作。

* Condition是个接口,基本的方法就是await()和signal()方法;

* Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()

* 调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用


Conditon中的await()对应Object的wait();

Condition中的signal()对应Object的notify();

Condition中的signalAll()对应Object的notifyAll()。

示例:

public class AwaitSignalExample {

    private Lock lock = newReentrantLock();

    private Conditioncondition = lock.newCondition();


    public void before() {

        lock.lock();

        try {

           System.out.println("before");

           condition.signalAll();

        } finally {

            lock.unlock();

        }

    }


    public void after() {

        lock.lock();

        try {

           condition.await();

           System.out.println("after");

        } catch(InterruptedException e) {

           e.printStackTrace();

        } finally {

            lock.unlock();

        }

    }

}


public static void main(String[] args) {

    ExecutorServiceexecutorService = Executors.newCachedThreadPool();

    AwaitSignalExampleexample = new AwaitSignalExample();

   executorService.execute(() -> example.after());

   executorService.execute(() -> example.before());

}

before

after


Condition原理:

Lock的本质是AQS,AQS自己维护的队列是当前等待资源的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行,直到队列为空。而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列。但是,两个队列的作用不同的,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:

1. 线程1调用reentrantLock.lock时,尝试获取锁。如果成功,则返回,从AQS的队列中移除线程;否则阻塞,保持在AQS的等待队列中。

2. 线程1调用await方法时,对应操作是被加入到Condition的等待队列中,等待signal信号,同时释放锁。

3. 锁被释放后,会唤醒AQS队列中的头结点,所以线程2会获取到锁。

4. 线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候线程1并没有被唤醒,只是被加入AQS等待队列。

5. signal方法执行完毕,线程2调用unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是线程1被唤醒,线程1恢复执行。

所以:

发送signal信号只是将Condition队列中的线程加到AQS的等待队列中。只有到发送signal信号的线程调用reentrantLock.unlock()释放锁后,这些线程才会被唤醒。

可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。

signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程,本质是将节点从Condition队列中取出来一个还是所有节点放到AQS的等待队列。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待。


AQS原理:

在java.util.concurrent.locks包中有很多Lock的实现类,常用的有ReentrantLock、ReadWriteLock(实现类ReentrantReadWriteLock),内部实现都依赖AbstractQueuedSynchronizer类。

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

红色节点是默认head节点,其实是一个空节点,我觉得可以理解成代表当前持有锁的线程,每当有线程竞争失败,都是插入到队列的尾节点,tail节点始终指向队列中的最后一个元素。

示例:

public abstract class AbstractQueuedSynchronizer extends

   AbstractOwnableSynchronizer implements java.io.Serializable {

    //等待队列的头节点

    private transientvolatile Node head;

    //等待队列的尾节点

    private transientvolatile Node tail;

    //同步状态

    private volatile intstate;

    protected final intgetState() { return state;}

    protected final voidsetState(int newState) { state = newState;}

    ...

}


      队列同步器AQS是用来构建锁或其他同步组件的基础框架,内部使用一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,其中内部状态state,等待队列的头节点head和尾节点tail,都是通过volatile修饰,保证了多线程之间的可见。

AQS维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。state的访问方式有三种:

getState()

setState()

compareAndSetState()

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

每个节点中, 除了存储了当前线程,前后节点的引用以外,还有一个waitStatus变量,用于描述节点当前的状态。多线程并发执行时,队列中会有多个节点存在,这个waitStatus其实代表对应线程的状态:有的线程可能获取锁因为某些原因放弃竞争;有的线程在等待满足条件,满足之后才能执行等等。一共有4中状态:

CANCELLED 取消状态

SIGNAL 等待触发状态

CONDITION 等待条件状态

PROPAGATE 状态需要向后传播

等待队列是FIFO先进先出,只有前一个节点的状态为SIGNAL时,当前节点的线程才能被挂起。


线程获取锁过程:

下列步骤中线程A和B进行竞争。

1)线程A执行CAS执行成功,state值被修改并返回true,线程A继续执行。

2)线程A执行CAS指令失败,说明线程B也在执行CAS指令且成功,这种情况下线程A会执行步骤3。

3)生成新Node节点node,并通过CAS指令插入到等待队列的队尾(同一时刻可能会有多个Node节点插入到等待队列中),如果tail节点为空,则将head节点指向一个空节点(代表线程B)

4)node插入到队尾后,该线程不会立马挂起,会进行自旋操作。因为在node的插入过程,线程B(即之前没有阻塞的线程)可能已经执行完成,所以要判断该node的前一个节点pred是否为head节点(代表线程B),如果pred == head,表明当前节点是队列中第一个“有效的”节点,因此再次尝试tryAcquire获取锁,

     1、如果成功获取到锁,表明线程B已经执行完成,线程A不需要挂起。

     2、如果获取失败,表示线程B还未完成,至少还未修改state值。进行步骤5。

5)前面我们已经说过只有前一个节点pred的线程状态为SIGNAL时,当前节点的线程才能被挂起。

     1、如果pred的waitStatus == 0,则通过CAS指令修改waitStatus为Node.SIGNAL。

     2、如果pred的waitStatus > 0,表明pred的线程状态CANCELLED,需从队列中删除。

     3、如果pred的waitStatus为Node.SIGNAL,则通过LockSupport.park()方法把线程A挂起,并等待被唤醒,被唤醒后进入步骤6。

6)线程每次被唤醒时,都要进行中断检测,如果发现当前线程被中断,那么抛出InterruptedException并退出循环。从无限循环的代码可以看出,并不是被唤醒的线程一定能获得锁,必须调用tryAccquire重新竞争,因为锁是非公平的,有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞,这个细节充分体现了“非公平”的精髓。


线程释放锁过程:

1)如果头结点head的waitStatus值为-1,则用CAS指令重置为0;

2)找到waitStatus值小于0的节点s,通过LockSupport.unpark(s.thread)唤醒线程。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容