Java内置队列
介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
队列底层一般分为三种:数组、链表和堆。其中,堆是一般情况下是为了实现带有优先级特性队列,暂且不考虑。
接下来我们从数组和链表两种数据结构来考虑,基于数组的安全队列有ArrayBlickingQueue队列,它主要是通过加锁的方式来保证线程安全;基于链表的线程安全队列分为LinkBlickingQueue和ConcurrentLlikedQueue两大类,前者也是通过加锁的方式来保证现场安全,而ConcurrentLindkeQueue和LinkTransferQueue都是通过原子变量compare and swap(比较和交换 以下简称“CAS”)这种不加锁,自旋的方式来实现线程安全。
通过不加锁实现的队列都是无界队列(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求比较高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同事,为了减少Java垃圾回收队系统性能的影响,会尽量选择array/heap,通过比较下来,符合条件的只有ArrayBlockingQueue。
ArrayBlockingQueue和Disruptor比较
ArrayBlockingQueue是通过加锁的方式来实现线程安全的,会因为加锁
和伪共享
等出现严重的问题。
伪共享
什么是伪共享
下图是计算的基本结构。L1,L2,L3分别表示一级缓存、二级缓存、三级缓存,越靠近cup的运算速率越快,但是内存也就越低。L1、L2内存比较快,并且只能被一个单独的cpu核使用。L3更大,更慢,能被单个插槽上的所有cpu核共享;最后是主存,能被所有插槽上的所有CPU核共享。
当cup运行的时候,会先去查找L1,然后去L2,然后去L3,然后去内存中取。走的越远,所需要耗时就越长。所有需要做一个很频繁的事情,尽量在L1中完成。
缓存行
Cache是由多个cache line组成的,每个cache line是由64位字节,并且有效的引用内存中的一块地址,
java中的long类型变量是8个字节,因此一个缓存行中能存8个long类型的变量。
在访问一个long类型的数组时候,当你访问其中一个,会自动访问同个cache line 中7个,速度非常的快。
什么是伪共享。
ArraryBlockingQueue有三个成员变量。
- takeIndex:需要被取走的元素下标。
- putIndex : 可被元素插入的位置的下标
-
count:队列中元素的数量
这三个数据很容易放在一个缓存中,之间有没有太多的联系,导致每次修改都会使之前缓存的数据失效,从而达不到完全共享的效果。
ArrayBlockingQueue伪共享示意图.png
如图所示,当put一个元素到ArrayBlockingQueue中,putIndex会修改,会导致缓存中的消费者takeIndex失效,会重新到主内存中获取。
这种无法使用缓存行的现象,称为伪共享。
Disruptor
Disruptor通过以下设计来解决队列速度慢的问题。
- 环形数据结构
为避免垃圾回收,采用数组结构,而非链表结构。数组对处理器的缓存机制更加友好 - 元素定位的位置
数组长度2^n,通过位运算,加快定位的速度,下标采用递增的形式,不用担心超过index溢出的问题。采用long类型作为下标,就算100万QPS,也会有30万年才能用完 - 无锁设计
每个生产者或者消费者线程,会先申请元素在数组中的位置,申请位置后,直接在该位置插入或者读取元素。
下面忽略环形数组,通过CAS来实现原子性操作。来保证线程安全。
/**
* @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
*/
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
public class DisruptorMain
{
public static void main(String[] args) throws Exception
{
// 队列中的元素
class Element {
private int value;
public int get(){
return value;
}
public void set(int value){
this.value= value;
}
}
// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};
// RingBuffer生产工厂,初始化RingBuffer的时候使用
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};
// 处理Event的handler
EventHandler<Element> handler = new EventHandler<Element>(){
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch)
{
System.out.println("Element: " + element.get());
}
};
// 阻塞策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
// 指定RingBuffer的大小
int bufferSize = 16;
// 创建disruptor,采用单生产者模式
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
// 设置EventHandler
disruptor.handleEventsWith(handler);
// 启动disruptor的线程
disruptor.start();
RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
for (int l = 0; true; l++)
{
// 获取下一个可用位置的下标
long sequence = ringBuffer.next();
try
{
// 返回可用位置的元素
Element event = ringBuffer.get(sequence);
// 设置该位置元素的值
event.set(l);
}
finally
{
ringBuffer.publish(sequence);
}
Thread.sleep(10);
}
}
}
等待策略
生产者的等待策略
暂时只有休眠1ns。
LockSupport.parkNanos(1);
消费者的等待策略
名称 | 措施 | 适用场景 |
---|---|---|
BlockingWaitStrategy | 加锁 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
BusySpinWaitStrategy | 自旋 | 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用 |
PhasedBackoffWaitStrategy | 自旋 + yield + 自定义策略 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
SleepingWaitStrategy | 自旋 + yield + sleep | 性能和CPU资源之间有很好的折中。延迟不均匀 |
TimeoutBlockingWaitStrategy | 加锁,有超时限制 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
YieldingWaitStrategy | 自旋 + yield + 自旋 | 性能和CPU资源之间有很好的折中。延迟比较均匀 |