这个回测模块以行情为主线,多进程运行,之所以用到多进程,主要是考虑到以后会处理一些复杂的风控和策略运算。
代码有些粗糙,没有运行界面,仅当交流学习之用吧。
首先导入需要用到的模块:
import multiprocessing as mp
import pymongo
import time
import datetime
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import ticker
回测模块写成一个类,里面的函数包括仓位管理、风险管理、收益/持仓计算、信号执行、交易模块主入口(这主要是为了和实盘接口匹配)、行情模块主入口,行情结束后绘制行情和收益曲线。
class testback():
def __init__(self,symbol='RB2210',startdate='20220101',enddate='20220606'):
self.client=pymongo.MongoClient(host='localhost',port=27017,maxPoolSize=1000)
self.db=self.client['qh']
self.col=self.db[symbol]
self.startdate=startdate
self.enddate=enddate
def judgetradetime(self): #实时交易时用的,回测用不上。
festival_lists = ['20220910', '20220911', '20220912', '20221001', '20221002', '20221003', '20221004', '20221005',
'20221006', '20221007']
nowday = datetime.datetime.now().strftime('%Y%m%d')
nowweeday=datetime.datetime.now().weekday()
if nowweeday==5 or nowweeday==6 or nowday in festival_lists:
return False
else:
now=datetime.datetime.now().time()
daopen=datetime.time(9,15,0)
damiddle = datetime.time(10, 10, 0)
damopen=datetime.time(10, 31, 0)
daclose=datetime.time(11, 25, 0)
dpopen=datetime.time(13, 32, 0)
dpclose=datetime.time(14, 55, 0)
nopen=datetime.time(21,5, 0)
nclose=datetime.time(23, 55, 0)
if now>=daopen and now<=damiddle:
return True
elif now>=damopen and now<=daclose:
return True
elif now>=dpopen and now<=dpclose:
return True
elif now >= nopen and now <= nclose:
return True
else:
return False
#仓位管理
def position_management(self,newprice,flaglist):
available_money = flaglist[6]
use_money=available_money*0.1 #使用资金,临时使用
win_rate=0.55 #胜率,以后根据判断
win_lost=2 #赔率
hand_rate=win_rate*win_lost-1 #投入比例
wave_rate=0.01 #行情波动率,以后根据判断
stop_loss_rate=wave_rate/2
plan_position=(use_money*hand_rate/stop_loss_rate)*0.5 #暂时把使用资金减半
plan_num=plan_position//(newprice*10)
return int(plan_num),stop_loss_rate
#风险管理
def risk_management(self,flaglist,mdlist,tradelist):
#准备到点时要自动平仓
#除了有止损也要有止赢策略
origin_money=flaglist[6]
available_money = [] # 开始的时候查询可用资金
profit_list_original=0
time.sleep(0.5)
while True:
time.sleep(0.01)
newprice=mdlist[-1].get('最新价',None)
profit_now,profit_list,_=self.profit_caculator(tradelist,newprice)
available_money.append(origin_money + profit_now+sum(profit_list))
if len(available_money)>50:
available_money=available_money[-10:]
now_available_money=available_money[-1]
flaglist[6]=now_available_money
print("可用余额:",now_available_money)
# 当前持仓收益风险控制
# if profit_now<0 and abs(profit_now)>now_available_money*0.01:
# print('大止损平仓!')
# flaglist[7]=1
# break 实盘中这导致程序的直接退出。
# else:
# continue
#设置风控等级
#收益/持仓计算
def profit_caculator(self,tradelist,newprice):
#以后尽量每次重启后都查询持仓,append到这个list里面。
if list(tradelist) == []:
return 0,[],(3,0,0)
else:
df=pd.DataFrame(list(tradelist))
df['买卖方向']=df['买卖方向'].astype('int')
df['开平标志'] = df['开平标志'].astype('int')
df['数量']=df['数量']*10
df['价值']=df['价格']*df['数量']
df_dif=df.groupby(['报单编号','买卖方向','开平标志'],as_index=False).agg({'数量':'sum','价值':'sum'})
df_row=df_dif.shape[0]
if df_row==1:
position_now=(df_dif['买卖方向'][0],int(df_dif['数量'][0]/10),df_dif['价值'][0])
profit_list=[0]
if df_dif['买卖方向'][0]==0:
profit_now=newprice*df_dif['数量'][0]-df_dif['价值'][0]
else:
profit_now=df_dif['价值'][0]-newprice*df_dif['数量'][0]
return profit_now,profit_list,position_now
else:
df_last_row=df_dif.tail(1).reset_index(drop=True)
if df_last_row['开平标志'][0]==0:
position_now=(df_last_row['买卖方向'][0],int(df_last_row['数量'][0]/10),df_last_row['价值'][0])
if df_last_row['买卖方向'][0]==0:
profit_now=newprice*df_last_row['数量'][0]-df_last_row['价值'][0]
else:
profit_now=df_last_row['价值'][0]-newprice*df_last_row['数量'][0]
df_dif=df_dif.drop(df_row-1)
df_dif['收益辅助']=np.where(df_dif['买卖方向']==1,df_dif['价值'],df_dif['价值']*(-1))
df_dif_close=df_dif[df_dif['开平标志']==3].reset_index(drop=True)
df_dif_open=df_dif[df_dif['开平标志']!=3].reset_index(drop=True)
df_dif_close['收益']=df_dif_close['收益辅助']+df_dif_open['收益辅助']
profit_list=df_dif_close['收益'].to_list()
return profit_now,profit_list,position_now
else:
position_now=(3,0,0)
df_dif['收益辅助'] = np.where(df_dif['买卖方向'] == 1, df_dif['价值'], df_dif['价值'] * (-1))
df_dif_close = df_dif[df_dif['开平标志'] == 3].reset_index(drop=True) #index不一致则无法相加
df_dif_open = df_dif[df_dif['开平标志'] != 3].reset_index(drop=True)
df_dif_close['收益']=df_dif_close['收益辅助']+df_dif_open['收益辅助']
profit_list = df_dif_close['收益'].to_list()
return 0,profit_list, position_now
#信号执行
def trade_signal_manager(self,signal,flaglist,position_now,plan_num):
#time.sleep(0.5)#防止第一笔交易没成交就执行第二笔交易,或许用一个成交信号来控制
if flaglist[1]==1: #交易没有完成的时候不处理任何信号
return
flaglist[5]=position_now[0]
if signal=='b':
if flaglist[5]==3:
flaglist[1:5:1] = 1, '0', '0',plan_num
if flaglist[5]==1:
flaglist[1:5:1] = 1, '0', '3',position_now[1]
if flaglist[5]==0:
pass
if signal=='s':
if flaglist[5]==3:
flaglist[1:5:1] = 1, '1', '0',plan_num
if flaglist[5]==0:
flaglist[1:5:1] = 1, '1', '3',position_now[1]
if flaglist[5]==1:
pass
if signal=='c':
if flaglist[5]==0:
flaglist[1:5:1] = 1, '1', '3',position_now[1]
if flaglist[5]==1:
flaglist[1:5:1] = 1, '0', '3',position_now[1]
if flaglist[5]==3:
pass
# 交易模块主入口,模拟实盘的
def main_trade(self, flaglist, tradelist):
while True:
if flaglist[0] == 0:
continue
else:
if flaglist[1] == 1:
DIRECTION, OFFSET, VOLUME = flaglist[2], flaglist[3], flaglist[4]
tradedict = {'报单编号': str(int(time.time()*10000)), '成交时间': time.time(),
'成交编号': int(np.random.random() * 10000),
'买卖方向': DIRECTION, '开平标志': OFFSET, '价格': flaglist[8],
'数量': VOLUME, '合约代码': 'rb2210'}
tradelist.append(tradedict)
while True: # 轮询直到交易完成再把交易信号置为可用。
_, _, position_now = self.profit_caculator(tradelist, 0)
if OFFSET == '3' and position_now[0] == 3:
break
elif OFFSET == '0' and position_now[0] != 3:
break
else:
continue
flaglist[1] = 0
continue
else:
continue
# 行情模块主入口
def main_mduser(self, mdlist,tradelist,flaglist):
pstartdate=datetime.datetime.strptime(self.startdate,'%Y%m%d')
penddate=datetime.datetime.strptime(self.enddate,'%Y%m%d')
date_space=(penddate-pstartdate).days
for d in range(date_space):
#强化学习策略q_table用到的状态位,共9个状态
state_list =[0,0,0,0,0,0,0,0,0] #ma5vma10,ma5vma40,bulin,position(10,30),vol_position(10,30),now_bulin,now_ma
now_date=pstartdate+datetime.timedelta(days=d)
now_start=time.mktime(now_date.timetuple())
now_end=now_start+86400
history_start=now_start-6912000 #60天前的时间戳
history_end=now_start
now_find = self.col.find({"时间戳":{"$gte":now_start,"$lte":now_end}})
history_find=self.col.find({"时间戳":{"$gte":history_start,"$lte":history_end}})
df_history=pd.DataFrame(history_find)
df_history=df_history[['开盘', '成交量', '持仓量', '收盘', '日期', '最低', '最高', '结算价']]
df_history["日期"]=pd.to_datetime(df_history["日期"])
df_history[['开盘', '成交量', '持仓量', '收盘']] = df_history[['开盘', '成交量', '持仓量', '收盘']].astype('float')
df_history=df_history.set_index("日期")
df_history = df_history.resample("D").mean().dropna()
df_history = df_history.assign( #我考虑能不能把所有回测到期日之前的线都拿出来后切片,但那样会不会涉及到未来信息。
ma5=df_history['收盘'].rolling(5).mean(),
ma10=df_history['收盘'].rolling(10).mean(),
ma40=df_history['收盘'].rolling(40).mean(),
max10=df_history['收盘'].rolling(10).max(),
max30=df_history['收盘'].rolling(30).max(),
min10=df_history['收盘'].rolling(10).min(),
min30=df_history['收盘'].rolling(30).min(),
vol_ma10=df_history['持仓量'].rolling(10).mean(),
vol_max10=df_history['持仓量'].rolling(10).max(),
vol_max30=df_history['持仓量'].rolling(30).max(),
vol_min10=df_history['持仓量'].rolling(10).min(),
vol_min30=df_history['持仓量'].rolling(30).min(),
std=df_history['收盘'].std() * 0.718,
up_bullin=lambda x: x["ma5"] + x["std"], # 这里只能用lambda函数,不能直接用df_history["ma10"]
down_bullin=lambda x: x["ma5"] - x["std"]
)
history_piece=df_history.iloc[df_history.shape[0]-1]
if history_piece["ma5"] >= history_piece["ma10"]:
state_list[0] = "b" # 金叉
else:
state_list[0] = "s"
if history_piece["ma5"] >= history_piece["ma40"]:
state_list[1] = "b" # 金叉
else:
state_list[1] = "s"
if history_piece["收盘"] >= history_piece["up_bullin"]:
state_list[2] = "h"
elif history_piece["收盘"] <= history_piece["down_bullin"]:
state_list[2] = "l"
else:
state_list[2] = "m"
if history_piece["收盘"]>=history_piece["max10"]:
state_list[3]="h"
elif history_piece["收盘"]<history_piece["max10"] and history_piece["收盘"]>=history_piece["ma10"]:
state_list[3] = "mh"
elif history_piece["收盘"]>history_piece["min10"] and history_piece["收盘"]<history_piece["ma10"]:
state_list[3] = "ml"
else:
state_list[3] = "l"
if history_piece["收盘"]>=history_piece["max30"]:
state_list[4]="h"
elif history_piece["收盘"]<history_piece["max30"] and history_piece["收盘"]>=history_piece["ma40"]:
state_list[4] = "mh"
elif history_piece["收盘"]>history_piece["min30"] and history_piece["收盘"]<history_piece["ma40"]:
state_list[4] = "ml"
else:
state_list[4] = "l"
if history_piece["持仓量"]>=history_piece["vol_max10"]:
state_list[5]="h"
elif history_piece["持仓量"]<history_piece["vol_max10"] and history_piece["持仓量"]>=history_piece["vol_ma10"]:
state_list[5] = "mh"
elif history_piece["持仓量"]>history_piece["vol_min10"] and history_piece["持仓量"]<history_piece["vol_ma10"]:
state_list[5] = "ml"
else:
state_list[5] = "l"
if history_piece["持仓量"]>=history_piece["vol_max30"]:
state_list[6]="h"
elif history_piece["持仓量"]<history_piece["vol_max30"] and history_piece["持仓量"]>=history_piece["vol_ma10"]:
state_list[6] = "mh"
elif history_piece["持仓量"]>history_piece["vol_min30"] and history_piece["持仓量"]<history_piece["vol_ma10"]:
state_list[6] = "ml"
else:
state_list[6] = "l"
for f in now_find:
available_money = 1000000
f['最新价'] = float(f['收盘'])
profit_now, profit_list, position_now = self.profit_caculator(tradelist, f['最新价'])
print("行情里面当前盈利:",profit_now)
available_money += profit_now+sum(profit_list)
f['余额'] = available_money
print("行情里面余额:",f['余额'])
f['持仓'] = position_now[0]
f["history_state"]=state_list
mdlist.append(f)
# if len(mdlist)>300: #想防止mdlist过大可逐渐删除部分,但不能用切片选择,因为这样相当于用一个局部mdlist替换了全局的mdlist,其他进程的mdlist就得不到更新了。
# del mdlist[:100]
print(f"时间:{time.strftime('%Y%m%d %H:%M:%S',time.localtime(f['时间戳']))},最新价:{f['最新价']}")
time.sleep(0.02) #行情太慢有时候捕抓不到交易状态,因为一个行情tick可能有两个交易状态,平仓后适合马上开仓,这要看策略。
flaglist[9]=1 #行情结束标志,开始画图
df=pd.DataFrame(list(mdlist))
df=df[['最新价','余额','持仓','日期']]
df['日期']=pd.to_datetime(df['日期'])
price_max=df['最新价'].max()
price_min=df['最新价'].min()
df['最新价归一'] =df['最新价'].apply(lambda x:(x-price_min)/(price_max-price_min))
balance_max=df['余额'].max()
balance_min=df['余额'].min()
df['余额归一']=df['余额'].apply(lambda x:(x-balance_min)/(balance_max-balance_min))
df['均价28']=df['最新价归一'].rolling(28).mean()
df['均价14'] = df['最新价归一'].rolling(14).mean()
df['差值']=df['最新价归一'].rolling(10).std()
df['上界'] = df['均价14'] + df['差值']
df['下界'] = df['均价14'] - df['差值']
df['持仓信号']=df['持仓'].shift().fillna(3)-df['持仓'] #显示持仓的上一笔才是真正的交易
ax = df[['最新价归一', '余额归一', '均价28', '均价14']].plot(figsize=(160, 90), grid=True, rot=45)
def format_date(x, pos=None):
thisind = np.clip(int(x + 0.5), 0, df.shape[0] - 1)
return df["日期"][thisind].strftime("%m/%d %H:%M")
ticks = np.arange(0, df.shape[0], int(df.shape[0]*0.05))
ax.set_xticks(ticks)
ax.xaxis.set_major_formatter(ticker.FuncFormatter(format_date))
for i,v in df[['最新价归一']].iterrows():
if df.loc[i,'持仓信号']==2 or df.loc[i,'持仓信号']==-1:
plt.annotate("卖开",(i,v['最新价归一']),(i,v['最新价归一']+0.05),arrowprops=dict(arrowstyle="->"),fontsize=12)
elif df.loc[i,'持仓信号']==-2:
plt.annotate("买平", (i, v['最新价归一']),(i,v['最新价归一']+0.05),arrowprops=dict(arrowstyle="->"),fontsize=12)
elif df.loc[i,'持仓信号']==3 or df.loc[i,'持仓信号']==1:
plt.annotate("买开", (i, v['最新价归一']),(i,v['最新价归一']+0.05),arrowprops=dict(arrowstyle="->"),fontsize=12)
elif df.loc[i,'持仓信号']==-3:
plt.annotate("卖平", (i, v['最新价归一']),(i,v['最新价归一']+0.05),arrowprops=dict(arrowstyle="->"),fontsize=12)
else:
pass
df.to_excel("./backtest1.xlsx") #数据存储到本地
plt.show()
def start(self): #主进程
manager = mp.Manager()
mdlist = manager.list()
tradelist = manager.list()
flaglist = manager.list([1, 0, '0', '0', 1, 0, 1000000, 0,
0,0]) # 交易标识[0启停进程,1等待交易,2买卖方向,3开平仓,4交易量,5仓位情况,6可用资金,7止损标识,
# 8最新价(只在回测用),9保存qtable标志]
pmd = mp.Process(target=self.main_mduser, args=(mdlist,tradelist,flaglist))
ptrade = mp.Process(target=self.main_trade, args=(flaglist, tradelist),daemon=True)
pstrategy = mp.Process(target=strategy_RLlearning, args=(flaglist, mdlist, tradelist),daemon=True)
prisk_management = mp.Process(target=self.risk_management, args=(flaglist, mdlist, tradelist),daemon=True)
pmd.start()
time.sleep(0.01)
ptrade.start()
time.sleep(0.01)
prisk_management.start()
time.sleep(0.01)
pstrategy.start()
pmd.join()
以下为策略部分,简单的强化学习策略,把行为值的更新简化了,交易信号执行方面,只要不持仓就执行交易信号,遇到止盈止损信号就马上平仓。
def strategy_RLlearning(flaglist,mdlist,tradelist):
# signal:"b","s","c"
RL=qlearning_state()
profit_now_list=[]
state_hodelist=[]
sstate_list=[]
while True:
if len(mdlist)>15:
df=pd.DataFrame(list(mdlist)) #不知道这里为什么要加一个list
newprice = mdlist[-1].get('最新价', None)
statelist = mdlist[-1].get('history_state', None)
df_mean = df['最新价'].rolling(5).mean()[df.shape[0] - 1]
df_mean20=df['最新价'].rolling(15).mean()[df.shape[0] - 1]
df_std = df['最新价'].rolling(20).std()[df.shape[0] - 1] # 单单是标准差的话就太小了,这关系到状态位,不能随便改。
cut_loss=newprice*0.005
flaglist[8]=newprice #只在回测时用
profit_now, profit_list,position_now = testback().profit_caculator(tradelist,newprice)
plan_num, stop_loss_rate=testback().position_management(newprice,flaglist)
flaglist[5] = position_now[0]
if profit_now !=0:
profit_now_list.append(profit_now)
if len(profit_now_list)>200:
profit_now_list=profit_now_list[-200:]
print('当前持仓收益:',profit_now)
if newprice>=(df_mean + df_std):
statelist[7]="h"
elif newprice<=(df_mean - df_std):
statelist[7] = "l"
else:
statelist[7] = "m"
if df_mean>=df_mean20:
statelist[8]="b"
else:
statelist[8] = "s"
state="".join(statelist)
action=RL.choose_action(state)
if position_now[0]==3:
action_hold=action
state_hold=state
sstate_list.append(state)
state_hodelist.append(state_hold)
testback().trade_signal_manager(action, flaglist, position_now, plan_num)
continue
if profit_now<-cut_loss*position_now[1]*10:
signal = 'c'
testback().trade_signal_manager(signal, flaglist, position_now, plan_num)
reward=-30
RL.learn(state_hold, action_hold, reward)
profit_now_list = []
continue
if len(profit_now_list)>10 and np.max(profit_now_list)>cut_loss*position_now[1]*10*2 and profit_now<=np.max(profit_now_list)*0.8: #回撤20%
signal = 'c'
testback().trade_signal_manager(signal, flaglist, position_now, plan_num)
reward = 10
RL.learn(state_hold, action_hold, reward)
profit_now_list = []
continue
if flaglist[9]==1:
qtable=RL.q_table
qtable.to_excel("./qtable.xlsx")
flaglist[9]=0
最后是程序入口:
if __name__ == '__main__':
test=testback(symbol='RB2211',startdate='20220726',enddate='20220729')
test.start()