神经网络算法交易:波动预测与定制损失函数

ML&QI - 量化投资和机器学习

[插图:image.png]
# Import model
from utils import *
import matplotlib.pylab as plt

from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation, Flatten
from keras.layers.recurrent import LSTM, GRU
from keras.layers import Conv1D, MaxPooling1D, AtrousConvolution1D, RepeatVector, LeakyReLU
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger
from keras.layers.wrappers import Bidirectional
from keras import regularizers
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import *
from keras.optimizers import RMSprop, Adam, SGD, Nadam
from keras.initializers import *
from keras.constraints import *

from keras import backend as K
import seaborn as sns
import numpy as np
import pandas as pd
sns.despine()

def data2change(data):
    """Convert a raw price/volume series to fractional period-over-period changes.

    Parameters
    ----------
    data : sequence of float
        Raw values (e.g. open/high/low/close/volume levels).

    Returns
    -------
    list of float
        Percentage change per step; the leading NaN and any inf produced by
        zero denominators are replaced with 0.0.
    """
    # BUG FIX: original called pd.DataFrame(...).pch_change(), which does not
    # exist (AttributeError at runtime); the intended method is pct_change().
    change = pd.DataFrame(data).pct_change()
    change = change.replace([np.inf, -np.inf], np.nan)
    change = change.fillna(0.).values.tolist()
    change = [c[0] for c in change]
    return change

# Setting Parameters
WINDOW = 30    # look-back length (steps) for each input sample and for the volatility window
EMB_SIZE = 6   # features per time step: volatility, open, high, low, close, volume changes
STEP = 1       # stride between consecutive training windows
FORCAST = 1    # forecast horizon in steps (sic: misspelling of "FORECAST", name kept as-is)

# Loading Data
# NOTE(review): [::-1] reverses the row order — presumably SHCOMP.csv is stored
# newest-first and chronological order is wanted; confirm against the file.
data_original = pd.read_csv('./SHCOMP.csv')[::-1]

# Pull each OHLCV column out as a plain Python list of raw levels.
openp = data_original.loc[:, 'Open'].tolist()
highp = data_original.loc[:, 'High'].tolist()
lowp = data_original.loc[:, 'Low'].tolist()
closep = data_original.loc[:, 'Close'].tolist()
volumep = data_original.loc[:, 'Volume'].tolist()

# Convert every series from raw levels to fractional period-over-period changes.
openp = data2change(openp)
highp = data2change(highp)
lowp = data2change(lowp)
closep = data2change(closep)
volumep = data2change(volumep)

# Compute Volatility Data
# Rolling standard deviation of the high-price changes over each WINDOW span.
volatility = [np.std(highp[j - WINDOW: j]) for j in range(WINDOW, len(openp))]

# Drop the first WINDOW entries so every change series lines up index-for-index
# with the volatility list computed above.
openp = openp[WINDOW:]
highp = highp[WINDOW:]
lowp = lowp[WINDOW:]
closep = closep[WINDOW:]
volumep = volumep[WINDOW:]

# Plot the Volatility
plt.plot(volatility)
plt.show()

# Build the supervised dataset: each sample x_i is a (WINDOW, EMB_SIZE) matrix
# of [volatility, open, high, low, close, volume] changes, and the target y_i
# is the volatility FORCAST steps past the end of the window.
X, Y = [], []
for i in range(100, len(data_original), STEP):
    try:
        o = openp[i: i + WINDOW]
        h = highp[i: i + WINDOW]
        l = lowp[i: i + WINDOW]
        c = closep[i: i + WINDOW]
        v = volumep[i: i + WINDOW]

        vol = volatility[i: i + WINDOW]

        # BUG FIX: original referenced the undefined name FORECAST (the
        # constant is spelled FORCAST), so the very first iteration raised
        # NameError, the except broke out of the loop, and X/Y stayed empty —
        # crashing the reshape below.
        y_i = volatility[i + WINDOW + FORCAST]
        x_i = np.column_stack((vol, o, h, l, c, v))

    except Exception as e:
        # End of the series reached (index/shape error) — stop collecting.
        break

    X.append(x_i)
    Y.append(y_i)

X, Y = np.array(X), np.array(Y)
# Chronological split: hold out the last 50 samples for validation/testing.
X_train, X_test = X[:-50], X[-50:]
Y_train, Y_test = Y[:-50], Y[-50:]

X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], EMB_SIZE))
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], EMB_SIZE))

# Loss Function
# Small constant keeping predictions strictly inside (0, 1) before log/divide.
epsilon = 1.0e-9

def qlike_loss(y_true, y_pred):
    """QLIKE loss for volatility forecasts: log(pred) + true / pred."""
    clipped = K.clip(y_pred, epsilon, 1.0 - epsilon)
    return K.mean(K.log(clipped) + y_true / clipped, axis=-1)

def mse_log(y_true, y_pred):
    """Mean squared error in log space: mean((log(true) - log(pred))^2)."""
    clipped = K.clip(y_pred, epsilon, 1.0 - epsilon)
    log_diff = K.log(y_true) - K.log(clipped)
    return K.mean(K.square(log_diff), axis=-1)

def mse_sd(y_true, y_pred):
    """MSE between y_true and sqrt(y_pred) — treats y_pred as a variance."""
    clipped = K.clip(y_pred, epsilon, 1.0 - epsilon)
    residual = y_true - K.sqrt(clipped)
    return K.mean(K.square(residual), axis=-1)

def hmse(y_true, y_pred):
    """Heteroskedasticity-adjusted MSE: mean((y_true / y_pred - 1)^2)."""
    clipped = K.clip(y_pred, epsilon, 1.0 - epsilon)
    ratio = y_true / clipped
    return K.mean(K.square(ratio - 1.), axis=-1)

def stock_loss(y_true, y_pred):
    # Asymmetric trading loss: when prediction and truth disagree in sign
    # (y_true * y_pred < 0), apply a heavy quadratic penalty scaled by alpha;
    # otherwise fall back to plain absolute error.
    alpha = 100.
    loss = K.switch(K.less(y_true * y_pred, 0), \
                   alpha*y_pred**2 - K.sign(y_true)*y_pred + K.abs(y_true),\
                   K.abs(y_true - y_pred))
    return K.mean(loss, axis=-1)

# Model
# 1D-CNN regressor: two conv+pool stages with LeakyReLU activations, then a
# small dense head producing a single linear output (predicted volatility).
model = Sequential()
model.add(Conv1D(kernel_size=4, input_shape=(WINDOW, EMB_SIZE), padding="same", filters=16))
model.add(MaxPooling1D(2))
model.add(LeakyReLU())
model.add(Conv1D(kernel_size=4, padding="same", filters=32))
model.add(MaxPooling1D(2))
model.add(LeakyReLU())
model.add(Flatten())

model.add(Dense(16))
model.add(LeakyReLU())

# Single unbounded linear output; note that the mse_log loss below clips
# predictions into (0, 1) before taking logs.
model.add(Dense(1))
model.add(Activation('linear'))

# Legacy Keras API: `lr` keyword (renamed `learning_rate` in later versions).
opt = Nadam(lr=0.002)

# Shrink LR by 10% after 25 epochs without val_loss improvement; checkpoint
# keeps only the best-val_loss weights on disk.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.9, patience=25, min_lr=0.00001, verbose=1)
checkpointer = ModelCheckpoint(filepath="lolkekr.hdf5", verbose=1, save_best_only=True)

model.compile(optimizer=opt, loss=mse_log)

# Train, then — even on error or interruption — evaluate and plot whatever the
# model has learned so far (the `finally` block always runs).
try:
    history = model.fit(X_train, Y_train,
                       nb_epoch = 100,
                       batch_size = 256,
                       verbose=1, 
                       validation_data=(X_test, Y_test),
                       callbacks=[reduce_lr, checkpointer],
                       shuffle=True)
except Exception as e:
    print(e)
    
finally:
    # model.load_weights("lolkekr.hdf5")
    pred = model.predict(X_test)
    # BUG FIX: predict() returns shape (N, 1) while Y_test is (N,); subtracting
    # them broadcast to an (N, N) matrix, silently corrupting every metric
    # below. Flatten to a 1-D vector so the element-wise errors are correct.
    predicted = pred.flatten()
    original = Y_test
    plt.title('Actual and Predicted')
    plt.plot(original, color='black', label='Original Data')
    plt.plot(predicted, color='blue', label='Predicted Data')
    # BUG FIX: legend() was called before the labelled plot() calls, so the
    # legend was empty; it must be drawn after the labelled artists exist.
    plt.legend(loc='best')
    plt.show()
    
    # MSE, MAE, MAPE on the held-out set.
    print(np.mean(np.square(predicted - original)))
    print(np.mean(np.abs(predicted - original)))
    print(np.mean(np.abs((predicted - original) / original)))
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容