发现jqdata数据方便,但技术因子是收费项目,就手撸几个常见技术指标以备用
1- macd
def cal_ema(df, N):
a = 2/(N+1)
b = pd.DataFrame(columns = ['close'], index=df.index)
for i in range(len(df)):
if i == 0:
b.iloc[i] = df['close'].iloc[i]
else:
b.iloc[i] = a * df['close'].iloc[i] + (1-a) * b.iloc[i-1]
return b
def cal_dea(df, short_t=12, long_t=26 ,avg_t=9):
ema_short = cal_ema(df, short_t)
ema_long = cal_ema(df, long_t)
diff = ema_short - ema_long
df['dea'] = cal_ema(diff, avg_t)
return df
2- rsi相对强弱指标
通过比较一段时期内的平均收盘涨数和平均收盘跌数来分析市场买沽盘的意向和实力
N日RS=[A÷B]×100%
A.....N日涨幅之和
B.....N日跌幅之和
N日RSI=A/(A+B)×100
def cal_rsi(df, period=12):
if len(df)<period:
pass
else:
delta = df.diff().dropna()
u = delta * 0
d = u.copy()
u[delta > 0] = delta[delta > 0]
d[delta < 0] = -delta[delta < 0]
u[u.index[period - 1]] = np.mean(u[:period])
u = u.drop(u.index[:(period - 1)])
d[d.index[period - 1]] = np.mean(d[:period])
d = d.drop(d.index[:(period - 1)])
avgGain = u.ewm(com=period - 1, adjust=False).mean()
avgLoss = d.ewm(com=period - 1, adjust=False).mean()
rs = avgGain / avgLoss
result = 100 - 100 / (1 + rs)
return result
3-dmi动向指标或趋向指标
通过分析股票价格在涨跌过程中买卖双方力量均衡点的变化情况,适用中长期
Step 1. 计算Directional movement (动向变化值)
+DM:当日最高价比昨日最高价高并且当日最低价比昨日最低价高,即为上升动向+DM。上升幅度为:当日最高价减去昨日最高价。
-DM:当日最高价比昨日最高价低并且当日最低价比昨日最低价低,即为下降动向-DM。下降幅度为:昨日最低价减去今日最低价。
Step 2 . 计算True Range (真实波幅)
TR =∣最高价-最低价∣,∣最高价-昨收∣,∣昨收-最低价∣ 三者之中的最高值
Step 3: 计算Directional Movment Index (动向指数)
+DI(14) = +DM(14)/TR(14)*100
-DI(14) = -DM(14)/TR(14)*100
Step 4: 计算ADX
DX是+DI与-DI两者之差的绝对值除以两者之和的百分数。
DX=[(+DI14)-(-DI14)]/[(+DI14)+(-DI14)]*100
ADX是DX的14天平滑平均线。
ADX = SMA(DX, 14)
def cal_adx(df, N=14, M=6):
hd = df['high'].diff().dropna()
ld = -df['low'].diff().dropna()
dmp = pd.DataFrame({'dmp':[0]*len(hd)},index=hd.index)
dmp[(hd>0) & (ld<0)] = hd
dmp = dmp.rolling(N).sum().dropna()
dmm = pd.DataFrame({'dmm':[0]*len(ld)},index=ld.index)
dmm[(hd<0)&(ld>0)] = ld
dmm = dmm.rolling(N).sum().dropna()
temp = pd.concat([df['high']-df['low'], abs(df['high']-df['close'].shift(1)),\
abs(df['low']-df['close'].shift(1))],axis=1).dropna()
tr = temp.max(axis=1).dropna()
s_index = dmm.index & tr.index &dmp.index
dmp = dmp.loc[s_index]
dmm = dmm.loc[s_index]
tr =tr.loc[s_index]
pdi = 100*dmp['dmp']/tr
mdi = dmm['dmm']*100/tr
dx = abs(pdi-mdi)/(pdi+mdi)*100
adx = dx.rolling(M).mean().dropna()
adx = pd.DataFrame(adx,columns=['adx'])
return adx
4-kdj
def cal_kdj(df, N=9, M=3):
df['l_low'] = df['low'].rolling(N).min()
df['h_high'] = df['high'].rolling(N).max()
df['rsv'] = (df['close']-df['l_low'])/(df['h_high']-df['l_low'])
df['k'] = df['rsv'].ewm(adjust=False, alpha=1/M).mean()
df['d'] = df['k'].ewm(adjust=False, alpha=1/M).mean()
df['j'] = 3*df['k']-2*df['d']
return df['j']
看到有个赋值用np.where写的,觉得挺好,收藏备用
data['pre_j']=data['j'].shift(1)
data['long_signal']=np.where((data['pre_j']<lower)&(data['j']>=lower),1,0)
data['short_signal']=np.where((data['pre_j']>upper)&(data['j']<=upper),-1,0)