from datetime import datetime, timedelta
from pymongo import UpdateOne, ASCENDING
from database import DB_CONN
from stock_util import get_trading_dates, get_all_codes
"""
对日行情数据做进一步的处理:
1. 填充is_trading字段,is_trading用来区分某只股票在某个交易日是否为停牌
2. 填充停牌日的行情数据
3. 填充复权因子和前收
"""
def fill_is_trading_between(begin_date=None, end_date=None):
"""
填充指定时间段内的is_trading字段
:param begin_date: 开始日期
:param end_date: 结束日期
"""
# 获取指定日期范围的所有交易日列表,按日期正序排列
all_dates = get_trading_dates(begin_date, end_date)
# 循环填充所有交易日的is_trading字段
for date in all_dates:
# 填充daily数据集
fill_single_date_is_trading(date, 'daily')
# 填充daily_hfq数据集
fill_single_date_is_trading(date, 'daily_hfq')
def fill_is_trading(date=None):
"""
为日线数据增加is_trading字段,表示是否交易的状态,True - 交易 False - 停牌
从Tushare来的数据不包含交易状态,也不包含停牌的日K数据,为了系统中使用的方便,我们需要填充停牌是的K数据。
一旦填充了停牌的数据,那么数据库中就同时包含了停牌和交易的数据,为了区分这两种数据,就需要增加这个字段。
在填充该字段时,要考虑到是否最坏的情况,也就是数据库中可能已经包含了停牌和交易的数据,但是却没有is_trading
字段。这个方法通过交易量是否为0,来判断是否停牌
"""
if date is None:
all_dates = get_trading_dates()
else:
all_dates = [date]
for date in all_dates:
fill_single_date_is_trading(date, 'daily')
fill_single_date_is_trading(date, 'daily_hfq')
def fill_single_date_is_trading(date, collection_name):
"""
填充某一个日行情的数据集的is_trading
:param date: 日期
:param collection_name: 集合名称
"""
print('填充字段, 字段名: is_trading,日期:%s,数据集:%s' %
(date, collection_name), flush=True)
daily_cursor = DB_CONN[collection_name].find(
{'date': date},
projection={'code': True, 'volume': True, 'index': True, '_id': False},
batch_size=1000)
update_requests = []
for daily in daily_cursor:
# 当日成交量大于0,则为交易状态
is_trading = daily['volume'] > 0
update_requests.append(
UpdateOne(
{'code': daily['code'], 'date': date, 'index': daily['index']},
{'$set': {'is_trading': is_trading}}))
if len(update_requests) > 0:
update_result = DB_CONN[collection_name].bulk_write(update_requests, ordered=False)
print('填充字段, 字段名: is_trading,日期:%s,数据集:%s,更新:%4d条' %
(date, collection_name, update_result.modified_count), flush=True)
def fill_daily_k_at_suspension_days(begin_date=None, end_date=None):
"""
填充指定日期范围内,股票停牌日的行情数据。
填充时,停牌的开盘价、最高价、最低价和收盘价都为最近一个交易日的收盘价,成交量为0,
is_trading是False
:param begin_date: 开始日期
:param end_date: 结束日期
"""
# 当前日期的前一天
before = datetime.now() - timedelta(days=1)
# 找到据当前最近一个交易日的所有股票的基本信息
basics = []
while 1:
# 转化为str
last_trading_date = before.strftime('%Y-%m-%d')
# 因为TuShare的基本信息最早知道2016-08-09,所以如果日期早于2016-08-09
# 则结束查找
if last_trading_date < '2016-08-09':
break
# 找到当日的基本信息
basic_cursor = DB_CONN['basic'].find(
{'date': last_trading_date},
# 填充时需要用到两个字段股票代码code和上市日期timeToMarket,
# 上市日期用来判断
projection={'code': True, 'timeToMarket': True, '_id': False},
# 一次返回5000条,可以降低网络IO开销,提高速度
batch_size=5000)
# 将数据放到basics列表中
basics = [basic for basic in basic_cursor]
# 如果查询到了数据,在跳出循环
if len(basics) > 0:
break
# 如果没有找到数据,则继续向前一天
before -= timedelta(days=1)
# 获取指定日期范围内所有交易日列表
all_dates = get_trading_dates(begin_date, end_date)
# 填充daily数据集中的停牌日数据
fill_daily_k_at_suspension_days_at_date_one_collection(
basics, all_dates, 'daily')
# 填充daily_hfq数据中的停牌日数据
fill_daily_k_at_suspension_days_at_date_one_collection(
basics, all_dates, 'daily_hfq')
def fill_daily_k_at_suspension_days_at_date_one_collection(
basics, all_dates, collection):
"""
更新单个数据集的单个日期的数据
:param basics:
:param all_dates:
:param collection:
:return:
"""
code_last_trading_daily_dict = dict()
for date in all_dates:
update_requests = []
last_daily_code_set = set(code_last_trading_daily_dict.keys())
for basic in basics:
code = basic['code']
# 如果循环日期小于
if date < basic['timeToMarket']:
print('日期:%s, %s 还没上市,上市日期: %s' % (date, code, basic['timeToMarket']), flush=True)
else:
# 找到当日数据
daily = DB_CONN[collection].find_one({'code': code, 'date': date, 'index': False})
if daily is not None:
code_last_trading_daily_dict[code] = daily
last_daily_code_set.add(code)
else:
if code in last_daily_code_set:
last_trading_daily = code_last_trading_daily_dict[code]
suspension_daily_doc = {
'code': code,
'date': date,
'close': last_trading_daily['close'],
'open': last_trading_daily['close'],
'high': last_trading_daily['close'],
'low': last_trading_daily['close'],
'volume': 0,
'is_trading': False
}
update_requests.append(
UpdateOne(
{'code': code, 'date': date, 'index': False},
{'$set': suspension_daily_doc},
upsert=True))
if len(update_requests) > 0:
update_result = DB_CONN[collection].bulk_write(update_requests, ordered=False)
print('填充停牌数据,日期:%s,数据集:%s,插入:%4d条,更新:%4d条' %
(date, collection, update_result.upserted_count, update_result.modified_count), flush=True)
def fill_au_factor_pre_close(begin_date, end_date):
"""
为daily数据集填充:
1. 复权因子au_factor,复权的因子计算方式:au_factor = hfq_close/close
2. pre_close = close(-1) * au_factor(-1)/au_factor
:param begin_date: 开始日期
:param end_date: 结束日期
"""
all_codes = get_all_codes()
for code in all_codes:
hfq_daily_cursor = DB_CONN['daily_hfq'].find(
{'code': code, 'date': {'$lte': end_date, '$gte': begin_date}, 'index': False},
sort=[('date', ASCENDING)],
projection={'date': True, 'close': True})
date_hfq_close_dict = dict([(x['date'], x['close']) for x in hfq_daily_cursor])
daily_cursor = DB_CONN['daily'].find(
{'code': code, 'date': {'$lte': end_date, '$gte': begin_date}, 'index': False},
sort=[('date', ASCENDING)],
projection={'date': True, 'close': True}
)
last_close = -1
last_au_factor = -1
update_requests = []
for daily in daily_cursor:
date = daily['date']
try:
close = daily['close']
doc = dict()
# 复权因子 = 当日后复权价格 / 当日实际价格
au_factor = round(date_hfq_close_dict[date] / close, 2)
doc['au_factor'] = au_factor
# 当日前收价 = 前一日实际收盘价 * 前一日复权因子 / 当日复权因子
if last_close != -1 and last_au_factor != -1:
pre_close = last_close * last_au_factor / au_factor
doc['pre_close'] = round(pre_close, 2)
last_au_factor = au_factor
last_close = close
update_requests.append(
UpdateOne(
{'code': code, 'date': date, 'index': False},
{'$set': doc}))
except:
print('计算复权因子时发生错误,股票代码:%s,日期:%s' % (code, date), flush=True)
# 恢复成初始值,防止用错
last_close = -1
last_au_factor = -1
if len(update_requests) > 0:
update_result = DB_CONN['daily'].bulk_write(update_requests, ordered=False)
print('填充复权因子和前收,股票:%s,更新:%4d条' %
(code, update_result.modified_count), flush=True)
if __name__ == '__main__':
fill_au_factor_pre_close('2015-01-01', '2015-12-31')
fill_is_trading_between('2015-01-01', '2015-12-31')
fill_daily_k_at_suspension_days('2015-01-01', '2015-12-31')