Disruptor-cpp 简介


简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,使用JAVA语言编写,号称基于Disruptor开发的系统单线程能支撑每秒600万订单。不过,今天主要介绍的是Disruptor的一个C++版本,github地址:https://github.com/Abc-Arbitrage/Disruptor-cpp java版本:https://github.com/LMAX-Exchange/disruptor

编译安装

首先需要获取Disruptor-cpp源代码,地址在上面“简介”中。需要注意的是该版本的Disruptor引用了C++扩展库boost的部分内容,因此除了Disruptor-cpp本身的源代码以外,还需要额外下载一份boost源代码,地址:https://www.boost.org/

  • 编译boost
    现如今boost库的编译已经比较简单,不管是在Linux还是Windows平台,建议参考boost官方文档,当然你也可以通过百度找到无数的介绍boost编译的文章,不过这其中有很大一部分可参考性并不高,一方面是因为各种各样的转载和拷贝,导致文档排版混乱,可读性比较差;另一方面随着技术的不断更新,网上文章的描述或方法可能已经不再适用于当前的版本,可能会对初学者造成一定的误导(以上两点不限于boost编译问题,如果你也经常通过互联网搜索来解决问题,应该也会深有体会)。因此还是尽量参考官方文档吧,还能顺便提高英文阅读水平。
    终端运行(Windows为bootstrap.bat):
    bootstrap.sh
    b2
    b2 还可以通过参数指定不同的编译选项,编译完毕后生成的库文件会存放在stage目录下,如须进行安装继续运行命令:b2 install --prefix=安装目录 不指定该参数默认将安装在/usr/local目录下
  • 编译Disruptor-cpp
    如果你已经成功安装boost,那么编译安装Disruptor-cpp将变得异常简单,打开终端进入源代码目录依次执行以下命令即可完成编译安装(cmake会自动检测已经安装的boost库):
    mkdir build
    cd ./build
    cmake ../
    make
    make install
    Windows就更加简单了,源代码中附带有Visual Studio的工程文件(位于msvc目录),直接打开工程根据实际情况配置头文件和依赖库目录,根据需要配置编译选项,直接构建即可。有一点需要注意的是Windows下只需要配置boost库的目录即可,不需要明确指出需要链接那些库文件,boost会自动链接需要的库文件,因为boost库是Header-Only Libraries
    Visual Studio 工程文件

编码示例

利用Disruptor-cpp进行开发,首先需要理解它的基本结构(设计模式),换句话说也就是Disruptor-cpp都包含那些基本组件以及它们之间有着怎样的关系。

  1. Sequence: 表示事件的序号,可以理解为RingBuffer上的一个地址,拿到一个序号就获取了RingBuffer上一个事件操作的权限;
  2. Sequencer: Sequence只是一个事件的序号,要想获取一个序号就需要使用Sequencer向RingBuffer申请,因此Sequencer可以理解为生产者与缓存RingBuffer之间的桥梁;
  3. WaitStrategy: 表示等待策略,如需要发布事件时RingBuffer上已无可用位置或事件处理去获取事件时RingBuffer已无可消费事件时如何进行等待;
  4. SequenceBarrier: 消费者要获取一个事件并非直接访问RingBuffer,而是需要通过SequenceBarrier,因此SequenceBarrier可以理解为消费者与缓存RingBuffer之间的桥梁(注意与Sequencer的对比);
  5. EventProcessor: 事件处理器,是消费者线程池Executor的调度单元,是对事件处理EventHandler与异常处理ExceptionHandler等的一层封装;
  6. Event: 消费事件,具体实现由用户定义;
  7. RingBuffer: 基于数组的缓存实现,也是创建sequencer与定义WaitStrategy的入口;
  8. Disruptor: Disruptor的使用入口,持有RingBuffer、消费者线程池Executor、消费者集合ConsumerRepository等。
    更加相信信息还可以参考以下两篇文章:
    https://www.cnblogs.com/zhaohongtian/p/6801638.html
    https://www.cnblogs.com/daoqidelv/p/6995888.html

下面是Linux平台下示例代码:

// DisruptorDemo.cpp
#include "Disruptor/Disruptor.h"
#include "Disruptor/ThreadPerTaskScheduler.h"
#include <iostream>
#include <string>
#include <cstdint>
#include <memory>
#include <cstdlib>
#include <time.h>

// 定义事件体
struct Event
{
    int64_t id{ 0 };
    std::string str;
};

// 继承事件处理器接口
class Worker : public Disruptor::IWorkHandler< Event >
{
public:
    explicit Worker(){}
    // 重写事件处理回调函数
    void onEvent(Event& event) override
    {
        usleep(1000);
        _actuallyProcessed++;
    }

private:
    int32_t _actuallyProcessed{ 0 };
};

class Producer
{
public:
    Producer(std::int32_t ringBufferSize, std::int32_t workerCount)
    {
        m_ringBufferSize = ringBufferSize;
        m_workerCount = workerCount;
        // 创建调度器
        m_ptrTaskScheduler = std::make_shared< Disruptor::ThreadPerTaskScheduler >();
        // 创建Disruptor
        m_ptrDisruptor = std::make_shared< Disruptor::disruptor<Event> >([]() { return Event(); }
        , m_ringBufferSize, m_ptrTaskScheduler);
        // 创建事件处理器
        for (size_t i = 0; i < m_workerCount; i++)
        {
            m_workers.push_back(std::make_shared< Worker >());
        }
        // 绑定
        m_ptrDisruptor->handleEventsWithWorkerPool(m_workers);
    }

    ~Producer()
    {
        stop();
    }

    void start()
    {
        m_ptrTaskScheduler->start();
        m_ptrDisruptor->start();
    }

    void push(const Event &event)
    {
        auto ringBuffer = m_ptrDisruptor->ringBuffer();
        auto nextSequence = ringBuffer->next();
        (*ringBuffer)[nextSequence] = event;
        ringBuffer->publish(nextSequence);
    }

protected:
    void stop()
    {
        m_ptrDisruptor->shutdown();
        m_ptrTaskScheduler->stop();
    }

private:
    std::shared_ptr<Disruptor::ThreadPerTaskScheduler> m_ptrTaskScheduler;
    std::shared_ptr<Disruptor::disruptor<Event> > m_ptrDisruptor;
    std::vector<std::shared_ptr<Disruptor::IWorkHandler<Event> > > m_workers;
    std::int32_t m_ringBufferSize{ 1024 };
    std::int32_t m_workerCount{ 1 };
    std::int32_t m_schedulerCount{ 1 };
};


int main(int argc, char* argv[])
{
    timespec t1,t2;
    clock_gettime(CLOCK_MONOTONIC, &t1);

    if(argc < 4)
        return 0;

    Producer producer(atoi(argv[1]), atoi(argv[2]));
    producer.start();

    for (size_t i = 0; i < atoi(argv[3]); i++)
    {
        producer.push(Event());
    }

    clock_gettime(CLOCK_MONOTONIC, &t2);

    std::cout << "event num:" << argv[3] << std::endl;
    std::cout << "worker num:" << argv[2] << std::endl;
    std::cout << "buffer size:" << argv[1] << std::endl;
    std::cout << "time:" << 
    (t2.tv_nsec - t1.tv_nsec) / 1000000 + (t2.tv_sec - t1.tv_sec) * 1000
    << std::endl;
    
    return 0;
}
# DisruptorDemo CMakeLists.txt
cmake_minimum_required(VERSION 2.6)
project(DisruptorDemo)

set(CMAKE_BUILD_TYPE Release)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")

add_executable(DisruptorDemo ${CMAKE_SOURCE_DIR}/src/DisruptorDemo.cpp)
target_link_libraries(DisruptorDemo Disruptor boost_system boost_thread boost_chrono)

下表记录了在不同Buffer Size和Worker Num时,程序处理2000个事件的总耗时(单位/毫秒,注意以下数均是在Windows平台下试验得到的,如果是在Linux平台下性能会好很多,但其变化趋势基本相同。另外队列不能设置太大,否则测得时间是不准确的,因为队列过大将导致很多数据被压入队列但实际上并没有被处理完毕,主函数就已经达到第二个计时点,因此队列大小相对于事件数量应该是可忽略的):

队列大小 \ 事件处理器数量 1 5 10 100
2 11391 6859 5922 7156
4 11344 3391 2797 3891
8 12953 2250 2094 2375
32 11782 1937 969 1297

分析表中数据大体能够得出以下结论(仅针对本文的示例程序):

  • 事件处理器为1时,队列大小对处理性能几乎没有影响;
  • 队列大小与处理性能大体成正比关系;
  • 随着事件处理器数量的增加,性能先升后降。

需要注意的是,队列也并非越大越好。因为随着队列的增大,其能够缓冲的事件量也会增加,在面对事件的生成速度大于处理速度时,就会导致大量的事件被缓冲在队列中,从而可能引发事件处理超时的情况;虽然事件处理器数量越大在应对并发量大的情形的时候会表现越好,但同时也会增加系统在线程调度方面的开销,因此选择合适的事件处理器数量也很重要;另外,还应当考虑事件的平均处理时间,如果事件处理耗时较大,应当适当增加事件处理器数量。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,084评论 1 32
  • 面试题参考1 : 面试题[http://www.cocoachina.com/ios/20150803/12872...
    江河_ios阅读 1,704评论 0 4
  • 用两张图告诉你,为什么你的 App 会卡顿? - Android - 掘金 Cover 有什么料? 从这篇文章中你...
    hw1212阅读 12,685评论 2 59
  • 刚下过雨,天空的空气是新鲜的,但是,泥土,是湿的。 我想爬到我们小区外边的天桥上,要必经一条泥土踩出来的小路。 我...
    真爱521阅读 225评论 0 0
  • 【日精进打卡第82天】 【知~学习】 背诵《六项精进》2遍 共133遍 背诵《大学》2遍 共133遍 ••••••...
    黄国金阅读 141评论 0 1