在我的 19 年的年度学习计划中,量化交易是其中的一项大的工程,是自己下定决心要攻克的一个难题。应该说 19 年是我的量化学习年,为了强化自己的决定,我付费在年初参加了邢不行的量化学习课程,到现在已经过了一个多月,自己从没有摸过 python , 到能用 python 编写简单的量化程序。除了认真学习课程外,也下了很大的功夫和精力。
我的第一个布林策略也已经运行了一段时间了,程序的改进也一直在进行。从测试的情况看,效果应该还不错。
在改进程序的过程中,我对最初的量化程序进行了重构,尽量将量化策略和相关的交易所代码分离。这样一个量化策略写完,只要适配一下交易所的代码,就可以在另外的交易所上运行了。
在编程的过程中,对于一些常见问题的解决方法,有了一些体会。我把它写下来。
- 程序运行的参数全部使用配置文件,这样运行程序时,使用不同的配置文件就可以了。
oscardeMacBook-Pro:oscbot oscar$ ./okex_boll.py
Usage: okex_boll.py -c 策略配置文件
okex 布林线策略
okex_boll.py: error: 请指定策略配置文件!
我的主程序代码:
#!/usr/bin/env python
# -*-coding:utf-8-*-
from bots.adaptation.trend_okex import TrendOkex
from bots.exchanges.okex_level import OkexLevel
from bots.algo.boll import BollAlgo
from bots.util.log import config_log
from bots.util.util import check_network, get_config
import logging
from optparse import OptionParser
parser = OptionParser(usage="%prog -c 策略配置文件 \nokex 布林线策略\n")
parser.add_option("-c", "--configfile",
action="store",
type='string',
dest="configfile",
help="指定策略运行配置文件"
)
(options, args) = parser.parse_args()
if not options.configfile:
parser.error('请指定策略配置文件!')
options = get_config(options.configfile)
config_log(options['name'], options['log'])
if not check_network():
print('网络错误,无法连接 google.com 请先检查网络或设置代理')
exit(1)
logging.info('use config:%s', options['algo'])
ok = OkexLevel(options['algo']['trade'], options['algo']['base'], auths=options['api'])
tokex = TrendOkex(ok)
boll_algo = BollAlgo(tokex, para=[options['algo']['n'], options['algo']['m']], name=options['name'],
interval=options['algo']['interval'])
boll_algo.run()
我把所有策略的配置文件放在程序的 configs 目录下,运行产生的 log ,放在 logs 目录下。
配置文件示例:
{
"name": "okex_boll",
"log": "./logs/okex_boll.log",
"algo": {
"trade": "eth",
"base": "usdt",
"interval": 30,
"n": 200,
"m": 2
},
"api": {
"name": "okex",
"key": "**************",
"secret": "******************",
"pass": "***********"
}
}
上面的布林参数我已经改过了,请不要照抄使用!
- 程序运行的 log 是非常重要的,在程序有问题的时候,我们需要它来查找 bug.
以下是配置 log 的代码
def config_log(name='oscbot',logfile=''):
"""
设置 log
:param name:
:return:
"""
logger = logging.getLogger()
logger.setLevel(logging.INFO) # Log等级总开关
logger.name = name
formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
# log to console
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)
# log to file
if logfile == '':
logfile = './' + name + '.log'
fh = logging.FileHandler(logfile, mode='a')
fh.setLevel(logging.INFO) # 输出到file的log等级的开关
fh.setFormatter(formatter)
logger.addHandler(fh)
# log to 钉钉
config = json_config('config.json')
dd = DingDing(robot_id=config['dingding']['robot_id'])
formatter = logging.Formatter('%(name)s - %(message)s')
dd.setFormatter(formatter)
logger.addHandler(dd)
- 在写交易所的代码过程中,下单取帐户数据要考虑失败重试,我使用 tenacity 库使得这部分的代码不会太繁琐。
from tenacity import retry, wait_exponential, stop_after_attempt
@retry(wait=wait_exponential(multiplier=1, max=10), reraise=True, stop=stop_after_attempt(6))
def get_kicker(self):
try:
return self.spotAPI.get_specific_ticker(self.instrument_id)
except Exception as e:
logging.error('取末成交订单失败:%s', e, extra={'dingding': True})
logging.error(e, exc_info=True)
raise e
需要了解的话,可以搜索一下它的用法。
- 为了加快速度,很多情况下从交易所取得的数据,可以缓存下来,供以后使用。所以我在代码中使用了 lru_cache
from functools import lru_cache
@lru_cache(maxsize=32)
@retry(wait=wait_exponential(multiplier=1, max=10), reraise=True, stop=stop_after_attempt(6))
def level_config_info(api,instrument_id):
try:
return api.get_specific_config_info(instrument_id)
except Exception as e:
logging.warning('取杠杆配置信息报错:%s', e, extra={'dingding': True})
logging.error(e, exc_info=True)
raise e
# 杠杆配置信息
def level_info(self,refresh=False):
if refresh:
level_config_info.cache_clear()
return level_config_info(self.levelAPI,self.instrument_id)