Mpsc(Multi-Producer Single-Consumer)即多生产者单消费者队列,是JCTools中的高性能队列,也是Netty中常用的队列,例如Netty EventLoop中的任务队列使用的就是Mpsc队列,而不是JDK自带的队列。
本文主要介绍两类Mpsc队列:MpscArrayQueue、MpscChunkedArrayQueue
MpscArrayQueue
MpscArrayQueue是定长队列,底层用环形数组实现。
// Index mask = capacity - 1 (capacity must be a power of two), so index & mask replaces index % capacity
protected final long mask;
// Backing ring array holding the queued elements
protected final E[] buffer;
// Producer index: advanced by one (via CAS in offer) for every slot a producer claims
private volatile long producerIndex;
// Cached upper bound for producerIndex; producers only re-read the consumer index (to detect "full") once they reach it
private volatile long producerLimit;
// Consumer index: advanced only by the single consumer in poll
protected long consumerIndex;
接下来看offer方法:
/**
 * Inserts e at the tail of the ring; may be called concurrently by multiple producers.
 *
 * @param e the element to enqueue, must not be null
 * @return true if the element was enqueued, false if the queue was full
 * @throws NullPointerException if e is null
 */
public boolean offer(final E e)
{
if (null == e)
{
throw new NullPointerException();
}
// local copy of the mask field; producerLimit below is a cached view of the consumer index (potentially updated in loop)
final long mask = this.mask;
// cached bound: the consumer index only needs to be re-read once pIndex reaches it
long producerLimit = lvProducerLimit();
long pIndex;
do
{
// read the current producer index
pIndex = lvProducerIndex();
if (pIndex >= producerLimit)
{
final long cIndex = lvConsumerIndex();
// recompute the limit: mask + 1 is the capacity, so producers may run at most one capacity ahead of the consumer;
// as the consumer advances (cIndex grows) the limit grows with it
producerLimit = cIndex + mask + 1;
if (pIndex >= producerLimit)
{
return false; // FULL :(
}
else
{
// update producer limit to the next index that we must recheck the consumer index
// this is racy, but the race is benign
soProducerLimit(producerLimit);
}
}
}
// claim slot pIndex by CAS-advancing producerIndex to pIndex + 1; on contention loop, re-read and retry
while (!casProducerIndex(pIndex, pIndex + 1));
// compute the raw memory offset of slot pIndex inside buffer
final long offset = calcCircularRefElementOffset(pIndex, mask);
// publish the element into the claimed slot with an ordered store
soRefElement(buffer, offset, e);
return true; // AWESOME :)
}
/**
 * Atomically moves producerIndex from expect to newValue.
 *
 * @return true if the swap happened, false if another producer changed the index first
 */
final boolean casProducerIndex(long expect, long newValue) {
    final boolean swapped = UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
    return swapped;
}
/**
 * Returns the raw Unsafe offset of the ring slot that index maps to.
 *
 * @param index logical (ever-growing) queue index
 * @param mask  capacity - 1 of the power-of-two ring
 * @return REF_ARRAY_BASE plus the slot number scaled by the reference size
 */
public static long calcCircularRefElementOffset(long index, long mask) {
    final long slot = index & mask;
    final long byteOffset = slot << REF_ELEMENT_SHIFT;
    return REF_ARRAY_BASE + byteOffset;
}
这里计算pIndex下标对应的实际内存偏移地址的方法需要注意一下,其中用到的两个常量是这样初始化的:
// Bytes occupied by one reference element of an Object[]; only 4 and 8 are supported
final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale)
{
    // 4 == 1 << 2
    REF_ELEMENT_SHIFT = 2;
}
else if (8 == scale)
{
    // 8 == 1 << 3
    REF_ELEMENT_SHIFT = 3;
}
else
{
    // Without this branch REF_ELEMENT_SHIFT would be left unassigned, and any other size
    // could not be expressed as the shift used by calcCircularRefElementOffset
    throw new IllegalStateException("Unknown pointer size: " + scale);
}
// Offset of element 0 from the start of an Object[]
REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class);
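其中REF_ARRAY_BASE是数组第一个元素相对于数组对象起始地址的偏移量;scale是数组中每个元素(引用)占用的字节数;REF_ELEMENT_SHIFT是scale对应的2的幂次(即log2(scale)),这样就可以用左移运算`(index & mask) << REF_ELEMENT_SHIFT`代替乘法`(index & mask) * scale`来计算元素偏移。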
最后看下poll方法:
/**
 * Removes and returns the head element; must only be called by the single consumer.
 *
 * @return the head element, or null if the queue is empty
 */
public E poll()
{
final long cIndex = lpConsumerIndex();
// raw memory offset of the slot for cIndex
final long offset = calcCircularRefElementOffset(cIndex, mask);
// Copy field to avoid re-reading after volatile load
final E[] buffer = this.buffer;
// If we can't see the next available element we can't poll
// volatile read of the head slot
E e = lvRefElement(buffer, offset);
// A null slot is ambiguous: either the queue is empty, or a producer has claimed the index
// (CAS done) but not yet stored the element. Only in the latter case do we spin.
if (null == e)
{
if (cIndex != lvProducerIndex())
{
do
{
e = lvRefElement(buffer, offset);
}
while (e == null);
}
else
{
return null;
}
}
// consumed: clear the slot so producers can reuse it
soRefElement(buffer, offset, null);
// publish the advanced consumer index
soConsumerIndex(cIndex + 1);
return e;
}
/** Wraps index into the power-of-two ring and returns the raw Unsafe offset of that slot. */
public static long calcCircularRefElementOffset(long index, long mask) {
    final long wrapped = index & mask;
    return (wrapped << REF_ELEMENT_SHIFT) + REF_ARRAY_BASE;
}
/**
 * Reads the element stored at a raw offset of buffer with volatile semantics.
 *
 * @param buffer the array to read from
 * @param offset raw Unsafe offset of the slot
 * @return the slot's current content, or null if empty
 */
@SuppressWarnings("unchecked")
public static <E> E lvRefElement(E[] buffer, long offset)
{
    // Unchecked because of erasure. Callers that may store non-E markers in the slot
    // should assign the result to Object (as the chunked poll does).
    return (E) UNSAFE.getObjectVolatile(buffer, offset);
}
/**
 * Stores e into buffer at a raw offset using an ordered write (putOrderedObject).
 *
 * @param buffer the target array
 * @param offset raw Unsafe offset of the slot
 * @param e      the value to store (null clears the slot)
 */
public static <E> void soRefElement(E[] buffer, long offset, E e) {
    final Object target = buffer;
    UNSAFE.putOrderedObject(target, offset, e);
}
/** Publishes a new consumer index with an ordered write (putOrderedLong). */
final void soConsumerIndex(long newValue) {
    final long fieldOffset = C_INDEX_OFFSET;
    UNSAFE.putOrderedLong(this, fieldOffset, newValue);
}
MpscChunkedArrayQueue
MpscChunkedArrayQueue是一个非定长的队列,适合无法预测队列长度的场景。基于数组+链表的结构,不会像链表那样分配过多的Node,吞吐量比传统的链表高。
属性:
// Consumer-side mask = (capacity - 1) << 1, i.e. (capacity - 1) * 2; the lowest bit stays clear because indexes advance by 2
protected long consumerMask;
// Array the consumer currently reads from (initially the same array as producerBuffer)
protected E[] consumerBuffer;
// Consumer index (advances by 2 per element)
protected long consumerIndex;
// Producer-side mask, same encoding as consumerMask, for the array producers currently write to
protected long producerMask;
// Producer index (advances by 2 per element); an odd value means a resize is in progress
protected long producerIndex;
// Cached bound on producerIndex; reaching it sends producers to offerSlowPath
private volatile long producerLimit;
// Array producers currently write to
protected E[] producerBuffer;
// Maximum capacity in index units (2 per element), per the original note: Pow2.roundToPowerOfTwo(maxCapacity) << 1
protected final long maxQueueCapacity;
构造方法:
/**
 * Creates the queue with a first chunk sized to initialCapacity rounded up to a power of two,
 * plus one extra slot reserved for the link to the next chunk.
 *
 * @param initialCapacity requested capacity of the first chunk, must be at least 2
 */
public BaseMpscLinkedArrayQueue(final int initialCapacity) {
    RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
    // smallest power of two >= initialCapacity
    final int chunkCapacity = Pow2.roundToPowerOfTwo(initialCapacity);
    // indexes advance by 2, so the mask is shifted left once to leave its lowest bit clear
    final long chunkMask = (chunkCapacity - 1) << 1;
    // the slot beyond chunkCapacity holds the reference to the next array after a resize
    final E[] firstChunk = allocateRefArray(chunkCapacity + 1);
    producerBuffer = firstChunk;
    producerMask = chunkMask;
    consumerBuffer = firstChunk;
    consumerMask = chunkMask;
    // nothing consumed yet, so producers may advance up to the mask before re-checking
    soProducerLimit(chunkMask);
}
先看offer方法:
/**
 * Inserts e at the tail; may be called concurrently by multiple producers. Unlike MpscArrayQueue,
 * indexes advance by 2 and an odd producerIndex marks a resize in progress.
 *
 * @param e the element to enqueue, must not be null
 * @return true if enqueued, false if the queue is full and cannot grow
 * @throws NullPointerException if e is null
 */
public boolean offer(final E e)
{
if (null == e)
{
throw new NullPointerException();
}
long mask;
E[] buffer;
long pIndex;
while (true)
{
long producerLimit = lvProducerLimit();
pIndex = lvProducerIndex();
// lower bit is indicative of resize, if we see it we spin until it's cleared
// (another producer won the resize claim in offerSlowPath; resize() later publishes pIndex + 2, an even value)
if ((pIndex & 1) == 1)
{
continue;
}
// pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
mask = this.producerMask;
buffer = this.producerBuffer;
// Reached the cached limit: the slow path either raises the limit, reports full, or claims the resize
if (producerLimit <= pIndex)
{
// the returned status decides how this element is handled
int result = offerSlowPath(mask, pIndex, producerLimit);
switch (result)
{
case CONTINUE_TO_P_INDEX_CAS: // limit was raised: leave the switch and try the pIndex CAS below
break;
case RETRY: // lost a CAS race (on the limit or the resize claim): start over
continue;
case QUEUE_FULL: // no room left within maxQueueCapacity
return false;
case QUEUE_RESIZE: // this thread owns the resize: link a new array and store e there
resize(mask, buffer, pIndex, e, null);
return true;
}
}
// claim slot pIndex by advancing producerIndex by 2
if (casProducerIndex(pIndex, pIndex + 2))
{
break;
}
}
// compute the slot offset for pIndex and publish e, as in MpscArrayQueue
final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
soRefElement(buffer, offset, e); // release element e
return true;
}
继续看offerSlowPath方法:
/**
 * Decides what a producer that reached producerLimit should do next.
 *
 * @param mask          producer mask of the current buffer
 * @param pIndex        the (even) producer index observed by the caller
 * @param producerLimit the limit observed by the caller
 * @return CONTINUE_TO_P_INDEX_CAS if the limit was raised, QUEUE_FULL if no room is left,
 *         QUEUE_RESIZE if this thread claimed the resize, or RETRY after losing a CAS race
 */
private int offerSlowPath(long mask, long pIndex, long producerLimit) {
    // read the consumer progress and the current buffer's capacity (in index units)
    final long cIndex = lvConsumerIndex();
    long bufferCapacity = getCurrentBufferCapacity(mask);

    // The consumer has freed slots: the current buffer still has room below cIndex + capacity,
    // so just try to raise the cached limit
    if (cIndex + bufferCapacity > pIndex) {
        return casProducerLimit(producerLimit, cIndex + bufferCapacity)
                ? CONTINUE_TO_P_INDEX_CAS // continue to pIndex CAS
                : RETRY;                  // retry from top
    }

    // full and cannot grow: offer should return false
    if (availableInQueue(pIndex, cIndex) <= 0) {
        return QUEUE_FULL;
    }

    // grab index for resize -> set lower bit; losers of this CAS retry from the top
    return casProducerIndex(pIndex, pIndex + 1) ? QUEUE_RESIZE : RETRY;
}
/**
 * Returns the room left before maxQueueCapacity is reached, in index units (2 per element);
 * a value <= 0 means the queue cannot accept more elements.
 */
protected long availableInQueue(long pIndex, long cIndex) {
    final long occupied = pIndex - cIndex;
    return maxQueueCapacity - occupied;
}
继续看扩容方法resize
/**
 * Grows the queue by linking a new array after oldBuffer and storing the pending element in it.
 * It runs only in the producer that won the resize-claim CAS (producerIndex = pIndex + 1) in
 * offerSlowPath; the other producers spin while the index is odd.
 *
 * @param oldMask   producer mask of oldBuffer
 * @param oldBuffer the array that ran out of room
 * @param pIndex    the even producer index reserved for the element
 * @param e         the element to insert, or null when s is given
 * @param s         supplier of the element to insert, or null when e is given
 * @throws OutOfMemoryError if the new array cannot be allocated (producerIndex is rolled back to pIndex first)
 */
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s)
{
    // exactly one of e / s must be provided; the previous form "(e == null || s != null)" made this assert always true
    assert (e != null && s == null) || (e == null && s != null);
    // length of the next array to allocate
    int newBufferLength = getNextBufferSize(oldBuffer);
    final E[] newBuffer;
    try
    {
        newBuffer = allocateRefArray(newBufferLength);
    }
    catch (OutOfMemoryError oom)
    {
        // release the resize claim (odd index) so other producers are not left spinning forever
        assert lvProducerIndex() == pIndex + 1;
        soProducerIndex(pIndex);
        throw oom;
    }
    producerBuffer = newBuffer;
    // last slot of the new array is reserved for the next link, hence length - 2
    final int newMask = (newBufferLength - 2) << 1;
    producerMask = newMask;
    // offsets of pIndex in the old and new arrays
    final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
    final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
    // store the element at offsetInNew in newBuffer
    soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
    // link: the old array's reserved next-array slot now points at newBuffer
    soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked
    // ASSERT code
    final long cIndex = lvConsumerIndex();
    final long availableInQueue = availableInQueue(pIndex, cIndex);
    RangeUtil.checkPositive(availableInQueue, "availableInQueue");
    // recompute the limit, capped by the remaining room under maxQueueCapacity
    soProducerLimit(pIndex + Math.min(newMask, availableInQueue));
    // make resize visible to the other producers
    soProducerIndex(pIndex + 2);
    // INDEX visible before ELEMENT, consistent with consumer expectation
    // make resize visible to consumer
    // the JUMP sentinel in the old slot tells the consumer to continue in the next linked array
    soRefElement(oldBuffer, offsetInOld, JUMP);
}
继续看poll方法:
/**
 * Removes and returns the head element; must only be called by the single consumer.
 * It follows the JUMP sentinel into the next linked array after a producer-side resize.
 *
 * @return the head element, or null if the queue is empty
 */
public E poll()
{
final E[] buffer = consumerBuffer;
final long index = lpConsumerIndex();
final long mask = consumerMask;
// raw memory offset of the slot for the consumer index
final long offset = modifiedCalcCircularRefElementOffset(index, mask);
// typed as Object because the slot may hold the JUMP sentinel rather than an E
Object e = lvRefElement(buffer, offset);
if (e == null)
{
if (index != lvProducerIndex())
{
// poll() == null iff queue is empty, null element is not strong enough indicator, so we must
// check the producer index. If the queue is indeed not empty we spin until element is
// visible.
do
{
e = lvRefElement(buffer, offset);
}
while (e == null);
}
else
{
return null;
}
}
// JUMP: producers resized, so the remaining elements continue in the next linked array
if (e == JUMP)
{
final E[] nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}
// clear the slot, then publish the consumer index advanced by 2
soRefElement(buffer, offset, null); // release element null
soConsumerIndex(index + 2); // release cIndex
return (E) e;
}
/**
 * Takes the element at index from nextBuffer after the consumer followed a JUMP sentinel.
 * NOTE(review): relies on consumerMask already describing nextBuffer; presumably refreshed
 * by nextBuffer(...) before this call. Confirm.
 *
 * @throws IllegalStateException if the new array has no element at index
 */
private E newBufferPoll(E[] nextBuffer, long index) {
    final long slotOffset = modifiedCalcCircularRefElementOffset(index, consumerMask);
    final E head = lvRefElement(nextBuffer, slotOffset);
    if (head != null) {
        // clear the slot, then publish the consumer index advanced by 2
        soRefElement(nextBuffer, slotOffset, null);
        soConsumerIndex(index + 2);
        return head;
    }
    throw new IllegalStateException("new buffer must have at least one element");
}
总结下优化点:
- 大量的位运算
- 使用Unsafe.putOrderedXXX(即lazySet,以前使用的是putXXXVolatile):volatile写需要StoreLoad屏障来保证其他线程立即可见,开销较大;putOrderedXXX只保证写入顺序(StoreStore屏障),不保证立即可见,在Mpsc场景下已经足够,没有必要使用volatile写
- 无锁化
- 通过字段填充(padding)避免伪共享(本文展示的代码中去掉了填充部分)
- 底层结构主要使用数组,性能更好