ConcurrentLinkedQueue源码分析(Java8)

最近一段时间在看一本书《Java并发编程的艺术》,在P164讲到了关于ConcurrentLinkedQueue的源码分析,但是这部分源码非常复杂,于是我又顺手看了一下IDEA的Java源码,发现在Java8中,该部分的源码已经被更新过了,正好读一读顺带做个笔记。

基本介绍

ConcurrentLinkedQueue是一个列表实现,包括一个head和tail引用,该类的初始化过程中,头尾引用都被初始化成一个空的Node,下面我们可以看到相关代码:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

        private static class Node<E> {
            volatile E item;
            volatile Node<E> next;   
        }

        private transient volatile Node<E> head;
        
        private transient volatile Node<E> tail;
        
        public ConcurrentLinkedQueue() {
            head = tail = new Node<E>(null);
        }
}

入队流程

单线程下的入队流程为:

  1. 将新节点加入到tail引用的next中
  2. 将新节点赋值给tail引用

但是在多线程环境中,需要保障其他线程入队和出队不受影响,ConcurrentLinkedQueue由CAS算法实现了无锁入队,下面是加入节点的关键代码:

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    // 循环开始,p和t都指向tail,q指向tail的next
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // q为null代表目前tail后面没有其他线程插入的节点,即p确实是最后的节点
            if (p.casNext(null, newNode)) {
                // 这里casNext函数的作用是当p的next节点为null时,用newNode更新p的next节点,更新成功返回true

                // 如果casNext更新成功,证明newNode已经成功插入到队尾
                if (p != t)
                    // 这一步判断表明,t即tail已经不是真正的队尾引用,这是减少cas操作的一步优化

                    // 这里casTail函数的作用是当tail与t相等时,用newNode更新tail,在这里CAS失败也没有关系
                    casTail(t, newNode);
                return true;
            }
            // 如果casNext更新失败,则重新将p的next赋值给q
        }
        else if (p == q)
            // 当p==q只有一种情况,即p==p.next,在这种情况下就表明当前节点已经离队,因为在出队操作之后,ConcurrentLinkedQueue会将出队节点的next设为它本身

            // 在遇到当前节点已经是出队节点的情况下,表明当前节点已经在head之前,因此根据如下逻辑进行更新当前节点:1、如果tail已经更新,那么将当前节点设为tail;2、否则,将当前节点设为head,因为不能保证tail指向的节点是否已经离队
            p = (t != (t = tail)) ? t : head;
        else
            // 当tail更新且p不在tail时,用tail更新p,否则用q更新p
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

如果觉得上述方法过于复杂,我们可以用一种更简单的方案来进行结果相同的操作:

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (; ; ) {
        Node<E> t = tail;
        if (t.casNext(null, newNode)) {
            // 参照单线程的入队流程,casNext成功表明newNode已经成功插入到了队列里

            // 如果casTail失败了也没有关系,失败了证明有其他的线程在进行casTail,至少有一根线程可以成功
            casTail(t, newNode);
            return true;
        }
    }
}

而在JDK源码中,加入了一步优化,这步优化是:在插入一个新节点时,不着急将tail指向这个新节点,而是在插入第二个新节点的时候,才对tail进行cas操作。
这样做会导致两个问题:

  1. tail并不在保持原有的一定指向队尾的性质;
  2. 从tail开始需要进过几步查找next才能寻找到真正的队尾;

但是这样做有一个好处:减少了至少一半的cas操作,虽然增加了普通的赋值操作,但是在多线程情况下cas操作的耗时要远远大于一般赋值操作的耗时,因此这部分优化可以增大该容器类的并发量。而剩下部分的判断都是为了在进行这一步优化的情况下,保证程序的正确性所做的。

出队流程

单线程情况下的出队流程为:

  1. 如果head==tail,证明队列为空,返回null
  2. 将队首元素的值取出,作为返回值
  3. 将head指向head.next

如果按照这种思路,我们可以直接写出一个简单写法的无锁出队方案:

public E poll() {
    for (; ; ) {
        Node<E> h = head;
        if (h.next == null) {
            return null;
        } else {
            if (casHead(h, h.next)) {
                if (h.next != null)
                    return h.next.item;
            }
        }
    }
}

我们再来看JDK源码中的poll函数实现,在这个poll函数中,使用了和offer函数中类似的优化方式,在出队的时候并不着急更新head的值,而是缓慢更新,然后用一部分操作来保证出队的正确性:

public E poll() {
    restartFromHead:
    for (; ; ) {
        for (Node<E> h = head, p = h, q; ; ) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                if (p != h)
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            } else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            } else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

性能测试

这里不光是性能测试,同样有针对上述两种简单的无锁入队和出队的正确性测试。我分别开了2根入队线程和2根出队线程,每根入队线程循环入队1000W的数据,下面展示了测试结果(因为我的电脑是4核i5,比较弱鸡,如果线程开多了那么大量的时间都在线程切换上,测试结果就不准确了):

使用JDK源码

Test Started: 11:15 25:839
Get thread finished, Total: 10809006
Get thread finished, Total: 9190994
Test Finished: 11:15 31:487
Total Time Cost: 5s 648ms

使用自定义的offer函数

Test Started: 11:17 36:963
Get thread finished, Total: 9335745
Get thread finished, Total: 10664255
Test Finished: 11:17 41:627
Total Time Cost: 4s 664ms

使用自定义的poll函数

Test Started: 11:18 17:412
Get thread finished, Total: 9714954
Get thread finished, Total: 10285046
Test Finished: 11:18 21:669
Total Time Cost: 4s 257ms

同时使用自定义的offer和poll函数

Test Started: 11:18 51:663
Get thread finished, Total: 10219132
Get thread finished, Total: 9780868
Test Finished: 11:18 56:602
Total Time Cost: 4s 939ms

有点尴尬的是好像优化过的源码是跑的最慢的,应该和我只有2根读写线程有关,争抢的情况比较少,争抢情况越严重,线程越多,源码的速度应该是更快的。如果谁有更好的机器可以拿代码试一下,下面是我的测试代码:

public class TestQueue {

    private static int TOTAL_COUNT = 10000000;
    private static int TOTAL_WRITE = 2;
    private static int TOTAL_READ = 2;
    private static SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("HH:mm ss:SSS");

    public static void main(String[] args) {
        AtomicInteger flag = new AtomicInteger(0);
        ConcurrentHashMap<Integer, AtomicInteger> total = new ConcurrentHashMap<>(TOTAL_COUNT);
        for (int i = 0; i != TOTAL_COUNT; i++) {
            total.put(i, new AtomicInteger(0));
        }
        CustomQueue<Integer> customQueue = new CustomQueue<>();
        ExecutorService executor = Executors.newCachedThreadPool();

        Date startTime = new Date();
        System.out.println("Test  Started: " + DATE_FORMAT.format(startTime));

        for (int i = 0; i != TOTAL_WRITE; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i != TOTAL_COUNT; i++) {
                        customQueue.add(i);
                    }
                }
            });
        }

        for (int i = 0; i != TOTAL_READ; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {

                    int sum = 0;

                    while (flag.get() != TOTAL_WRITE * TOTAL_COUNT) {
                        Integer num = customQueue.poll();
                        if (num != null) {
                            sum++;
                            flag.incrementAndGet();
                            total.get(num).incrementAndGet();
                        }
                    }

                    System.out.println("Get thread finished, Total: " + sum);
                }
            });
        }

        executor.shutdown();

        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
            Date endTime = new Date();
            long totalTime = endTime.getTime() - startTime.getTime();
            for (int i = 0; i != TOTAL_COUNT; i++) {
                if (total.get(i).get() != TOTAL_WRITE) {
                    System.out.println("Test Failed: " + i + " " + total.get(i));
                    break;
                }
            }
            System.out.println("Test Finished: " + DATE_FORMAT.format(endTime));
            System.out.printf("Total Time Cost: %ds %dms", totalTime / 1000, totalTime % 1000);
        } catch (InterruptedException e) {
            System.out.println("Failure: " + flag.get());
            e.printStackTrace();
        }

    }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,277评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,689评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,624评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,356评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,402评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,292评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,135评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,992评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,429评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,636评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,785评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,492评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,092评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,723评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,858评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,891评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,713评论 2 354

推荐阅读更多精彩内容