【netty学习笔记十七】Mpsc高性能无锁队列

Mpsc(Multi producer single consumer)即多生产者单消费者队列,是Jctools中的高性能队列,也是netty经常的队列,如EventLoop中的事件队列就用Mpsc而不是jdk自带的队列。
本文主要介绍二类Mpsc队列:MpscArrayQueue、MpscChunkedArrayQueue

MpscArrayQueue

MpscArrayQueue是定长队列,底层用环形数组实现。

// 计算下标辅助值,初始为容量-1,这样可以用&运算
protected final long mask;
// 存放数据的数组
protected final E[] buffer;
// 生产者的索引
private volatile long producerIndex;
// 生产者的下标限制值,用来判断队列是否已满
private volatile long producerLimit;
// 消费者的索引
protected long consumerIndex;

接下来看offer方法:

public boolean offer(final E e)
    {
        if (null == e)
        {
            throw new NullPointerException();
        }

        // use a cached view on consumer index (potentially updated in loop)
        final long mask = this.mask;
        //lvProducerLimit直接返回生产者索引最大限制
        long producerLimit = lvProducerLimit();
        long pIndex;
        do
        {
            //获取生产者索引
            pIndex = lvProducerIndex();
            if (pIndex >= producerLimit)
            {
                final long cIndex = lvConsumerIndex();
                //重新计算生产者索引最大限制值,producerLimit=mask+1=容量,cIndex是消费者索引,cIndex不等于0说明有消费,那么producerLimit也要相应的增加
                producerLimit = cIndex + mask + 1;

                if (pIndex >= producerLimit)
                {
                    return false; // FULL :(
                }
                else
                {
                    // update producer limit to the next index that we must recheck the consumer index
                    // this is racy, but the race is benign
                    soProducerLimit(producerLimit);
                }
            }
        }
        //死循环CAS设置pIndex下标实际内存偏移地址
        while (!casProducerIndex(pIndex, pIndex + 1));
        
        //计算pIndex下标实际内存偏移地址
        final long offset = calcCircularRefElementOffset(pIndex, mask);
        //将pIndex下标实际内存偏移地址设置为要插入的值
        soRefElement(buffer, offset, e);
        return true; // AWESOME :)
    }

final boolean casProducerIndex(long expect, long newValue)
    {
        return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
    }

public static long calcCircularRefElementOffset(long index, long mask)
    {
        return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
    }

这里计算pIndex下标实际内存偏移地址方法要注意下:

final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale)
        {
            REF_ELEMENT_SHIFT = 2;
        }
        else if (8 == scale)
        {
            REF_ELEMENT_SHIFT = 3;
        }
       
REF_ARRAY_BASE=UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)

其中REF_ARRAY_BASE是数组内存初始偏移量,scale是每个数组每个元素的内存增量REF_ELEMENT_SHIFT是转成2的n次方,好利用位运算来计算。

最后看下poll方法:

public E poll()
    {
        final long cIndex = lpConsumerIndex();
        //根据cIndex计算实际的内存偏移值
        final long offset = calcCircularRefElementOffset(cIndex, mask);
        // Copy field to avoid re-reading after volatile load
        final E[] buffer = this.buffer;

        // If we can't see the next available element we can't poll
        //获取当前可消费的元素
        E e = lvRefElement(buffer, offset);
        //e=null则一直循环获取
        if (null == e)
        {
            
            if (cIndex != lvProducerIndex())
            {
                do
                {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);
            }
            else
            {
                return null;
            }
        }
        //消费成功,将元素设为null
        soRefElement(buffer, offset, null);
        //消费者索引+1
        soConsumerIndex(cIndex + 1);
        return e;
    }
public static long calcCircularRefElementOffset(long index, long mask)
    {
        return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
    }
public static <E> E lvRefElement(E[] buffer, long offset)
    {
        return (E) UNSAFE.getObjectVolatile(buffer, offset);
    }
public static <E> void soRefElement(E[] buffer, long offset, E e)
    {
        UNSAFE.putOrderedObject(buffer, offset, e);
    }
final void soConsumerIndex(long newValue)
    {
        UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
    }

MpscChunkedArrayQueue

MpscChunkedArrayQueue是一个非定长的队列,适合无法预测队列长度的场景。基于数组+链表的结构,不会像链表那样分配过多的Node,吞吐量比传统的链表高。

属性:

//消费者辅助计算值=(容量-1)/2
protected long consumerMask;
//和生产者一样,指向数组引用
protected E[] consumerBuffer;
//消费者索引
protected long consumerIndex;

protected long producerMask;
protected long producerIndex;
private volatile long producerLimit;
protected E[] producerBuffer;
//最大容量,默认为Pow2.roundToPowerOfTwo(maxCapacity)) << 1
protected final long maxQueueCapacity;

构造方法:

public BaseMpscLinkedArrayQueue(final int initialCapacity)
    {
        RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
        //比initialCapacity大的最近的2^n值
        int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
        // leave lower bit of mask clear
        long mask = (p2capacity - 1) << 1;
        // need extra element to point at next array
        //初始数组大小=p2capacity + 1
        E[] buffer = allocateRefArray(p2capacity + 1);
        producerBuffer = buffer;
        producerMask = mask;
        consumerBuffer = buffer;
        consumerMask = mask;
        soProducerLimit(mask); // we know it's all empty to start with
    }

先看poll方法:

public boolean offer(final E e)
    {
        if (null == e)
        {
            throw new NullPointerException();
        }

        long mask;
        E[] buffer;
        long pIndex;

        while (true)
        {
            long producerLimit = lvProducerLimit();
            pIndex = lvProducerIndex();
            // lower bit is indicative of resize, if we see it we spin until it's cleared
            //和MpscArrayQueue不一样,pIndex每次会加2,低位是识别扩容用的,如果是扩容,则等待扩容完毕(扩容完会设置pIndex为2的倍数)
            if ((pIndex & 1) == 1)
            {
                continue;
            }
            // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)


            mask = this.producerMask;
            buffer = this.producerBuffer;
            //当阈值小于生产者索引时,需要扩容,否则pIndex+2
            if (producerLimit <= pIndex)
            {
                //返回状态值,根据状态值处理新元素
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch (result)
                {
                    case CONTINUE_TO_P_INDEX_CAS: //继续尝试CAS设置pIndex+2
                        break;
                    case RETRY: //可能CAS并发失败,继续
                        continue;
                    case QUEUE_FULL: //队列满了
                        return false;
                    case QUEUE_RESIZE: //队列需要扩容
                        resize(mask, buffer, pIndex, e, null);
                        return true;
                }
            }
            //CAS设置pIndex+2
            if (casProducerIndex(pIndex, pIndex + 2))
            {
                break;
            }
        }
        // 获取pIndex实际内存偏移值并设置,和MSPCArrayQueue一样
        final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        soRefElement(buffer, offset, e); // release element e
        return true;
    }

继续看offerSlowPath方法:

private int offerSlowPath(long mask, long pIndex, long producerLimit)
    {
        final long cIndex = lvConsumerIndex(); //计算消费者索引
        long bufferCapacity = getCurrentBufferCapacity(mask); //获取buffer容量
        //如果消费者索引+buff容量>生产者索引,说明当前容量不够用了
        if (cIndex + bufferCapacity > pIndex)
        {
            //尝试CAS设置producerLimit=cIndex + bufferCapacity,成功返回继续,失败返回重试
            if (!casProducerLimit(producerLimit, cIndex + bufferCapacity))
            {
                // retry from top
                return RETRY;
            }
            else
            {
                // continue to pIndex CAS
                return CONTINUE_TO_P_INDEX_CAS;
            }
        }
        //超过最大容量,满了
        // full and cannot grow
        else if (availableInQueue(pIndex, cIndex) <= 0)
        {
            // offer should return false;
            return QUEUE_FULL;
        }
        // grab index for resize -> set lower bit
        //设置pIndex+1,成功的话返回扩容,注意这个+1操作,前面就有pIndex&1操作来判断是否扩容
        else if (casProducerIndex(pIndex, pIndex + 1))
        {
            // trigger a resize
            return QUEUE_RESIZE;
        }
        else
        {
            // failed resize attempt, retry from top
            return RETRY;
        }
    }

protected long availableInQueue(long pIndex, long cIndex)
    {
        return maxQueueCapacity - (pIndex - cIndex);
    }

继续看扩容方法resize

private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s)
    {
        assert (e != null && s == null) || (e == null || s != null);
        //获取oldBuffer长度值
        int newBufferLength = getNextBufferSize(oldBuffer);
        final E[] newBuffer;
        try
        {
            newBuffer = allocateRefArray(newBufferLength);
        }
        catch (OutOfMemoryError oom)
        {
            assert lvProducerIndex() == pIndex + 1;
            soProducerIndex(pIndex);
            throw oom;
        }

        producerBuffer = newBuffer;
        final int newMask = (newBufferLength - 2) << 1;
        producerMask = newMask;
        //分别根据oldMask、newMask获取偏移位置值
        final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
        final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
        //将元素e设置到新的缓冲区newBuffer的offsetInNew位置处
        soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
        // 将oldBuffer中最后一个元素的位置指向新的缓冲区newBuffer
        soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked

        // ASSERT code
        final long cIndex = lvConsumerIndex();
        final long availableInQueue = availableInQueue(pIndex, cIndex);
        RangeUtil.checkPositive(availableInQueue, "availableInQueue");

        // 重新计算阈值
        soProducerLimit(pIndex + Math.min(newMask, availableInQueue));

        // make resize visible to the other producers
        soProducerIndex(pIndex + 2);

        // INDEX visible before ELEMENT, consistent with consumer expectation

        // make resize visible to consumer
        //用一个空对象JUMP来连接新老缓冲区,消费遇到JUMP就要获取新数组地址了
        soRefElement(oldBuffer, offsetInOld, JUMP);
    }

继续看poll方法:

public E poll()
    {
        final E[] buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;
        //获取消费者索引实际内存偏移值
        final long offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == null)
        {
            if (index != lvProducerIndex())
            {
                // poll() == null iff queue is empty, null element is not strong enough indicator, so we must
                // check the producer index. If the queue is indeed not empty we spin until element is
                // visible.
                do
                {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);
            }
            else
            {
                return null;
            }
        }
        //如果e为JUMP,说明扩容过,要找下一个数组
        if (e == JUMP)
        {
            final E[] nextBuffer = nextBuffer(buffer, mask);
            return newBufferPoll(nextBuffer, index);
        }
        //设置元素为null并更新消费者索引
        soRefElement(buffer, offset, null); // release element null
        soConsumerIndex(index + 2); // release cIndex
        return (E) e;
    }

private E newBufferPoll(E[] nextBuffer, long index)
    {
        final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
        final E n = lvRefElement(nextBuffer, offset);
        if (n == null)
        {
            throw new IllegalStateException("new buffer must have at least one element");
        }
        soRefElement(nextBuffer, offset, null);
        soConsumerIndex(index + 2);
        return n;
    }

总结下优化点:

  1. 大量的位运算
  2. 使用Unsafe.putOrderedXXX(以前是putXXXVolitaile),Volitaile语义会让其他线程立刻看到值,使用的是store-load屏障,性能差些,在Mpsc场景没有使用的必要
  3. 无锁化
  4. 伪共享(本文没展现去掉了填充代码)
  5. 底层结构主要使用数组,性能更好
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。