kungfu系统学习(一)--探索低延迟

此系列用来记录学习kungfu系统的点点滴滴


介绍

kungfu系统是知乎大神董可人团队做的高频交易系统,后端使用c++11/17,前端使用vue/nodejs开发,主要特点包括低延迟交易、开放的策略编写方式、友好的使用方式、跨平台运行、灵活的扩展接口,具体介绍不多说,可以看github上的介绍

探索低延迟 从易筋经开始

接下来分析yijinjing的源码

1、/yijingjing/include/kungfu/common.h

这里定义了FORWARD_DECLARE_PTR和DECLARE_PTR作为smart ptr的宏定义,在其他代码中,大量使用到这个宏定义。从代码上看,FORWARD_DECLARE_PTR引用了DECLARE_PTR,并未做太多处理。而DECLARE_PTR定义了typedef std::shared_ptr<X> X##_ptr。接下来分析其他代码时要检查两者的区别。

2、/yijingjing/include/kungfu/yijinjing/common.h
  • class event,namespace yijinjing
    基类,所有成员函数都是虚函数
    有个data()的内联函数,使用了reinterpret_cast,做类型的转换。
    同时声明了DECLARE_PTR(event),即event_ptr相当于std::shared_ptr<event>的定义。
  • class publisher,namespace yijinjing
    跟发布相关的接口基类
    也声明了DECLARE_PTR(publisher)
  • class observer,namespace yijinjing
    跟观察者相关的接口基类
    也声明了DECLARE_PTR(observer)
  • 几个枚举类型,namespace data
    mode:应该是处理模型,live是实盘,data是查询数据,replay是回看,backtest是回测
    category:MD是行情,TD是交易,STRATEGY是策略,SYSTEM是系统
    layout:数据存储方式?JOURNAL,SQLITE,NANOMSG,LOG
  • class locator,namespace data
    数据定位器的基类
    包括了get_env(获取环境变量),layout_dir(数据存储目录),layout_file(数据存储的文件名,参数包括了下面的location类),list_page_id(从page列表中获取id?)
  • class location,namespace data
    使用了std::enable_shared_from_this,具体分析参见知乎上的一个问题,大概意思是防止循环引用,因为class location包含了class locator的智能指针,而该指针的成员函数参数包括location的智能指针,例如layout_dir。
    属性包括了mode,category两个枚举类型值,也有group,name两个string值,还有uname,赋值方式是category/group/name/mode,uid是把uname进行hash得出来的字符串。
    当然也包含了locator数据定位器的智能指针。
  • namespace rx
    应该是跟rxcpp相关的信息
    ReactiveX就是”观察者模式+迭代器模式+函数式编程”,它扩展了观察者模式,通过使用可观察的对象序列流来表述一系列事件,订阅者进行占点观察并对序列流做出反应(或持久化或输出显示等等);借鉴迭代器模式,对多个对象序列进行迭代输出,订阅者可以依次处理不同的对象序列;使用函数式编程思想(functional programming),极大简化问题解决的步骤。
    rxcpp是ReactiveX用c++语言实现的版本
3、/yijingjing/include/kungfu/yijinjing/journal/common.h

接下来到journal了。journal是易筋经的数据模型,在知乎文章有简单介绍,接下来几个源码分析都是围绕这个来进行。首先是common.h。
定义了frame,page,journal,reader,writer的智能指针。

4、/yijingjing/include/kungfu/yijinjing/journal/frame.h
  • struct frame_header
    定义frame的头部信息,包括length(整个frame的总长度,包括frame的头部信息和数据体的长度),header_length(头部信息长度),gen_time(该frame的生成时间),trigger_time(该frame的触发时间,用于延迟统计),msg_type(数据体的类型),source(该frame的来源),dest(该frame的目的地)

  • class frame
    继承了event,可以看出class event的大多数方法跟frame_header的属性有对应关系。
    包含frame_header属性
    友元类是journal和writer,因为private里包含了很多set属性方法
    注意frame.h并没有frame.cc

5、/yijingjing/include/kungfu/yijinjing/journal/page.h
  • struct page_header
    同样,定了一个struct page_header结构体,包括了version(版本),page_header_length(page头部信息长度),page_size(page大小),frame_header_length(frame头部信息长度,为什么这里也要有这个信息?不是在frame里定义了吗?),last_frame_position(最后一个frame的位置)等属性

  • class page
    包含了location的智能指针,page_id,lazy_(false代表低延迟?),size_(跟page_header的page_size是什么关系?),page_header(头部信息),还有dest_id_(跟frame的dest是什么关系?)
    以下是page的结构图,摘自知乎文章

    image

    可以看到,page有连续的frame组成,所以class page里包含了对frame的操作,注意这里连续的frame并没有形成一个链表。
    接下来是跟mmap相关的操作。

  • page::load
    page_ptr page::load(const data::location_ptr &location, uint32_t dest_id, int page_id, bool is_writing, bool lazy)
    首先根据location数据定位器的数据类型和dest_id来获取该page的大小page_size,然后通过get_page_path获取mmap映射文件的路径,注意调用了python代码

def layout_dir(self, location, layout):
  mode = pyyjj.get_mode_name(location.mode)
  category = pyyjj.get_category_name(location.category)
  p = os.path.join(self._home, category, location.group, location.name, pyyjj.get_layout_name(layout), mode)
  if not os.path.exists(p):
    os.makedirs(p)
  return p

def layout_file(self, location, layout, name):
  return os.path.join(self.layout_dir(location, layout), "{}.{}".format(name, pyyjj.get_layout_name(layout)))

然后根据上述获取的page_size,path,还有参数is_writing,lazy去调用util/mmap.cpp的load_mmap_buffer代码,申请mmap。这里进入到load_mmap_buffer代码中。首先根据如果is_writing为真或lazy为false,则说明是master。通过open返回文件句柄,master的话则是read and write,否则是只读。同时,如果是master,则通过lseek(fd, size - 1, SEEK_SET)把读写位置移到末尾,同时测试是否能写入。根据mmap开辟共享内存空间,将首指针返回。
这里就回到了page.cpp的load方法中,返回的首指针即address。
接下来把address地址赋给page_header结构体,作为page对象的属性。auto header = reinterpret_cast<page_header *>(address);
接下来初始化page_header。
返回将上述元素new一个page对象,返回该page对象的智能指针。

  • page::find_page_id
    int page::find_page_id(const data::location_ptr &location, uint32_t dest_id, int64_t time)
    首先根据location定位器和dest_id去列举所有page_ids。这里同样是通过journal.py的list_page_id获取。包括了文件名正则表达式。
def list_page_id(self, location, dest_id):
  page_ids = []
  for journal in glob.glob(os.path.join(self.layout_dir(location, pyyjj.layout.JOURNAL), hex(dest_id)[2:] + '.*.journal')):
  match = JOURNAL_LOCATION_PATTERN.match(journal[len(self._home) + 1:])
  if match:
    page_id = match.group(6)
    page_ids.append(int(page_id))
  return page_ids

接下来就逐个加载page,如果该page的begin_time(也就是该page第一个frame的gen_time)小于参数time,则返回该page的id(为什么是id,这个id用在什么地方?)

6、/yijingjing/include/kungfu/yijinjing/journal/journal.h
  • class journal
    包括了location定位器的智能指针,dest_id_(又来了,这里也有dest_id_),is_writing_(对应page?),lazy_(对应page?),current_page_,frame_(当前的frame?),page_frame_nb_(不知,需要看代码)

  • class reader
    包括了lazy_属性,journal的普通指针*current_,还有一个journal_ptr智能指针的vector journals_。
    class reader有join方法,此方法是把多个journal加入到class reader的journals_里,根据location数据定位器的uid和dest_id确定唯一的journal。如果没有重复的journal,则把新建一个journal对象放到journals_最后,同时对这个journal户必须进行seek_to_time操作,在seek_to_time里加载page。最后是sort。实际上并没有排序,只是从journals_里选出当前frame的生成时间最小的journal,赋给current_属性。

  • class writer
    包含了一个mutex锁writer_mtx_(为什么会有锁,低延迟系统是不会有锁的,接下来会研究),一个journal智能指针journal_,frame_id_base_(不清楚?),publisher_ptr智能指针publisher_(终于看到跟通信相关的类了,接下来会研究。),size_to_write_(待写入的数据大小)

    • writer::writer(const data::location_ptr& location, uint32_t dest_id, bool lazy, publisher_ptr publisher) : publisher_(std::move(publisher)), size_to_write_(0)
      根据location数据定位器的uid和dest_id计算出frame_id_base_,然后新建一个journal对象的智能指针,通过journal::seek_to_time加载page和定位current_page和current_frame等信息。注意seek_to_time的time是当前时间。

    • void writer::close_page(int64_t trigger_time)
      在writer::open_frame中调用。参数是trigger_time。首先获取当前page的指针,然后加载下一个page,最后根据之前获取的当前page的指针信息新建一个frame对象last_page_frame,作为上一个page的last_frame,注意这个frame的data_length为零。然后把这个frame set到past_page里。

    • template<typename T> T &open_data(int64_t trigger_time, int32_t msg_type)
      为了研究open_data是如何使用的,随便找一段代码。这里就从wingchun里获取一段代码,/wingchun/include/kungfu/wingchun/msg.h的class MsgWriter类的void write_data(int msg_type, const std::string &json_str)方法。
      -> 这里调用了writer_->open_data方法,trigger_time为零,msg_type是msg::type::Quote,相当于获取了journal_的current_page_的current_frame_(注意,本次没有新增frame,新的frame在新建page或填完frame的data后会自动新建,接下来会说。)。并在data方法中将当前frame地址加上data_frame大小后的地址开始开辟大小为Quote大小的空间,通过reinterpret_cast。这样就在当前frame开辟了一个空间存放Quote数据。
      ->调用from_json方法,把json数据一点点的写入到Quote对象里
      ->然后调用writer::close_data方法,重置size_to_write属性为零。然后调用writer::close_frame方法,参数是Quote的大小。
      ->进入writer::close_frame方法,第一步是获取当前的frame,然后获取下一个frame的地址next_frame_address,该地址是当前frame的地址加上data_frame的大小和Quote对象的大小,并判断是否超出current_page_的边界。然后在next_frame_address开始开辟一段大小为frame_header大小的空间。然后操作当前frame,也就是已写入数据的最新的frame,更新gen_time为当前时间,更新data_length为Quote大小。然后把这个frame的地址作为current_page_的上一个frame的地址。接下来调用journal::next方法,通过frame::move_to_next方法更新journal的当前frame current_frame_为上述开辟的空间首地址。
      ->接着解锁writer_mtx,因为再open_data时通过open_frame加了锁。
      ->最后通过publisher发布出去。这里待研究,发布了什么东西?怎么发布?有什么接收者?publisher实例在哪里创建?

    • frame_ptr writer::open_frame(int64_t trigger_time, int32_t msg_type, uint32_t data_length)
      首先判断两个frame_header的大小加上data_length必须小于当前page current_page_的大小。接着通过try_lock来判断锁的情况,如果没有获得锁,则计时,超时退出。否则获得锁。进入下一步操作。如果当前journal_的当前frame加上将要加上的frame_header和data大小大于journal_当前page的边界,则调用close_page,翻到新的page,并给上一个page set_last_frame。然后获取最新的frame,更新属性,注意source是journal_的location数据定位器的uid,dest是journal_的dest_id,保持一致。但是这里没有set data_length。

7、/yijingjing/src/journal/journal.cpp

最重要的方法是void journal::seek_to_time(int64_t nanotime)这里给定一个纳秒时间nanotime,然后根据location_,dest_id_获取第一个比nanotime的page,如果当前的page不等于获取出来的page(通过page_id确定比较),则把当前page current_page_加载为获取出来的page,同时把frame_设置为当前page的第一个frame的address,也把page_frame_nb_设置为零。
然后判断current_page_是否为满和时间是否小于nanotime,如果都满足则不断的把frame_换到下一个,或者加载下一个page。从而实现这篇文章里提到的(可能是吧?)

在每次读写进行到 Page 边界的时候,易筋经需要小心进行一系列细节操作,使得访问者可以无缝的切换到下一个 Page,涉及到诸如调用 mmap 创建新的内存映射文件,对内存映射文件进行预处理等,这些操作往往相当耗时(毫秒级别),我们无法在实时读写数据时承受这个负担。易筋经的解决方案,是在后台启动并维护一个 PageEngine 进程,该进程负责提前载入缓存一些备用 Page,在需要时便可以立刻交付使用。在这种结构设计下,由于我们是夸进程的申请、分配、释放资源,所以需要多做一些安保工作,确保资源不会泄漏。PageEngine 内部会小心的记录申请者的进程编号 pid ,定期查询客户进程是否仍然健康存在,发现僵尸进程则会对其申请的资源进行释放操作。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容