事件驱动
如果自己要设计实现一个事件驱动回测系统,应该要包含哪些组件呢?
事件:事件是系统功能的基本单元。诸如,如“行情”、“信号”、“订单”、“”等,该类型决定了它在事件循环中如何处理。
事件队列:事件队列是一个内存中的Python队列对象,存储了所有触发并待处理的事件。
数据处理:一个抽象基类,为处理历史或实时市场数据提供接口。
策略:一个抽象基类,它提供了接收市场数据并生成相应信号事件的接口。
执行引擎:模拟与经纪公司的连接。从队列中接收订单事件并执行它们。一旦订单被执行,处理器会创建成交事件,描述实际成交的内容,包括费用、佣金和滑点等。
回测:所有这些组件都被封装在一个事件循环中,该循环正确处理所有事件类型,并将它们路由到适当的组件。
先来考虑事件和事件队列。
1 事件
定义各种不同的事件,“行情”、“信号”、“订单”、“定时器”等,该类型决定了它在事件循环中如何处理。
##### 所有事件的基类
class Event(object):
"""Event is base class providing an interface for all subsequent (inherited) events"""
pass
##### 行情事件,行情来的时候触发
class QuoteEvent(Event):
def __init__(self):
self.type = 'eQUOTE'
##### 定时器事件,固定时间触发
class TimerEvent(Event):
def __init__(self):
self.type = 'eTIMER'
##### 信号事件,策略触发,买还是卖
class SignalEvent(Event):
def __init__(self, strategy_id, symbol, datetime, signal_type, strength):
# 事件的类型
self.type = 'eSIGNAL'
# 策略ID
self.strategy_id = strategy_id
# 股票代码
self.symbol = symbol
# 触发时间
self.datetime = datetime
# 信号的具体类型
self.signal_type = signal_type
#self.strength = strength
##### 订单,引擎触发
class OrderEvent(Event):
def __init__(self, symbol, order_type, quantity, direction):
# 事件的类型
self.type = 'eORDER'
# 股票代码
self.symbol = symbol
# 订单类型
self.order_type = order_type
# 订单数量
self.quantity = quantity
# 订单方向
self.direction = direction
def print_order(self):
""" Outputs the values within the Order. """
print(
"Order: Symbol=%s, Type=%s, Quantity=%s, Direction=%s" %
(self.symbol, self.order_type, self.quantity, self.direction) )
##### 成本费用,引擎触发
class CostEvent(Event):
def __init__(self, timeindex, symbol, exchange, quantity, direction, fill_cost, commission=None):
# 事件的类型
self.type = 'eCOST'
# 费用产生时间
self.timeindex = timeindex
# 股票代码
self.symbol = symbol
# 交易所
self.exchange = exchange
# 数量
self.quantity = quantity
# 方向
self.direction = direction
# Calculate commission
if commission is None:
self.commission = self.calculate_ib_commission()
else:
self.commission = commission
def calculate_ib_commission(self):
# 假设默认费用为千分之一
return 0.001
2 事件队列
触发者只需要触发信号到队列,无需关心谁将收到;订阅者只需订阅需要处理的信号,并从事件队列收到回调,无需关心谁触发
# 事件队列分2种,主要区分是否支持优先级
import time
import random
import queue
import threading
##### 2.1 无优先级队列
# 创建一个队列
q = queue.Queue()
# 生产者线程
def producer(q, name,start, end, step):
for i in range(start, end, step):
print(f"{name}生产者生产了 {i}")
q.put(i)
time.sleep(random.random()) # 随机等待时间,模拟生产过程
# 消费者线程
def consumer(q, name):
while True:
item = q.get()
if item is None: # None作为队列结束的信号
break
print(f"{name}消费者消费了 {item}")
time.sleep(random.random()) # 随机等待时间,模拟消费过程
# 创建并启动生产者线程
producer_thread1 = threading.Thread(target=producer, args=(q,"produce1", 1, 5, 2))
producer_thread2 = threading.Thread(target=producer, args=(q,"produce2", 2, 5, 2))
producer_thread1.start()
producer_thread2.start()
# 创建并启动消费者线程
consumer_thread1 = threading.Thread(target=consumer, args=(q,"consum1"))
consumer_thread1.start()
# 等待生产者线程完成生产
producer_thread1.join()
producer_thread2.join()
# 向队列中添加None作为结束信号
q.put(None)
# 等待消费者线程完成消费
consumer_thread1.join()
print("所有线程已完成")
produce1生产者生产了 1
produce2生产者生产了 2
consum1消费者消费了 1
produce1生产者生产了 3
consum1消费者消费了 2
produce2生产者生产了 4
consum1消费者消费了 3
consum1消费者消费了 4
所有线程已完成
##### 2.2 优先级队列
# 创建一个优先级队列
priority_queue = queue.PriorityQueue()
# 任务类,包含任务内容和优先级
class Task:
def __init__(self, priority, content):
self.priority = priority
self.content = content
# 定义小于比较方法,用于优先级队列排序
def __lt__(self, other):
return self.priority < other.priority
# 工作线程
def worker(priority_queue, name ):
while not priority_queue.empty():
# 获取优先级最高的任务
task = priority_queue.get()
print(f"{name}Worker处理任务: {task.content}, 优先级: {task.priority}")
time.sleep(random.random()) # 模拟处理任务的时间
priority_queue.task_done() # 标记任务完成
# 添加任务到优先级队列
def add_tasks(priority_queue,name,start, end, step):
for i in range(start, end, step):
priority = random.randint(1, 10) # 随机生成优先级
content = f"任务{i}"
task = Task(priority, content)
priority_queue.put(task)
print(f"添加任务: {task.content}, 优先级: {task.priority}")
# 创建并启动生产者线程
producer_thread1 = threading.Thread(target=add_tasks, args=(priority_queue,"produce1", 1, 5, 1))
producer_thread1.start()
# 创建并启动两个消费者线程
consumer_thread1 = threading.Thread(target=worker, args=(priority_queue,"consum1"))
consumer_thread1.start()
# 等待生产者线程完成生产
producer_thread1.join()
# 等待消费者线程完成消费
consumer_thread1.join()
print("所有线程已完成")
添加任务: 任务1, 优先级: 5
添加任务: 任务2, 优先级: 1
添加任务: 任务3, 优先级: 5
添加任务: 任务4, 优先级: 8
consum1Worker处理任务: 任务2, 优先级: 1
consum1Worker处理任务: 任务3, 优先级: 5
consum1Worker处理任务: 任务1, 优先级: 5
consum1Worker处理任务: 任务4, 优先级: 8
所有线程已完成
让消费者线程进行随机休眠的目的是生产者线程有充足的时间把任务加入优先队列,从结果可以看到,优先级数值小的任务被优先执行。
#### 2.3 事件引擎
# 任务类,包含任务内容和优先级
class EventTask:
def __init__(self, priority, event):
self.priority = priority
self.event = event
# 定义小于比较方法,用于优先级队列排序 ,
def __lt__(self, other):
# 这里耍个小把戏,默认数值越小,处理级别越高
#return self.priority < other.priority
# 改为数值越大,处理级别越高
return self.priority > other.priority
class EventEngine(object):
"""
事件驱动
变量说明
priority_queue_:私有变量,事件队列
thread_:私有变量,事件处理线程
timer_:私有变量,计时器
handlers_:私有变量,事件处理函数字典
方法说明
run_: 私有方法,事件处理线程连续运行用
process_: 私有方法,处理事件,调用注册在引擎中的监听函数
onTimer_:私有方法,计时器固定事件间隔触发后,向事件队列中存入计时器事件
start: 公共方法,启动引擎
stop:公共方法,停止引擎
register:公共方法,向引擎中注册监听函数
unregister:公共方法,向引擎中注销监听函数
put:公共方法,向事件队列中存入新的事件
"""
#----------------------------------------------------------------------
def __init__(self):
"""初始化事件引擎"""
# 事件队列
self.priority_queue_ = queue.PriorityQueue()
# 事件处理线程
self.task_thread_ = threading.Thread(target = self.run_)
# 计时器,用于触发计时器事件
self.timer_thread_ = threading.Thread(target = self.runTimer_)
self.timerSleep_= 1 # 计时器触发间隔(默认1秒)
# 停止信号
self.stop_ = True
# 这里的handlers_是一个字典,用来保存对应的事件调用关系
self.handlers_ = {}
self.lock = threading.Lock()
#----------------------------------------------------------------------
def run_(self):
"""运行"""
while self.stop_ == False:
if not self.priority_queue_.empty():
# 获取事件的阻塞时间设为1秒
event_task = self.priority_queue_.get(block = True, timeout = 1)
self.process_(event_task.event)
#----------------------------------------------------------------------
def process_(self, event):
"""处理事件"""
# 检查是否存在对该事件进行监听的处理函数
if event.type in self.handlers_:
# 若存在,则按顺序将事件传递给处理函数执行
# print("self.handlers_", self.handlers_)
for handler in self.handlers_[event.type]:
handler(event)
#----------------------------------------------------------------------
def runTimer_(self):
"""运行在计时器线程中的循环函数"""
while not self.stop_ :
# 创建计时器事件
event = TimerEvent()
task = EventTask(1, event)
# 向队列中存入计时器事件
self.put(task)
# 等待
time.sleep(self.timerSleep_)
#----------------------------------------------------------------------
def start(self):
"""引擎启动"""
# 将引擎设为启动
self.stop_ = False
# 启动事件处理线程
self.task_thread_.start()
# 启动计时器,计时器事件间隔默认设定为1秒
self.timer_thread_.start()
#----------------------------------------------------------------------
def stop(self):
"""停止引擎"""
# 停止
self.stop_ = True
# 计时器
self.timer_thread_.join()
# 等待事件处理线程退出
self.task_thread_.join()
#----------------------------------------------------------------------
def register(self, type_name, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList = self.handlers_[type_name]
except KeyError:
handlerList = []
self.handlers_[type_name] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
#----------------------------------------------------------------------
def unregister(self, type_name, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.handlers_[type_name]
# print("find type_name:", type_name)
# 如果该函数存在于列表中,则移除
if handler in handlerList:
# print("remove handler")
# print("handlerList1", handlerList)
handlerList.remove(handler)
# print("handlerList2", handlerList)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.handlers_[type_name]
except KeyError:
pass
#----------------------------------------------------------------------
def put(self, priority_event):
"""向事件队列中存入事件"""
self.priority_queue_.put(priority_event)
# 处理时间信号
def handle_timer_event(event):
print(f"recv timer event:{event.type}")
# 处理行情信号
def handle_quote_event(event):
print(f"recv quote event:{event.type}")
# 添加任务到优先级队列
def add_quote_tasks(event_engine,start, end, step):
for i in range(start, end, step):
priority = random.randint(1, 10) # 随机生成优先级
content = QuoteEvent()
task = EventTask(priority, content)
event_engine.put(task)
print(f"添加任务: quote, 优先级: {priority}")
event_engine = EventEngine()
event_engine.start()
print(" register quote event---->")
event_engine.register(QuoteEvent().type, handle_quote_event)
print(" register timer event---->")
event_engine.register(TimerEvent().type, handle_timer_event)
# 创建并启动生产者线程
producer_thread1 = threading.Thread(target=add_quote_tasks, args=(event_engine, 1, 3, 1))
producer_thread1.start()
# 等待生产者线程完成生产
producer_thread1.join()
print("cancel register timer event---->")
time.sleep(1)
# 取消时间订阅
event_engine.unregister(TimerEvent().type, handle_timer_event)
# 等待
time.sleep(3)
# 关闭引擎
event_engine.stop()
print("所有线程已完成")
register quote event---->
register timer event---->
添加任务: quote, 优先级: 7
添加任务: quote, 优先级: 6
cancel register timer event---->
recv quote event:eQUOTE
recv quote event:eQUOTE
recv timer event:eTIMER
所有线程已完成
以上完成了一个事件引擎的简单运行机制,接收订阅,对收到的订阅函数回调,进行信号的推送。