简介
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,使用JAVA语言编写,号称基于Disruptor开发的系统单线程能支撑每秒600万订单。不过,今天主要介绍的是Disruptor的一个C++版本,github地址:https://github.com/Abc-Arbitrage/Disruptor-cpp java版本:https://github.com/LMAX-Exchange/disruptor
编译安装
首先需要获取Disruptor-cpp源代码,地址在上面“简介”中。需要注意的是该版本的Disruptor引用了C++扩展库boost的部分内容,因此除了Disruptor-cpp本身的源代码以外,还需要额外下载一份boost源代码,地址:https://www.boost.org/
-
编译boost
现如今boost库的编译已经比较简单,不管是在Linux还是Windows平台,建议参考boost官方文档,当然你也可以通过百度找到无数的介绍boost编译的文章,不过这其中有很大一部分可参考性并不高,一方面是因为各种各样的转载和拷贝,导致文档排版混乱,可读性比较差;另一方面随着技术的不断更新,网上文章的描述或方法可能已经不再适用于当前的版本,可能会对初学者造成一定的误导(以上两点不限于boost编译问题,如果你也经常通过互联网搜索来解决问题,应该也会深有体会)。因此还是尽量参考官方文档吧,还能顺便提高英文阅读水平。
终端运行(Windows为bootstrap.bat):
bootstrap.sh
b2
b2 还可以通过参数指定不同的编译选项,编译完毕后生成的库文件会存放在stage
目录下,如须进行安装继续运行命令:b2 install --prefix=安装目录 不指定该参数默认将安装在/usr/local目录下
-
编译Disruptor-cpp
如果你已经成功安装boost,那么编译安装Disruptor-cpp将变得异常简单,打开终端进入源代码目录依次执行以下命令即可完成编译安装(cmake会自动检测已经安装的boost库):
mkdir build
cd ./build
cmake ../
make
make install
Windows就更加简单了,源代码中附带有Visual Studio的工程文件(位于msvc目录),直接打开工程根据实际情况配置头文件和依赖库目录,根据需要配置编译选项,直接构建即可。有一点需要注意的是Windows下只需要配置boost库的目录即可,不需要明确指出需要链接那些库文件,boost会自动链接需要的库文件,因为boost库是Header-Only Libraries
。
编码示例
利用Disruptor-cpp进行开发,首先需要理解它的基本结构(设计模式),换句话说也就是Disruptor-cpp都包含那些基本组件以及它们之间有着怎样的关系。
- Sequence: 表示事件的序号,可以理解为RingBuffer上的一个地址,拿到一个序号就获取了RingBuffer上一个事件操作的权限;
- Sequencer: Sequence只是一个事件的序号,要想获取一个序号就需要使用Sequencer向RingBuffer申请,因此Sequencer可以理解为生产者与缓存RingBuffer之间的桥梁;
- WaitStrategy: 表示等待策略,如需要发布事件时RingBuffer上已无可用位置或事件处理去获取事件时RingBuffer已无可消费事件时如何进行等待;
- SequenceBarrier: 消费者要获取一个事件并非直接访问RingBuffer,而是需要通过SequenceBarrier,因此SequenceBarrier可以理解为消费者与缓存RingBuffer之间的桥梁(注意与Sequencer的对比);
- EventProcessor: 事件处理器,是消费者线程池Executor的调度单元,是对事件处理EventHandler与异常处理ExceptionHandler等的一层封装;
- Event: 消费事件,具体实现由用户定义;
- RingBuffer: 基于数组的缓存实现,也是创建sequencer与定义WaitStrategy的入口;
- Disruptor: Disruptor的使用入口,持有RingBuffer、消费者线程池Executor、消费者集合ConsumerRepository等。
更加相信信息还可以参考以下两篇文章:
https://www.cnblogs.com/zhaohongtian/p/6801638.html
https://www.cnblogs.com/daoqidelv/p/6995888.html
下面是Linux平台下示例代码:
// DisruptorDemo.cpp
#include "Disruptor/Disruptor.h"
#include "Disruptor/ThreadPerTaskScheduler.h"
#include <iostream>
#include <string>
#include <cstdint>
#include <memory>
#include <cstdlib>
#include <time.h>
// 定义事件体
struct Event
{
int64_t id{ 0 };
std::string str;
};
// 继承事件处理器接口
class Worker : public Disruptor::IWorkHandler< Event >
{
public:
explicit Worker(){}
// 重写事件处理回调函数
void onEvent(Event& event) override
{
usleep(1000);
_actuallyProcessed++;
}
private:
int32_t _actuallyProcessed{ 0 };
};
class Producer
{
public:
Producer(std::int32_t ringBufferSize, std::int32_t workerCount)
{
m_ringBufferSize = ringBufferSize;
m_workerCount = workerCount;
// 创建调度器
m_ptrTaskScheduler = std::make_shared< Disruptor::ThreadPerTaskScheduler >();
// 创建Disruptor
m_ptrDisruptor = std::make_shared< Disruptor::disruptor<Event> >([]() { return Event(); }
, m_ringBufferSize, m_ptrTaskScheduler);
// 创建事件处理器
for (size_t i = 0; i < m_workerCount; i++)
{
m_workers.push_back(std::make_shared< Worker >());
}
// 绑定
m_ptrDisruptor->handleEventsWithWorkerPool(m_workers);
}
~Producer()
{
stop();
}
void start()
{
m_ptrTaskScheduler->start();
m_ptrDisruptor->start();
}
void push(const Event &event)
{
auto ringBuffer = m_ptrDisruptor->ringBuffer();
auto nextSequence = ringBuffer->next();
(*ringBuffer)[nextSequence] = event;
ringBuffer->publish(nextSequence);
}
protected:
void stop()
{
m_ptrDisruptor->shutdown();
m_ptrTaskScheduler->stop();
}
private:
std::shared_ptr<Disruptor::ThreadPerTaskScheduler> m_ptrTaskScheduler;
std::shared_ptr<Disruptor::disruptor<Event> > m_ptrDisruptor;
std::vector<std::shared_ptr<Disruptor::IWorkHandler<Event> > > m_workers;
std::int32_t m_ringBufferSize{ 1024 };
std::int32_t m_workerCount{ 1 };
std::int32_t m_schedulerCount{ 1 };
};
int main(int argc, char* argv[])
{
timespec t1,t2;
clock_gettime(CLOCK_MONOTONIC, &t1);
if(argc < 4)
return 0;
Producer producer(atoi(argv[1]), atoi(argv[2]));
producer.start();
for (size_t i = 0; i < atoi(argv[3]); i++)
{
producer.push(Event());
}
clock_gettime(CLOCK_MONOTONIC, &t2);
std::cout << "event num:" << argv[3] << std::endl;
std::cout << "worker num:" << argv[2] << std::endl;
std::cout << "buffer size:" << argv[1] << std::endl;
std::cout << "time:" <<
(t2.tv_nsec - t1.tv_nsec) / 1000000 + (t2.tv_sec - t1.tv_sec) * 1000
<< std::endl;
return 0;
}
# DisruptorDemo CMakeLists.txt
cmake_minimum_required(VERSION 2.6)
project(DisruptorDemo)
set(CMAKE_BUILD_TYPE Release)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")
add_executable(DisruptorDemo ${CMAKE_SOURCE_DIR}/src/DisruptorDemo.cpp)
target_link_libraries(DisruptorDemo Disruptor boost_system boost_thread boost_chrono)
下表记录了在不同Buffer Size和Worker Num时,程序处理2000个事件的总耗时(单位/毫秒,注意以下数均是在Windows平台下试验得到的,如果是在Linux平台下性能会好很多,但其变化趋势基本相同。另外队列不能设置太大,否则测得时间是不准确的,因为队列过大将导致很多数据被压入队列但实际上并没有被处理完毕,主函数就已经达到第二个计时点,因此队列大小相对于事件数量应该是可忽略的):
队列大小 \ 事件处理器数量 | 1 | 5 | 10 | 100 |
---|---|---|---|---|
2 | 11391 | 6859 | 5922 | 7156 |
4 | 11344 | 3391 | 2797 | 3891 |
8 | 12953 | 2250 | 2094 | 2375 |
32 | 11782 | 1937 | 969 | 1297 |
分析表中数据大体能够得出以下结论(仅针对本文的示例程序):
- 事件处理器为1时,队列大小对处理性能几乎没有影响;
- 队列大小与处理性能大体成正比关系;
- 随着事件处理器数量的增加,性能先升后降。
需要注意的是,队列也并非越大越好。因为随着队列的增大,其能够缓冲的事件量也会增加,在面对事件的生成速度大于处理速度时,就会导致大量的事件被缓冲在队列中,从而可能引发事件处理超时的情况;虽然事件处理器数量越大在应对并发量大的情形的时候会表现越好,但同时也会增加系统在线程调度方面的开销,因此选择合适的事件处理器数量也很重要;另外,还应当考虑事件的平均处理时间,如果事件处理耗时较大,应当适当增加事件处理器数量。