Disruptor在流程编排中的应用与探索

作者姓名:邓泽波


文章简介:Disruptor是一种高性能的异步事件处理框架,它通过无锁的方式实现了高效的并发处理,通过本文为大家介绍将优秀的框架引入流程编排中并落地

文章内容:

金融事业部同学为大家介绍将Disruptor这一框架引入流程编排中的应用探索


1、流程编排引入Disruptor背景

在业务开发中,经常会遇到需要将同步接收的请求转换为异步流程的场景,例如归集还款接收合作方的同步请求后,需要异步处理文件,并逐笔调用核心还款冲销。类似的场景很多,在流程编排中提供了异步链功能,可以通过配置来指定链的异步或同步方式。对于异步链,它会被加入到线程池中进行异步处理,这样在业务开发中无需关注异步线程池的具体实现,只需进行简单的配置即可完成异步流程的编排。这种优化方案能够简化异步流程的开发过程,提高效率和可维护性。

那为何选Disruptor的作为异步线程池实现方案,Disruptor是一种高性能的异步事件处理框架,最初由LMAX Exchange公司开发。它通过无锁(lock-free)的方式实现了高效的并发处理,可以在多核CPU环境下处理百万级别的事件。Disruptor通过将事件数据和处理逻辑分离,实现了高效的数据流转和处理,同时也提供了丰富的可配置参数,方便用户根据具体的应用场景进行定制化配置。

应用Disruptor的知名项目有如下的一些:Storm, Camel, Log4j2,还有目前的美团点评技术团队也有很多不少的应用,或者说有一些借鉴了它的设计机制。


2、Disruptor结构

Disruptor是一个开源的框架,可以在无锁的情况下对队列进行高并发操作,那么这个队列的设计就是Disruptor的核心所在;

环形数组RingBuffer


在Disruptor中,采用了RingBuffer来作为队列的数据结构,RingBuffer就是一个环形的数组,既然是数组,我们便可对其设置大小;

在这个ringBuffer中,除了数组之外,还有一个序列号,用来指向数组中的下一个可用元素,供生产者使用或者消费者使用,也就是生产者可以生产的地方,或者消费者可以消费的地方(序列号和数组索引是两个概念);

序列号=263次方-1. 30万年才能用完

RingBuffer的优点

[if !supportLists]1. [endif]高效的内存管理:RingBuffer是基于内存的实现,可以有效地管理内存,减少内存分配和回收的开销,提高系统性能。

[if !supportLists]2. [endif]无锁设计:RingBuffer采用无锁设计,可以避免线程间的锁竞争,提高系统的并发性能。

[if !supportLists]3. [endif]高效的数据结构:RingBuffer是一个环形缓冲区,可以高效地存储和访问数据,同时支持多生产者和多消费者模式。

[if !supportLists]4. [endif]支持批量读写:RingBuffer可以支持批量读写操作,可以减少系统的上下文切换开销,提高系统的性能。

[if !supportLists]5. [endif]易于扩展:RingBuffer可以很容易地扩展到多个消费者和生产者,支持动态增加和删除消费者和生产者,具有很好的可扩展性。

RingBuffer具有高效、无锁、高效的数据结构、支持批量读写和易于扩展等优点,可以帮助开发者构建高性能、可扩展的系统


3、Disruptor主要实现类

Disruptor

Disruptor的入口,主要封装了环形队列RingBuffer、消费者集合ConsumerRepository的引用,主要提供了获取环形队列、添加消费者、生产者向RingBuffer中添加事件(即生产者生产数据的操作)

RingBuffer

Disruptor中队列具体的实现,底层封装了Object[]数组,在初始化时,会使用Event事件对数组进行填充,填充的大小就是bufferSize设置的值,此外该对象内部还维护了Sequencer(序列生成器)具体的实现

Sequencer

序列生成器,分别有MultiProducerSequencer(多生产者序列生产器) 和 SingleProducerSequencer(单生产者序列生产器)两个实现类。在Sequencer中,维护了消费者的Sequence(序列对象)和生产者自己的Sequence(序列对象);以及维护了生产者与消费者序列冲突时候的等待策略WaitStrategy

Sequence

序列对象,内部维护了一个long型的value,这个序列指向了RingBuffer中Object[]数组具体的角标,生产者和消费者各自维护自己的Sequence,但都是指向RingBuffer的Object[]数组;

WaitStrategy

等待策略,当没有可消费的事件时,消费者根据特定的策略进行等待,当没有可生产的地方时,生产者根据特定的策略进行等待

Event

事件对象,就是我们Ringbuffer中存在的数据,在Disruptor中用Event来定义数据,并不存在Event类,它只是一个定义,是一个概念,表示要传递的数据;

EventProcessor

事件处理器,单独在一个线程内执行,判断消费者的序列和生产者序列关系,决定是否调用自定义的事件处理器,也就是是否可以进行消费

EventHandler

事件处理器,由用户自定义实现,也就是最终的事件消费者,需要实现EventHandler接口

Producer

事件生产者,我们定义的发送事件的对象

4、Disruptor的生产和消费

(1)当Disruptor框架启动:


(2)此时,还没有数据进行写入

(3)准备写入数据前的准备,获取可以写入数据的最大序列;


(4)写入数据完成,更新生产者序列对象的值;


以上就是单生产者写入数据的过程,要注意的是,无论是生产者还是消费者,序列的初始值都是-1

当引入消费者后,生产者在获取可写入的序列之前,都会判断消费者所处的序列。


5、Disruptor常用等待策略

com.lmax.disruptor.WaitStrategy

决定一个消费者如何等待生产者将Event置入Disruptor;

其所有实现都是针对消费者线程的;

主要策略有

com.lmax.disruptor.BlockingWaitStrategy

最低效的策略,但其对CPU的消耗最小,并且在各种部署环境中能提供更加一致的性能表现;

内部维护了一个重入锁ReentrantLock和Condition;


com.lmax.disruptor.SleepingWaitStrategy

性能表现和com.lmax.disruptor.BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;

是一种无锁的方式,比如log4j2使用了Disruptor框架;


com.lmax.disruptor.YieldingWaitStrategy

性能最好,适合用于低延迟的系统,在要求极高性能且事件处理线程数小于CPU逻辑核心数的场景中,推荐使用此策略,例如CPU开启超线程的特性;

6、Disruptor为什么这么快

Disruptor通过以下设计来解决队列速度慢的问题:

环形数组结构

为了避免垃圾回收,采用数组而非链表,同时,数组对处理器的缓存机制更加友好。

元素位置定位

数组长度2^n,通过位运算,加快定位的速度,下标采取递增的形式,不用担心index溢出的问题,index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。


[if !supportLists]7、 [endif]性能测试及内存分析

为了直观地感受Disruptor 有多快,设计了一个性能对比测试:Producer 发布 1 亿次事件,从发布第一个事件开始计时,捕捉 Consumer 处理完所有事件的耗时。

测试用例在Producer 如何将事件通知到 Consumer 的实现方式上,设计了两种不同的实现:

1. Producer 的事件发布和 Consumer 的事件处理在不同的线程,通过 ArrayBlockingQueue 传递给Consumer 进行处理;

2. Producer 的事件发布和 Consumer 的事件处理在不同的线程,通过 Disruptor 传递给 Consumer进行处理;


关键测试代码

a. 抽象类

进行一亿次CAS运算计算耗时


b. ArrayBlockingQueue性能测试代码


c. Disruptor性能测试代码



耗时测试对比结果

测试类运行次数耗时(ms)

DisruptorTest1000000003963

ArrayBlockingQueueTest10000000014430

 

堆内存对比结果

指定-Xms1024m -Xmx1024m 运行环境

ArrayBlockingQueueTest 测试结果


DisruptorTest 测试结果


gc对比:ArrayBlockingQueueTest触发了13次gc,而DisruptorTest使用ringBuffer避免了每次需要分配和释放,因此未触发gc;

新生代内存:ArrayBlockingQueueTest占用了177176K,而DisruptorTest占用82740k,节省40%的内存占用

老年的内存:ArrayBlockingQueueTest占用5%,而DisruptorTest占用0%

经过对比可以看出Disruptor对应内存的使用优化更优



8、实际场景应用

流程编排如何支持异步链,将同步链路转为异步链路,传统方式开启异步线程执行,线程数量超过核心线程数时,线程将放入java队列中等待唤起,java队列的缺点上面已经阐述,我们可以通过Disruptor的特性自定义实现线程池


流程编排增加Disruptor异步线程池扩展,丰富组件的功能


以上为根据Disruptor组件封装的自定义线程池,收发消息的公共模块

[if !supportLists]1. [endif]Disruptor中队列具体的实现,底层封装了Object[]数组

[if !supportLists]2. [endif]WorkPoolManager 提供Disruptor自定义线程组件初始化,缓存线程池

[if !supportLists]3. [endif]DisruptorWorkPool 实现自定义线程池的封装,线程池的一级缓存队列、二级缓存队列,

ringbuffer 虽然号称无界队列,但本质是一个数组(有界),只是消息在数组上可以反复覆盖。

当消息没有被覆盖的情况下,已经消费的消息一直被引用不会GC,所以建议ringbuffer的size不要太大。

但是异步任务很可能是一个耗时的长任务,所以在此引进了二级缓存的概念

[if !supportLists]4. [endif]WorkHandler 为消费者接口,DefaultWorkerHandler 封装消费handler执行的父类

针对组件的封装,使用者只需关注业务逻辑handler实现即可


以流程编排执行一个异步流程的handler为例,画如下时序图


[if !supportLists]1. [endif]服务启动时,RouterConfigrator加载链配置,循环handler调用WorkPoolManager.init()初始化线程池,并设置事件业务处理器对应消费者处理


[if !supportLists]2. [endif]用户发起请求后,业务系统接收到请求,RouterService解析请求参数的routerName,在spring容器中获取该bean

[if !supportLists]3. [endif]RouterPipleLine处理类循环遍历Router中的handler,判断async是否为异步标识,如上图配置所示。当async为false同步时,直接执行handler;当async为true异步时,向DisruptorWorkPool自定义线程池中加入该handler。

[if !supportLists]4. [endif]DefaultWorkerHandler消费监听到消息,解析消息信息,并执行对应handler,完成异步链路。

9、总结

       Disruptor 和传统的线程池相比,具有更高的并发性能和更低的延迟。这是因为 Disruptor 使用了无锁算法和基于序列的技术来实现数据共享和通信,避免了线程间的互斥和同步操作,从而提高了并发性能,并且由于没有线程切换的开销,也可以降低延迟。

     Disruptor 适用于需要高性能、低延迟、大规模并发、对数据顺序有要求等场景,例如高频交易系统、大规模数据处理系统、实时消息系统等。Disruptor 是一种本地内存消息传递机制,不适用于分布式系统。如果需要在分布式环境中使用 Disruptor,可以考虑使用类似于 Kafka 的分布式消息队列来代替。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,874评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,102评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,676评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,911评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,937评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,935评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,860评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,660评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,113评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,363评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,506评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,238评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,861评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,486评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,674评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,513评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,426评论 2 352

推荐阅读更多精彩内容