vnpy源码阅读-3.EventEngine

from collections import defaultdict  # 字典默认类型
from queue import Empty, Queue  # 队列
from threading import Thread  # 多线程
from time import sleep
from typing import Any, Callable, List  # 类型提示

先看第一个导入的模块,defaultdict类初始化时传入一个类型list, str, int, dict,当字典中找不到对应的key时返回传入的类型,看示例

d = defaultdict(dict)
d[1] = 'one'

print(d[1])
print(d[2])
# return
one
{}

导入的包了解了,接着看代码吧

Evnet

class Event:

    def __init__(self, type: str, data: Any = None):
        self.type: str = type
        self.data: Any = data

定义一个Evnet的类,有type和data参数

init

def __init__(self, interval: int = 1):
        """
        默认情况下,计时器事件每1秒生成一次
        """
        self._interval = interval  # 时间间隔
        self._queue = Queue()  # FIFO队列,先进先出
        self._active = False    # 开关
        self._thread = Thread(target=self._run)  # run方法线程
        self._timer = Thread(target=self._run_timer)  #run_timer方法线程
        self._handlers = defaultdict(list)  # 处理列表 {'type': []}
        self._general_handlers = []  # 通用处理列表

初始化的参数中,interval和timer用不到可以不管,其他的看注释能明白作用

_run

    def _run(self) -> None:
        """
        从队列中获取事件,然后对其进行处理。
        Get event from queue and then process it.
        """
        while self._active:
            try:
                event = self._queue.get(block=True, timeout=1)  # 从队列取出事件进行处理,先进先出
                self._process(event)
            except Empty:
                pass

EventEngine启动后,会不停从队列_queue中get,当_queue不为空时就把队列中等待的任务取出交给_process方法去处理

_process

    def _process(self, event: Event) -> None:
        """
        First ditribute event to those handlers registered listening
        to this type.

        Then distrubute event to those general handlers which listens
        to all types.
        """
        if event.type in self._handlers:
            [handler(event) for handler in self._handlers[event.type]]

        if self._general_handlers:
            [handler(event) for handler in self._general_handlers]

根据event.type分配到不同的方法去处理

_run_timer

    def _run_timer(self) -> None:
        """
        按间隔(秒)睡眠,然后生成计时器事件。
        Sleep by interval second(s) and then generate a timer event.
        """
        while self._active:
            sleep(self._interval)
            event = Event(EVENT_TIMER)
            self.put(event)

可以暂时不用管,用不上

register

    def register(self, type: str, handler: HandlerType) -> None:  # handler是一个方法
        """
        为特定事件类型注册新的处理程序函数。每一个事件类型只能注册一次函数。 
        Register a new handler function for a specific event type. Every
        function can only be registered once for each event type.
        """
        handler_list = self._handlers[type]
        if handler not in handler_list:
            handler_list.append(handler)

参数里HandlerType的定义,callable是一个回调函数,可以看成func([event], None)

HandlerType = Callable[[Event], None]

当我们知道handler是一个方法,再看下订阅tick示例,就清晰多了。当我们注册一个关键字'tick',self._handlers中没有’tick'时,则返回一个[],接着就会把on_tick方法注册进去,此时self._handlers = {'tick': [on_tick]}。
下面代码中event_engine.put(event)后,event被添加到_queue,并在_run方法被get到_process方法,最后找到self._handlers['tick']对应的回调函数,到此事件结束。

def on_tick(event):
    # 处理tick数据
     pass

if __name__ == '__main__':
    event_engine = EventEngine()
    event_engine.start()
    event_engine.register('tick', on_tick)

    tick_data = get_tick(symbol)  # {'open': 1, 'close':0.8}
    event = Event(type='tick', data=tick_data)
    event_engine.put(event) 

register_general

    def register_general(self, handler: HandlerType) -> None:
        """
        Register a new handler function for all event types. Every
        function can only be registered once for each event type.
        """
        if handler not in self._general_handlers:
            self._general_handlers.append(handler)

用法和register差不多,只是少了type,暂时没找到用处,一般都是用的register

代码中还有start、put、unregister等方法,因为阅读难度不大就不拿出来了。

总结

EvnetEngine的用法大致就是:
接收到事件(如订阅、点击、接收数据)-->推送到事件引擎-->引擎从队列中取出事件交给_process分配-->_process根据register注册的对应类型分配处理方法

遗留问题

1.callable 回调函数的用法
2.register什么时候进行注册?
3.register_general要怎么用?
4.queue源码阅读
5.thread源码阅读

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354

推荐阅读更多精彩内容