Distruptor总结

Disruptor结构图.png

disruptor的核心设计思路

  • 使用循环数组的方式代替队列,使用预先填充数据的方式来避免GC
  • 使用CPU缓存行填充的方式来避免极端情况下的数据争用导致的性能下降
  • 多线程编程中尽量避免锁争用的编码技巧

生成端减少资源的竞争
多线程在队列也好,循环数组也好,必然存在对标志位的竞争。无论是使用锁来避免竞争,还是使用cas来进行无锁算法。只要争用的情况存在,并且线程较多,都会出现对资源的不断消耗。争用的对象越多,争用中消耗掉的资源也就越多。为了避免这样的情况,减少争用的资源就是一个手段。比如在循环数组中只保留一个标志位,也就是下一个可以写入数据位置的标志位。而尾部标志位则在各个消费者线程中保存(具体的编程手法后续细讲)

如果存在多个生产者,则可写入的标志位需要用cas算法来进行争夺,避免锁的使用。多个线程通过cas得到唯一的不冲突的下一个可写序号。由于需要获得序号后才能进行写入,而写入完成才可以让消费者线程进行消费。所以才获得序号后,完成写入前,必须有一种方式让消费者检测是否完成。以避免消费者拿到还未填入输入的数组位。

具体编程手法
主要的争夺环节集中在多线程发布中,序号大的线程发布需要等到序号小的线程发布完成后才能发布。那我们的优化的点也在这个地方。如果只有一个地方可以写入完成信息,必然需要争夺。为了避免争夺,我们可以使用标志数组(长度和内容数组相同,每一位表示相同下标的内容数组是否发布)来表示每一个位置是否写入。这样就可以避免发布的争夺(大家的标志位都不在一起了)。这也是Disruptor里面的availableSequence

但是又来带来一个问题,用什么数字来表示是否已经发布完成?如果只是0和1,那么写过1轮以后,标志数组位上就都是1了。又无法区分。所以标志数组上的数字应该在循环数组的每一轮循环的值都不同。比如一开始都是-1,第一轮中是0的表示已发布,第二轮中是0表示没发布,是1的表示已发布。

Disruptor中的具体处理
发布的算法步骤
1.将序号除以标志数组长度(因为长度是2的次方幂,这一步可以通过右移来完成)得到填入值x。(序号每次都会增长,所以每次都应该不同)
2.将序号和标志数组长度减一进行并运算得到填入位置index。即类似于hash&(length-1)获取填入位置
4.将x写入index位置。

消费端减少对锁的使用
生产者消费者中最容易出现争夺的,采用的优化手段包括

  • 使用循环数组代替队列,使用cas算法来代替锁争夺
  • 消费者各自保存自己当前已经处理过的序号,而不是将这个序号的信息在队列中来存储,避免多线程争用。
  • 生产者线程则需要持有消费者的类的信息,好用来判断所有消费者中消费的最小的序号,以避免在数据写入时覆盖了某个消费者尚未处理的数据信息。(生成端是怎么来获取消费者消费最小的序号的,后面讲)

伪共享
这个主要跟计算机的基本结构。L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。所以L1缓存很小但很快,并且紧靠着在使用它的CPU内核;L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享。

当CPU执行运算的时候,它先去L1查找所需的数据、再去L2、然后是L3,如果最后这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。所以如果你在做一些很频繁的事,你要尽量确保数据在L1缓存中。

如果不同的线程变量处于同一个缓存行,及时其中一个的缓存数据是最新的,如果另外一个变量需要更新,那么整个缓存行都会更新,具体可以参见计算机MESI协议.

解决伪共享的一种方法是通过补齐(Padding),使得每一条缓存行只存一个多线程变量
32位的计算机上一个缓存行是64个字节,而一个Java的long类型变量是8字节,因此在一个缓存行中可以存8个long类型的变量

CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个cache line

在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个
所以一般都是左右补7个。但这种方法是有局限性就是和计算机位数有关系以及jvm可能会对无用的变量做优化导致补齐并不能生效

解决伪共享的第二种方法就是使用@ Contended注解,这个是java给我们提供的方案


下面看源码
单生产者生产数据
1.申请写入m个元素
2.若是有m个元素可以写入,则返回最大的序列号。这儿需要注意的是是否会覆盖未读的元素
3.若是返回正确,则生产者开始写入元素

public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue; 上一次生产的

        long nextSequence = nextValue + n;  
        long wrapPoint = nextSequence - bufferSize;  同一起跑线进行比较
        long cachedGatingSequence = this.cachedValue;

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence 

            long minSequence;
           判断是否覆盖了最慢的消费者
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?   进行阻塞
            }

            this.cachedValue = minSequence;  将最慢的消费者缓存起来
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }
image.png

多个生产者

防止多个线程重复写同一个元素

解决方案:

  • 每个线程获取不同的一段数组空间进行操作,在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可

防止读取的时候,读到还未写的元素

解决方案
引入了一个与Ring Buffer大小相同的buffer
即位数组available Buffer
当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功
读取的时候,会遍历available Buffer,来判断元素是否已经就绪

代码如下,引入了CAS操作以及原子变量

public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long current;
        long next;

        do
        {
            current = cursor.get();
            next = current + n;

            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);  通过这个方法获取到消费端最慢的序号

                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence);
            }
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);

        return next;
    }

再来看消费者是怎么消费的

单消费者

com.lmax.disruptor.dsl.Disruptor.start
 for循环
 ->com.lmax.disruptor.dsl.EventProcessorInfo.start
   ->com.lmax.disruptor.BatchEventProcessor.run

BatchEventProcessor.run()方法如下

public void run()
    {
        if (running.compareAndSet(IDLE, RUNNING))
        {
            sequenceBarrier.clearAlert();

            notifyStart();
            try
            {
                if (running.get() == RUNNING)
                {
                    processEvents();
                }
            }
            finally
            {
                notifyShutdown();
                running.set(IDLE);
            }
        }
        else
        {
            // This is a little bit of guess work.  The running state could of changed to HALTED by
            // this point.  However, Java does not have compareAndExchange which is the only way
            // to get it exactly correct.
            if (running.get() == RUNNING)
            {
                throw new IllegalStateException("Thread is already running");
            }
            else
            {
                earlyExit();
            }
        }
    }

processEvents()方法如下

private void processEvents()
    {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true)
        {
            try
            {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);  从位数组中获取可消费的序号
                if (batchStartAware != null)
                {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);   获取做大的可消费序号,批量消费
                }

                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);  调用消费者消费的逻辑
                    nextSequence++;
                }

                sequence.set(availableSequence);
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (running.get() != RUNNING)
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }

多消费者情况

com.lmax.disruptor.dsl.Disruptor.start
 for循环
 ->com.lmax.disruptor.dsl.EventProcessorInfo.start
   ->com.lmax.disruptor.WorkProcessor.run

WorkProcessor.run()方法

public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();  使用了原子变量
        T event = null;
        while (true)
        {
            try
            {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
              
                使用CAS来获取可消费的序号
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }

                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

所以,单消费者和多消费者的区别是什么?

单消费者时:

1.直接获取欲发布的
long nextSequence = sequence.get() + 1L;

2.满足条件时进行数据处理
while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

3.更新序号
sequence.set(availableSequence);

这里是不需要考虑并发问题的

多消费者时

消费者保持一个自己的序列,每次累加后nextSequence,去获取可访问的最大序列。对于一个生产者,就是nextSequence到RingBuffer当前游标的序列。对于多个生产者,就是nextSequence到RingBuffer当前游标之间,最大的连续的序列集。

1.使用cas操作获取欲发布的
do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);//备注  直接在这里更新序号了
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));

2.满足条件时进行数据处理
 if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }

3.更新序号


消费者的等待策略

名称 措施 适用场景
BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,809评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,189评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,290评论 0 359
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,399评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,425评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,116评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,710评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,629评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,155评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,261评论 3 339
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,399评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,068评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,758评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,252评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,381评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,747评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,402评论 2 358

推荐阅读更多精彩内容

  • 转载自https://tech.meituan.com/disruptor.html Disruptor是英国外汇...
    檀文渊阅读 2,665评论 0 25
  • 本文是笔者在研究Disruptor过程中对Disruptor官方介绍与入门指南的翻译,有些部分做了适当编辑和增减。...
    coder_jerry阅读 8,624评论 2 56
  • disruptor是一个高性能的内存队列,之所以高性能,因为有以下几个特点: 1 整个disruptor的实现在并...
    划水者阅读 683评论 0 3
  • LMAX是一种新型零售金融交易平台,它能够以很低的延迟(latency)产生大量交易(吞吐量). 这个系统是建立在...
    举头明鉴阅读 6,311评论 2 10
  • Disruptor提供了一种线程之间信息交换的方式。 锁的缺点 并发的问题 想象有两个线程尝试修改同一个变量val...
    jiangmo阅读 1,688评论 0 2