【转】高性能队列Disruptor

Java内置队列

介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。

队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁 linkedlist
LinkedTransferQueue unbounded 无锁 linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

队列底层一般分为三种:数组、链表和堆。其中,堆是一般情况下是为了实现带有优先级特性队列,暂且不考虑。

接下来我们从数组和链表两种数据结构来考虑,基于数组的安全队列有ArrayBlickingQueue队列,它主要是通过加锁的方式来保证线程安全;基于链表的线程安全队列分为LinkBlickingQueue和ConcurrentLlikedQueue两大类,前者也是通过加锁的方式来保证现场安全,而ConcurrentLindkeQueue和LinkTransferQueue都是通过原子变量compare and swap(比较和交换 以下简称“CAS”)这种不加锁,自旋的方式来实现线程安全。

通过不加锁实现的队列都是无界队列(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求比较高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同事,为了减少Java垃圾回收队系统性能的影响,会尽量选择array/heap,通过比较下来,符合条件的只有ArrayBlockingQueue。

ArrayBlockingQueue和Disruptor比较

ArrayBlockingQueue是通过加锁的方式来实现线程安全的,会因为加锁伪共享等出现严重的问题。

伪共享
什么是伪共享

下图是计算的基本结构。L1,L2,L3分别表示一级缓存、二级缓存、三级缓存,越靠近cup的运算速率越快,但是内存也就越低。L1、L2内存比较快,并且只能被一个单独的cpu核使用。L3更大,更慢,能被单个插槽上的所有cpu核共享;最后是主存,能被所有插槽上的所有CPU核共享。


image.png

当cup运行的时候,会先去查找L1,然后去L2,然后去L3,然后去内存中取。走的越远,所需要耗时就越长。所有需要做一个很频繁的事情,尽量在L1中完成。

缓存行

Cache是由多个cache line组成的,每个cache line是由64位字节,并且有效的引用内存中的一块地址,
java中的long类型变量是8个字节,因此一个缓存行中能存8个long类型的变量。
在访问一个long类型的数组时候,当你访问其中一个,会自动访问同个cache line 中7个,速度非常的快。

什么是伪共享。

ArraryBlockingQueue有三个成员变量。

  • takeIndex:需要被取走的元素下标。
  • putIndex : 可被元素插入的位置的下标
  • count:队列中元素的数量
    这三个数据很容易放在一个缓存中,之间有没有太多的联系,导致每次修改都会使之前缓存的数据失效,从而达不到完全共享的效果。


    ArrayBlockingQueue伪共享示意图.png

如图所示,当put一个元素到ArrayBlockingQueue中,putIndex会修改,会导致缓存中的消费者takeIndex失效,会重新到主内存中获取。
这种无法使用缓存行的现象,称为伪共享。

Disruptor

Disruptor通过以下设计来解决队列速度慢的问题。

  1. 环形数据结构
    为避免垃圾回收,采用数组结构,而非链表结构。数组对处理器的缓存机制更加友好
  2. 元素定位的位置
    数组长度2^n,通过位运算,加快定位的速度,下标采用递增的形式,不用担心超过index溢出的问题。采用long类型作为下标,就算100万QPS,也会有30万年才能用完
  3. 无锁设计
    每个生产者或者消费者线程,会先申请元素在数组中的位置,申请位置后,直接在该位置插入或者读取元素。
    下面忽略环形数组,通过CAS来实现原子性操作。来保证线程安全。
/**
 * @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
 */
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;


public class DisruptorMain
{
    public static void main(String[] args) throws Exception
    {
        // 队列中的元素
        class Element {

            private int value;

            public int get(){
                return value;
            }

            public void set(int value){
                this.value= value;
            }

        }

        // 生产者的线程工厂
        ThreadFactory threadFactory = new ThreadFactory(){
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };

        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        // 处理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>(){
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch)
            {
                System.out.println("Element: " + element.get());
            }
        };

        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 16;

        // 创建disruptor,采用单生产者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        // 设置EventHandler
        disruptor.handleEventsWith(handler);

        // 启动disruptor的线程
        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; true; l++)
        {
            // 获取下一个可用位置的下标
            long sequence = ringBuffer.next();  
            try
            {
                // 返回可用位置的元素
                Element event = ringBuffer.get(sequence); 
                // 设置该位置元素的值
                event.set(l); 
            }
            finally
            {
                ringBuffer.publish(sequence);
            }
            Thread.sleep(10);
        }
    }
}
等待策略
生产者的等待策略

暂时只有休眠1ns。

LockSupport.parkNanos(1);

消费者的等待策略
名称 措施 适用场景
BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀

转自:https://tech.meituan.com/disruptor.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,884评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,212评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,351评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,412评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,438评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,127评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,714评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,636评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,173评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,264评论 3 339
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,402评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,073评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,763评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,253评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,382评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,749评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,403评论 2 358

推荐阅读更多精彩内容