Java集合--非阻塞队列(ConcurrentLinkedQueue实现原理)

ConcurrentLinkedQueue实现原理

上文,笔者介绍了非阻塞队列的基础知识,对于其代表类ConcurrentLinkedQueue做了个简单阐述。

本篇,我们就继续对ConcurrentLinkedQueue进行学习,来看看ConcurrentLinkedQueue的底层实现!

链表结点

在ConcurrentLinkedQueue中,元素保存在结点中,对外以元素形式存在,对内则以结点形式存在。

每一个结点中,都有指向下一个结点的指针,依次向后排列,形成链表结构。

在ConcurrentLinkedQueue中,有一个内部类--Node,此类代表队列的结点。

在Node中,item表示元素,next为指向下一个元素的指针,并且都被volatitle所修饰。

之前,我们说了ConcurrentLinkedQueue是使用CAS来实现非阻塞入队出队。在Node结点中,我们也使用了CAS来实现结点的操作。

使用CAS来替换本结点中的元素,使用CAS来替换本结点中指向下一个元素的指针。

如果你对CAS的概念不太理解,建议可先去CAS进行学习;

在本篇幅中,我们不对CAS进行过多的介绍!

//单向链表中:结点对象Node:
private static class Node<E> {
    //链表中存储的元素:
    volatile E item;

    //本结点指向下一结点的引用:
    volatile Node<E> next;

    //结点构造:使用UNSAFE机制(CAS)实现元素存储
    Node(E item) {UNSAFE.putObject(this, itemOffset, item);}

    //替换本结点中的元素值:cmp是期望值,val是目标值。当本结点中元素的值等于cmp的时,则将其替换为val
    boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}

    //将本结点中指向下一个结点的引用指向自己
    void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);}

    //替换本结点中指向下一个结点的引用:cmp是期望值,val是目标值。当本结点中的指向下一个结点的引用等于cmp时,则将其替换为指向val
    boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}

    //JDK提供的Unsafe对象,底层CAS原理实现
    private static final sun.misc.Unsafe UNSAFE;
    
    private static final long itemOffset;
    
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = Node.class;

            //结点中元素的起始地址偏移量:
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            
            //结点中指向下一个元素引用的起始地址偏移量:
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

总体来说,在Node类中,元素的替换,指针的改变都是通过CAS来实现的。在方法中,如果执行成功则返回true,执行失败则返回false;

可能有的朋友对itemOffset、nextOffset不太理解,这块我给大家稍微做个解释!

其实,你可以理解为当一个对象创建后,JVM在内存中为这个对象分配了一片区域,该区域用来存储该类的一些信息,这些信息中就包括item、next等属性。而为了能更快的从内存中,对这些属性获取修改,我们就需要使用Unsafe类,该类可以帮助获取到这些属性所在内存中具体的位置,有了位置的信息,我们的程序就能更快的进行操作!

成员变量

在ConcurrentLinkedQueue中,head、tail属性就是队列中常见的头指针、尾指针。值得注意的是,head、tail属性都被volatitle所修饰。

volatitlte是一个轻量级的同步机制,当有线程对其所修饰的属性进行更新时,被更新的值会立刻同步到内存中去,并且使其他cpu所缓存的值置为无效。当其他线程对该属性操作时,必须从主存中获取。

此外,与Node类类似,在ConcurrentLinkedQueue中也包含了Unsafe类,以及headOffset--头结点偏移量,tailOffset--尾结点偏移量。

//队列中头结点:
private transient volatile Node<E> head;

//队列中尾结点:
private transient volatile Node<E> tail;

private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;

static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class k =ConcurrentLinkedQueue.class;
        headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
        tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

构造函数

在默认构造中,队列的头尾结点指针都指向同一个结点,并且结点元素为null;

//默认构造,指定头尾结点元素为null:
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

此时,队列结构,如图所示:

[图片上传失败...(image-24612f-1514418648920)]

入队

在ConcurrentLinkedQueue中,入队操作包含两种方法,一个是add(E e),一个是offer(E e)。

add(E e)底层调用offer(E e),所以我们主要说说offer(E e).

//向队列尾部添加元素(底层调用offer):
public boolean add(E e) {
    return offer(e);
}

//入队:向队列尾部添加元素:
public boolean offer(E e) {
    //不能添加为空元素:抛异常
    checkNotNull(e);

    //创建新结点:
    final Node<E> newNode = new Node<E>(e);
    
    //p的类型为Node<E>(这块需要注意,不需要显式声明)
    for (Node<E> t = tail, p = t;;) {
        //获取链表中尾部结点的下一个结点:
        Node<E> q = p.next;
        
        //并判断下一个结点是否为null(正常情况下均为null),为null则说明p是链表中的最后一个节点
        if (q == null) {------------------------⑴
            //将p节点中指向下一个结点的引用指向newNode节点(向链表中插入元素)
            if (p.casNext(null, newNode)) {
                if (p != t) 
                    casTail(t, newNode);  
                return true;
            }//CAS插入失败,则进入下次循环
            
        } else if (p == q){------------------------⑵
            p = (t != (t = tail)) ? t : head;
        
        } else {------------------------⑶
            p = (p != t && t != (t = tail)) ? t : q;
        }
    }
}

在offer(E e)的判断中,由于使用了三目运算符,导致可读性较差,对于有的朋友来说可能难以理解。

我们对其进行了优化,由三目运算符修改为if/else的形式:

p = (t != (t = tail)) ? t : head 替换为:

Node<E> tmp=t;
t = tail;
if (tmp==t) {
    p=head;
} else {
    p=t;
}

p = (p != t && t != (t = tail)) ? t : q 替换为:

if (p==t) {
    p = q;
} else {
    Node<E> tmp=t;
    t = tail;
    if (tmp==t) {
        p=q;
    } else {
        p=t;
    }
}

结合上面的源码,我们来具体说说入队的流程:

当插入的元素为空时候,会抛出异常,禁止向队列中插入尾空元素;

创建插入元素的新结点newNode,从tail指针处遍历链表结构。

例如:
    向队列中插入第一个元素时,元素="1111",tail=Node(null)、t=tail、p=tail、p.next=null、q=null。此处需要注意,由于是插入队列的第一个元素,所以需要回过去看下队列的默认构造是如何实现。

    q=null,进入判断条件,p.casNext(null,newNode),调用p结点的cas方法,判断p结点的next属性是否为null,如果为null,则将next属性指向newNode结点。调用成功,返回true;调用失败,返回false。由于底层使用CAS实现,所以casNext()方法将是一个原子性操作。

    如果调用失败,则进行下一次循坏,直至插入成功为止。而调用成功,则进入if内部,判断p和t是否相同,此时是何含义呢?

    在ConcurrentLinkedQueue中,当插入一个结点时,并不会每插入一次都改变一次tail结点的指向,当我们发现p/t不同时,也就是说最后一个结点和tail结点不为同一个时,我们就需要调用casTail()方法,来修改tail结点的指向。

    例如,当我们向队列中,插入第一个元素时候,直至插入结束,我们也并没有修改tail结点的指向,当第二次插入时候会进行修改。
    
    下面,我们来看下第二个元素的插入情况,元素="2222"。

    同样,判断为null,创建新结点。进入第一次循环:tail=Node(null)、t=tail、p=tail、p.next=Node(1111)、q=Node(1111)。
    进入第一个判断q==null不成立,第二个判断p==q不成立,进入else:看上面的简化代码,发现此时p==t,所以将p=q,结束循环,进入下一次。

    进入第二次循环,t=tail,p=q=Node(1111),p.next=null,q=null。进入第一个循环判断q=null成立,此时与第一次插入情况相同。插入完成后,判断p!=t,此时p=Node(1111)/t=tail,进入判断,调用casTail(t, newNode),将tail指针指向newNode结点,至此插入成功,返回true。

    上面我们说到了⑴⑶两种情况,接下来我们说说⑶。什么情况下,回进入⑵的判断中呢?
    
    当我们再添加完首个元素后,立即进行出队操作,此时再去添加一个元素,那么就会在循环中直接进入⑵的判断中。此时需要结合出队代码一块学习。

    tail=Node(null),t=tail,p=tail,p.next=p=Node(null),q=p=Node(null),也就是此时的tail节点元素为null,而指向下一个元素的指针也指向了自己,这是由于元素出队导致的。

    进入第二个循环p==q,回看上面的优化代码,得到p=head;开始第二次循环,head在出队时被设置成了指向第一次插入的元素(此时该元素的值为null,但结点依旧存在)。p.next=null,q=null,进入第一个判断p==null,并进入p!=t,重新设置tail指针。

说了很多,想必不少人已经看蒙了,下面我们用图片来进行下简单的描述!!!

[图片上传失败...(image-caac51-1514418648920)]

出队

//出队:移除队列中头结点中的元素
public E poll() {
    restartFromHead:
    for (;;) {
        //p的类型为Node<E>(这块需要注意,不需要显式声明)
        for (Node<E> h = head, p = h, q;;) {
            //头部结点的元素:
            E item = p.item;
            //如果p节点的元素不为空,使用CAS设置p节点引用的元素为null
            if (item != null && p.casItem(item, null)) {
                if (p != h) 
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            } else if ((q = p.next) == null) {
                //如果p的下一个结点为null,则说明队列中没有元素,更新头结点
                updateHead(h, p);
                return null;
            } else if (p == q) {
                continue restartFromHead;
            } else {
                p = q;
            }
        }
    }
}

与入队流程相比,出队流程的操作也同样复杂,需要我们静下心来细细学习!!

我们假设,此时队列中已经存在了4个元素。如图:

[图片上传失败...(image-3ae987-1514418648920)]

出队第一个元素:

进入for循环:head=Node(null),h=head,p=head,q=null;item=p.item=null;进行if判断,第一个判断不满足,第二个判断中将q进行了修改:q=p.next=Node(1111),第三个判断p==q不满足,直接进入最后的else,将p=q=Node(1111);

开始第二次循环,item=p.item=1111,进入第一个判断p.casItem(item,null),将p节点中的元素值1111置为null;进行判断p!=h,成立调用update(h,q)方法,将head指针指向Node(2222)处,并将原来head指向结点Node(null)的next属性指向自己;

[图片上传失败...(image-31a4da-1514418648920)]

出队第二个元素:

进入for循环:head=Node(2222),h=head,p=head,q=null;item=p.item=2222;进行if判断,满足条件,进行p.casItem(item,null),p=Node(null)--head=Node(null);判断p!=h,此时结果为等于,则不修改head指针;

[图片上传失败...(image-a8b43c-1514418648920)]

出队第三个元素:

进入for循环:head=Node(null),h=head,p=head,q=null;item=p.item=null;进行if判断,第一个判断不满足,第二个判断中将q进行了修改:q=p.next=Node(3333),第三个判断p==q不满足,直接进入最后的else,将p=q=Node(3333);

开始第二次循环,item=p.item=3333,进入第一个判断p.casItem(item,null),将p节点中的元素值3333置为null;进行判断p!=h,成立调用update(h,q)方法,将head指针指向Node(4444)处,并将原来head指向结点Node(null)的next属性指向自己;

[图片上传失败...(image-a82e19-1514418648920)]

出队第四个元素:

重复出队第二个元素的步骤,将第四个结点元素置为null;

[图片上传失败...(image-5157b1-1514418648920)]

通过上面5张图片,希望你能对ConcurrentLinkedQueue的出队流程有一个清晰的思路。

获取

//获取头部元素,不移除队列中头结点中的元素
public E peek() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

peek()方法与poll()方法类似,返回的都是头结点中的元素。

但有一点不同的是,peek()方法并不会移除头结点中的元素,而poll()在改变head指向的同时还移除了头结点中的元素,将其置为null。

以上便是ConcurrentLinkedQueue的全部内容!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容