【netty学习笔记十七】Mpsc高性能无锁队列

Mpsc(Multi producer single consumer)即多生产者单消费者队列,是Jctools中的高性能队列,也是netty经常的队列,如EventLoop中的事件队列就用Mpsc而不是jdk自带的队列。
本文主要介绍二类Mpsc队列:MpscArrayQueue、MpscChunkedArrayQueue

MpscArrayQueue

MpscArrayQueue是定长队列,底层用环形数组实现。

// 计算下标辅助值,初始为容量-1,这样可以用&运算
protected final long mask;
// 存放数据的数组
protected final E[] buffer;
// 生产者的索引
private volatile long producerIndex;
// 生产者的下标限制值,用来判断队列是否已满
private volatile long producerLimit;
// 消费者的索引
protected long consumerIndex;

接下来看offer方法:

public boolean offer(final E e)
    {
        if (null == e)
        {
            throw new NullPointerException();
        }

        // use a cached view on consumer index (potentially updated in loop)
        final long mask = this.mask;
        //lvProducerLimit直接返回生产者索引最大限制
        long producerLimit = lvProducerLimit();
        long pIndex;
        do
        {
            //获取生产者索引
            pIndex = lvProducerIndex();
            if (pIndex >= producerLimit)
            {
                final long cIndex = lvConsumerIndex();
                //重新计算生产者索引最大限制值,producerLimit=mask+1=容量,cIndex是消费者索引,cIndex不等于0说明有消费,那么producerLimit也要相应的增加
                producerLimit = cIndex + mask + 1;

                if (pIndex >= producerLimit)
                {
                    return false; // FULL :(
                }
                else
                {
                    // update producer limit to the next index that we must recheck the consumer index
                    // this is racy, but the race is benign
                    soProducerLimit(producerLimit);
                }
            }
        }
        //死循环CAS设置pIndex下标实际内存偏移地址
        while (!casProducerIndex(pIndex, pIndex + 1));
        
        //计算pIndex下标实际内存偏移地址
        final long offset = calcCircularRefElementOffset(pIndex, mask);
        //将pIndex下标实际内存偏移地址设置为要插入的值
        soRefElement(buffer, offset, e);
        return true; // AWESOME :)
    }

final boolean casProducerIndex(long expect, long newValue)
    {
        return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
    }

public static long calcCircularRefElementOffset(long index, long mask)
    {
        return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
    }

这里计算pIndex下标实际内存偏移地址方法要注意下:

final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale)
        {
            REF_ELEMENT_SHIFT = 2;
        }
        else if (8 == scale)
        {
            REF_ELEMENT_SHIFT = 3;
        }
       
REF_ARRAY_BASE=UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)

其中REF_ARRAY_BASE是数组内存初始偏移量,scale是每个数组每个元素的内存增量REF_ELEMENT_SHIFT是转成2的n次方,好利用位运算来计算。

最后看下poll方法:

public E poll()
    {
        final long cIndex = lpConsumerIndex();
        //根据cIndex计算实际的内存偏移值
        final long offset = calcCircularRefElementOffset(cIndex, mask);
        // Copy field to avoid re-reading after volatile load
        final E[] buffer = this.buffer;

        // If we can't see the next available element we can't poll
        //获取当前可消费的元素
        E e = lvRefElement(buffer, offset);
        //e=null则一直循环获取
        if (null == e)
        {
            
            if (cIndex != lvProducerIndex())
            {
                do
                {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);
            }
            else
            {
                return null;
            }
        }
        //消费成功,将元素设为null
        soRefElement(buffer, offset, null);
        //消费者索引+1
        soConsumerIndex(cIndex + 1);
        return e;
    }
public static long calcCircularRefElementOffset(long index, long mask)
    {
        return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
    }
public static <E> E lvRefElement(E[] buffer, long offset)
    {
        return (E) UNSAFE.getObjectVolatile(buffer, offset);
    }
public static <E> void soRefElement(E[] buffer, long offset, E e)
    {
        UNSAFE.putOrderedObject(buffer, offset, e);
    }
final void soConsumerIndex(long newValue)
    {
        UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
    }

MpscChunkedArrayQueue

MpscChunkedArrayQueue是一个非定长的队列,适合无法预测队列长度的场景。基于数组+链表的结构,不会像链表那样分配过多的Node,吞吐量比传统的链表高。

属性:

//消费者辅助计算值=(容量-1)/2
protected long consumerMask;
//和生产者一样,指向数组引用
protected E[] consumerBuffer;
//消费者索引
protected long consumerIndex;

protected long producerMask;
protected long producerIndex;
private volatile long producerLimit;
protected E[] producerBuffer;
//最大容量,默认为Pow2.roundToPowerOfTwo(maxCapacity)) << 1
protected final long maxQueueCapacity;

构造方法:

public BaseMpscLinkedArrayQueue(final int initialCapacity)
    {
        RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
        //比initialCapacity大的最近的2^n值
        int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
        // leave lower bit of mask clear
        long mask = (p2capacity - 1) << 1;
        // need extra element to point at next array
        //初始数组大小=p2capacity + 1
        E[] buffer = allocateRefArray(p2capacity + 1);
        producerBuffer = buffer;
        producerMask = mask;
        consumerBuffer = buffer;
        consumerMask = mask;
        soProducerLimit(mask); // we know it's all empty to start with
    }

先看poll方法:

public boolean offer(final E e)
    {
        if (null == e)
        {
            throw new NullPointerException();
        }

        long mask;
        E[] buffer;
        long pIndex;

        while (true)
        {
            long producerLimit = lvProducerLimit();
            pIndex = lvProducerIndex();
            // lower bit is indicative of resize, if we see it we spin until it's cleared
            //和MpscArrayQueue不一样,pIndex每次会加2,低位是识别扩容用的,如果是扩容,则等待扩容完毕(扩容完会设置pIndex为2的倍数)
            if ((pIndex & 1) == 1)
            {
                continue;
            }
            // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)


            mask = this.producerMask;
            buffer = this.producerBuffer;
            //当阈值小于生产者索引时,需要扩容,否则pIndex+2
            if (producerLimit <= pIndex)
            {
                //返回状态值,根据状态值处理新元素
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch (result)
                {
                    case CONTINUE_TO_P_INDEX_CAS: //继续尝试CAS设置pIndex+2
                        break;
                    case RETRY: //可能CAS并发失败,继续
                        continue;
                    case QUEUE_FULL: //队列满了
                        return false;
                    case QUEUE_RESIZE: //队列需要扩容
                        resize(mask, buffer, pIndex, e, null);
                        return true;
                }
            }
            //CAS设置pIndex+2
            if (casProducerIndex(pIndex, pIndex + 2))
            {
                break;
            }
        }
        // 获取pIndex实际内存偏移值并设置,和MSPCArrayQueue一样
        final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        soRefElement(buffer, offset, e); // release element e
        return true;
    }

继续看offerSlowPath方法:

private int offerSlowPath(long mask, long pIndex, long producerLimit)
    {
        final long cIndex = lvConsumerIndex(); //计算消费者索引
        long bufferCapacity = getCurrentBufferCapacity(mask); //获取buffer容量
        //如果消费者索引+buff容量>生产者索引,说明当前容量不够用了
        if (cIndex + bufferCapacity > pIndex)
        {
            //尝试CAS设置producerLimit=cIndex + bufferCapacity,成功返回继续,失败返回重试
            if (!casProducerLimit(producerLimit, cIndex + bufferCapacity))
            {
                // retry from top
                return RETRY;
            }
            else
            {
                // continue to pIndex CAS
                return CONTINUE_TO_P_INDEX_CAS;
            }
        }
        //超过最大容量,满了
        // full and cannot grow
        else if (availableInQueue(pIndex, cIndex) <= 0)
        {
            // offer should return false;
            return QUEUE_FULL;
        }
        // grab index for resize -> set lower bit
        //设置pIndex+1,成功的话返回扩容,注意这个+1操作,前面就有pIndex&1操作来判断是否扩容
        else if (casProducerIndex(pIndex, pIndex + 1))
        {
            // trigger a resize
            return QUEUE_RESIZE;
        }
        else
        {
            // failed resize attempt, retry from top
            return RETRY;
        }
    }

protected long availableInQueue(long pIndex, long cIndex)
    {
        return maxQueueCapacity - (pIndex - cIndex);
    }

继续看扩容方法resize

private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s)
    {
        assert (e != null && s == null) || (e == null || s != null);
        //获取oldBuffer长度值
        int newBufferLength = getNextBufferSize(oldBuffer);
        final E[] newBuffer;
        try
        {
            newBuffer = allocateRefArray(newBufferLength);
        }
        catch (OutOfMemoryError oom)
        {
            assert lvProducerIndex() == pIndex + 1;
            soProducerIndex(pIndex);
            throw oom;
        }

        producerBuffer = newBuffer;
        final int newMask = (newBufferLength - 2) << 1;
        producerMask = newMask;
        //分别根据oldMask、newMask获取偏移位置值
        final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
        final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
        //将元素e设置到新的缓冲区newBuffer的offsetInNew位置处
        soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
        // 将oldBuffer中最后一个元素的位置指向新的缓冲区newBuffer
        soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked

        // ASSERT code
        final long cIndex = lvConsumerIndex();
        final long availableInQueue = availableInQueue(pIndex, cIndex);
        RangeUtil.checkPositive(availableInQueue, "availableInQueue");

        // 重新计算阈值
        soProducerLimit(pIndex + Math.min(newMask, availableInQueue));

        // make resize visible to the other producers
        soProducerIndex(pIndex + 2);

        // INDEX visible before ELEMENT, consistent with consumer expectation

        // make resize visible to consumer
        //用一个空对象JUMP来连接新老缓冲区,消费遇到JUMP就要获取新数组地址了
        soRefElement(oldBuffer, offsetInOld, JUMP);
    }

继续看poll方法:

public E poll()
    {
        final E[] buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;
        //获取消费者索引实际内存偏移值
        final long offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == null)
        {
            if (index != lvProducerIndex())
            {
                // poll() == null iff queue is empty, null element is not strong enough indicator, so we must
                // check the producer index. If the queue is indeed not empty we spin until element is
                // visible.
                do
                {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);
            }
            else
            {
                return null;
            }
        }
        //如果e为JUMP,说明扩容过,要找下一个数组
        if (e == JUMP)
        {
            final E[] nextBuffer = nextBuffer(buffer, mask);
            return newBufferPoll(nextBuffer, index);
        }
        //设置元素为null并更新消费者索引
        soRefElement(buffer, offset, null); // release element null
        soConsumerIndex(index + 2); // release cIndex
        return (E) e;
    }

private E newBufferPoll(E[] nextBuffer, long index)
    {
        final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
        final E n = lvRefElement(nextBuffer, offset);
        if (n == null)
        {
            throw new IllegalStateException("new buffer must have at least one element");
        }
        soRefElement(nextBuffer, offset, null);
        soConsumerIndex(index + 2);
        return n;
    }

总结下优化点:

  1. 大量的位运算
  2. 使用Unsafe.putOrderedXXX(以前是putXXXVolitaile),Volitaile语义会让其他线程立刻看到值,使用的是store-load屏障,性能差些,在Mpsc场景没有使用的必要
  3. 无锁化
  4. 伪共享(本文没展现去掉了填充代码)
  5. 底层结构主要使用数组,性能更好
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351