大飞老师带你看线程(并发容器-SynchronousQueue)上

本文作者:王一飞,叩丁狼高级讲师。原创文章,转载请注明出处。

概述

SynchronousQueue 是一个特殊的阻塞BlockingQueue队列(实现类), 但是它跟BlockingQueue又有显著不同:
1>SynchronousQueue没有容量,算是一个不存储元素的BlockingQueue。每次put操作之后,当前线程(生产者)会挂起,等待另外一个线程(消费者)执行take操作后,才会唤醒挂起线程(生产者)执行,否则阻塞,不运行添加元素.反之亦然。
2>SynchronousQueue 因为没有容量,所以遍历,size, isEmpty 这些常规具有遍历性质的方法就没意义啦
3>SynchronousQueue分公平队列和非公平队列,默认是false,非公平队列. 当然,我也也可以使用SynchronousQueue(boolean fair) 构造器额外指定.
外部结构:


外部结构

内部结构:(有点复杂)
构造器:

    public SynchronousQueue() {
        this(false);
    }
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

带boolean类型参数的构造器, fair 为true时, 是公平策略, 使用TransferQueue 对象实现, fairl为fasle时, 非公平策略队列所, 使用TransferStack 对象实现.

//内部api: TransferQueue  跟 TransferStack  类的父类
//目的:规范跟统一SynchronousQueue 队列公平与非公平策略操作api
abstract static class Transferer<E> {
    //take 跟 put 操作
    abstract E transfer(E e, boolean timed, long nanos);
}

TransferStack: 非公平队列

static final class TransferStack<E> extends Transferer<E> {
        static final int REQUEST    = 0;      //当前线程是消费者
        static final int DATA       = 1;           //当前线程是生成者
        static final int FULFILLING = 2;     //其他情况

        //使用SNode对象辅助完成队列实现
        static final class SNode {
            volatile SNode next;       
            volatile SNode match;     
            volatile Thread waiter;    
            Object item;              
            int mode;
           //.....................
        }
        E transfer(E e, boolean timed, long nanos) {
            //.....
        }
}

SynchronousQueue队列实现非公平策略核心类:TransferStack, 内部定义SNode作为容器内容节点.

TransferQueue: 公平队列

static final class TransferQueue<E> extends Transferer<E> {
      static final class QNode {
            volatile QNode next;         
            volatile Object item;        
            volatile Thread waiter;       
            final boolean isData;    //标记生产者还是消费者
            //..................
      }

       E transfer(E e, boolean timed, long nanos) {
          //......................
       }
}

SynchronousQueue队列实现公平策略核心类:TransferQueue, 内部定义QNode 作为容器内容节点.

基本使用

1:take/put 操作线程阻塞,等待配对的put/take

public class App {

    public static void main(String[] args) throws InterruptedException {

        final  SynchronousQueue<String> queue = new SynchronousQueue<String>();
        queue.put("a");
        queue.take();
        System.out.println("end....");
    }
}

运行后, 并控制台中并没有打印出"end.....", 前面提到了, SynchronousQueue 没有容量的概念, 生产线程put元素之后,线程会自动挂起, 等待消费线程take唤醒.执行put方法之后, main线程会挂起, 所以执行不了end....., 执行take操作也一样. 结论:SynchronousQueue take跟put 必须是配对的, 否则总一个线程被挂起.

2: 查询性质方法无意义, 必须等待take 与put 配对之后线程才可以继续执行

public class App {
    public static void main(String[] args) throws InterruptedException {
        final  SynchronousQueue<String> queue = new SynchronousQueue<String>();
        queue.put("a");  //线程被挂起
        System.out.println("大小:" + queue.size());
        System.out.println("为空:" + queue.isEmpty());
    }
}

3: SynchronousQueue 执行规则: 一个put 对应一个take,反之亦然

public class App {

    public static void main(String[] args) throws InterruptedException {
        final  SynchronousQueue<String> queue = new SynchronousQueue<String>();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t1 begin...");
                    queue.put("a");
                    System.out.println("t1 end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }).start();

        Thread.sleep(1000);

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t2 begin...");
                    System.out.println(queue.take());
                    System.out.println("t2 end....");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }).start();

    }
}

输出结果:

t1 begin...
t2 begin...
a
t2 end....
t1 end...

多执行几次, 结果差不多, 但每次a的输出必须是在t2 begin... 执行之后, 原因: t1 线程添加a元素之后马上挂起, 等t2线程执行take消费并唤醒之后, t1才有机会继续执行.

4:SynchronousQueue分公平队列和非公平队列
公平队列: put线程入队时, 会依次挂起. 当执行take线程时,挂起的put线程按FIFO原则,谁先挂起,谁先唤醒.

非公平队列: put线程入队时, 会依次挂起. 当执行take线程时, 随机唤醒挂起的put线程.

public class App{

    public static void main(String[] args) throws InterruptedException {
        //非公平
        final  SynchronousQueue<String> queue = new SynchronousQueue<String>(false);
        //公平
        //final  SynchronousQueue<String> queue = new SynchronousQueue<String>(true);

        //创建5个偶数put线程
        for (int i = 0; i < 10; i+= 2){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + " begin...");
                        //用于识别队列中元素由哪个线程入列的
                        queue.put("put:" + Thread.currentThread().getName());
                        System.out.println(Thread.currentThread().getName() + " end...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            }, "t" + i).start();

        }
        //休眠4s保证 put线程全部挂起等待
        Thread.sleep(4000);

        //创建5个奇数take线程
        for (int i = 1; i < 10; i+=2){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + " begin...");
                        //查看take 与 put 线程配对
                        System.out.println(queue.take());
                        System.out.println(Thread.currentThread().getName() + " end...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            },"t" + i).start();
        }
    }
}

结果分析:

//公平
final  SynchronousQueue<String> queue = new SynchronousQueue<String>(true);
-------------------------------------------------
t2 begin...
t6 begin...
t0 begin...
t4 begin...
t8 begin...
t1 begin...
put:t2
t1 end...
t3 begin...
put:t6
t3 end...
t9 begin...
put:t0
t9 end...
t0 end...
t7 begin...
put:t4
t7 end...
t5 begin...
t4 end...
t2 end...
t8 end...
t6 end...
put:t8
t5 end...

将SynchronousQueue 切换成公平锁策略时, put线程挂起顺序依次是: t2 t6 t0 t4 t8, 4s之后, take线程执行后的配对的顺序是: put:t2 put:t6 put:t0 put:t4 put:t8 , put的顺序跟take的顺序完全一致.

//非公平
final  SynchronousQueue<String> queue = new SynchronousQueue<String>(false);
-------------------------------------------------
t2 begin...
t6 begin...
t0 begin...
t4 begin...
t8 begin...
t1 begin...
t3 begin...
t7 begin...
put:t0
t7 end...
t0 end...
put:t4
t1 end...
put:t8
t3 end...
t4 end...
t8 end...
t5 begin...
t9 begin...
put:t2
t9 end...
t6 end...
put:t6
t5 end...
t2 end...

SynchronousQueue 采用非公平锁策略时, put线程挂起顺序依次是: t2 t6 t0 t4 t8, 4s之后, take线程执行后的配对的顺序是: put:t0 put:t4 put:t8 put:t2 put:t6 , 很明显put的顺序跟take的顺序不一致.

源码分析

1:公平锁策略- put / take

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

put 跟 take 方法有都调用:
调用transfer 方法:
put : transferer.transfer(e, false, 0)
take: transferer.transfer(null, false, 0);
从前面内部结构看: SynchronousQueue 公平策略底层实际上委托给TransferQueue 链表队列实现, 而内部节点QNode, 就是链表节点啦, 来看下节点源码:

static final class QNode {
    volatile QNode next;   //下一个节点        
    volatile Object item;    //节点内容     
    volatile Thread waiter; //线程挂起与唤醒控制: park /unpark      
    //模式控制变量:
    //true: 数据模式, put操作为true    
    //false: 请求模式, take操作时为false
    //队列队列的take操作与put操作, 使用同一个  transfer 方法 , 所以isData变量来区分是take操作还是put操作       
    final boolean isData; 
    //构造器
    QNode(Object item, boolean isData) {...}
    //cas原子操作, 设置next节点
    boolean casNext(QNode cmp, QNode val) {...}
    //cas 原子操作, 设置及节点内容
    boolean casItem(Object cmp, Object val) {...}
    //取消节点, 将节点内容设置为自己
    void tryCancel(Object cmp) {...}
    //判断是否操作结束
    boolean isCancelled() {...}
}

此处研究的是公平锁策略, 所以, 此时的transfer变量执项的是: TransferQueue 类的实例

E transfer(E e, boolean timed, long nanos) {
            QNode s = null; 
            //e != null, isData 为true , 表示数据模式
            boolean isData = (e != null);
            for (;;) { //非锁并发中, 自旋到满足条件为止
                //TransferQueue 初始化时, tail 跟 head 变量执行同一个节点: h
                //QNode tail, head  --> new QNode(null, false);
                QNode t = tail; //尾节点
                QNode h = head; //头节点
                if (t == null || h == null)  //初始化出问题, 跳过       
                    continue;   
                //h==t 为true时, 表示SynchronousQueue队列此时没数据, 可以添加         
                //t.isData == isData  h != t时, 队列不为空,需要进行判断此时为take操作还是put操作
                if (h == t || t.isData == isData) {  
                    //进入表示put操作 : 队列此时可能为空, 也可能不为空
                    QNode tn = t.next;
                    //前面刚设置t=tail, 此时若不等,表示其他线程修改tail值, 放弃此次操作,进入下次循环       
                    if (t != tail)          
                        continue;
                    if (tn != null) { 
                        // tn != null, 表示此时队列已有值, 尝试推进节点后移
                       //advanceTail 方法内部做了限制, 当 t == tail 才进行
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0) //过期,没必要操作      
                        return null;
                    if (s == null) //执行到这, 表示所有操作合法
                        //创建节点
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))  //在链表上挂载节点, 若失败,重来      
                        continue;
                    //推进节点后移, 尝试将s设置为尾节点
                    //注意:但且仅当t == tail才执行
                    //一旦执行, 表示s节点入列
                    advanceTail(t, s);             
                    //advanceTail 操作成功之后, awaitFulfill
                    //awaitFulfill : 此用于自旋/阻塞节点,直到节点被匹配返回或者取消、中断。
                    Object x = awaitFulfill(s, e, timed, nanos);
                    //awaitFulfill 方法只有一个出口:  s.item == e, 而出现这情况,   
                    //前提必须是s.tryCancel 方法 ,2种情形调用: 1>等待时间到 2>线程中断
                    //执行到这步, 有几种可能: 1>线程被中断  2>等待时间到 3>有消费/生产线程配对了
                   //当x == s 表示线程中断或等待时间到
                    if (x == s) {                  
                        //清除s节点
                        clean(t, s);
                        return null;
                    }
                    //到这: 表示有消费/生产线程配对了
                    //用于判断节点是否已经从队列中离开了 
                    if (!s.isOffList()) {          
                        //尝试将s节点设置为head,移出t
                        advanceHead(t, s);       
                        if (x != null)            
                            s.item = s;
                        // 释放线程 
                        s.waiter = null;
                    }
                    //返回: 
                    return (x != null) ? (E)x : e;

                } else { 
                    //能进入到这个分支, 表示等待队列中有等待线程
                    //可能是take1 然后在等put1  也可能是put1在等take1                           
                    QNode m = h.next; 
                    //当等待队列中有挂起线程, m != null
                    if (t != tail || m == null || h != head)
                        continue;                
                    Object x = m.item; //拿到第一个节点元素
                    if (isData == (x != null) ||   
                        x == m ||    //如果相等表示已经有线程跟挂起的线程配置了               
                        !m.casItem(x, e)) {   //交换item的内容      
                        advanceHead(h, m);  //推动头节点移动         
                        continue;
                    }
                    //推动头节点移动
                    //这里注意: advanceHead 有个动作 h.next = h
                    //节点被匹配之后, next节点指向自己, 这种节点会在clean 方法中被删除
                    advanceHead(h, m);      
                    //唤醒等待队列中被配对的线程       
                    LockSupport.unpark(m.waiter);
                    //返回
                    return (x != null) ? (E)x : e;
                }
            }
        }

公平策略源码操作流程简化:


put操作
take操作

总结:
结合代码: transferer执行流程可归纳为以下:
1: transferer调用transfer方法实现SynchronousQueue 公平队列的take跟put操作
2:为区分take与put操作, 设计控制变量 isData区分 true:put操作, fasle:take操作
3:如果队列为null或者isData一致(为true), 队列尝试将节点添加到等待队列中, 知道被其他线程匹配, 超时 或者取消.
4:如果队列不为null, 队列尝试配对, 一旦配对成功, 按顺序唤醒挂起的线程, 调用clean方法清除配对节点.

想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容