@magenta-kabuto Hello. Use the following example.
Note that the backtest parameters are set up for daily prediction: the prediction function returns a value for a single day. At the end of this post I show how to create a single-pass version.
import xarray as xr
import qnt.data as qndata
import qnt.backtester as qnbt
import qnt.ta as qnta
import qnt.stats as qns
import qnt.graph as qngraph
import qnt.output as qnout
import numpy as np
import pandas as pd
import torch
from torch import nn, optim
import random
asset_name_all = ['NAS:AAPL', 'NAS:GOOGL']
lookback_period = 155
train_period = 100
class LSTM(nn.Module):
    """
    Class to define our LSTM network.
    """

    def __init__(self, input_dim=3, hidden_layers=64):
        super(LSTM, self).__init__()
        self.hidden_layers = hidden_layers
        self.lstm1 = nn.LSTMCell(input_dim, self.hidden_layers)
        self.lstm2 = nn.LSTMCell(self.hidden_layers, self.hidden_layers)
        self.linear = nn.Linear(self.hidden_layers, 1)

    def forward(self, y):
        outputs = []
        n_samples = y.size(0)
        h_t = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
        c_t = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
        h_t2 = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
        c_t2 = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
        for time_step in range(y.size(1)):
            x_t = y[:, time_step, :]  # ensure x_t is [batch, input_dim]
            h_t, c_t = self.lstm1(x_t, (h_t, c_t))
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
            output = self.linear(h_t2)
            outputs.append(output.unsqueeze(1))
        outputs = torch.cat(outputs, dim=1).squeeze(-1)
        return outputs
def get_model():
    def set_seed(seed_value=42):
        """Set seed for reproducibility."""
        random.seed(seed_value)
        np.random.seed(seed_value)
        torch.manual_seed(seed_value)
        torch.cuda.manual_seed(seed_value)
        torch.cuda.manual_seed_all(seed_value)  # if you are using multi-GPU
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

    set_seed(42)
    model = LSTM(input_dim=3)
    return model
def get_features(data):
    close_price = data.sel(field="close").ffill('time').bfill('time').fillna(1)
    open_price = data.sel(field="open").ffill('time').bfill('time').fillna(1)
    high_price = data.sel(field="high").ffill('time').bfill('time').fillna(1)
    log_close = np.log(close_price)
    log_open = np.log(open_price)
    # note: close and open are log-transformed, while high is used as-is
    features = xr.concat([log_close, log_open, high_price], "feature")
    return features
def get_target_classes(data):
    price_current = data.sel(field='open')
    price_future = qnta.shift(price_current, -1)
    class_positive = 1  # price goes up
    class_negative = 0  # price goes down
    target_price_up = xr.where(price_future > price_current, class_positive, class_negative)
    return target_price_up
def load_data(period):
    return qndata.stocks.load_ndx_data(tail=period, assets=asset_name_all)
def train_model(data):
    features_all = get_features(data)
    target_all = get_target_classes(data)
    models = dict()
    for asset_name in asset_name_all:
        model = get_model()
        target_cur = target_all.sel(asset=asset_name).dropna('time', 'any')
        features_cur = features_all.sel(asset=asset_name).dropna('time', 'any')
        target_for_learn_df, feature_for_learn_df = xr.align(target_cur, features_cur, join='inner')
        criterion = nn.MSELoss()
        optimiser = optim.LBFGS(model.parameters(), lr=0.08)
        epochs = 1
        for i in range(epochs):
            def closure():
                optimiser.zero_grad()
                feature_data = feature_for_learn_df.transpose('time', 'feature').values
                in_ = torch.tensor(feature_data, dtype=torch.float32).unsqueeze(0)
                out = model(in_)
                target = torch.zeros(1, len(target_for_learn_df.values))
                target[0, :] = torch.tensor(np.array(target_for_learn_df.values))
                loss = criterion(out, target)
                loss.backward()
                return loss
            optimiser.step(closure)
        models[asset_name] = model
    return models
def predict(models, data, state):
    # keep only the last day; this function returns weights for a single day
    last_time = data.time.values[-1]
    data_last = data.sel(time=slice(last_time, None))
    weights = xr.zeros_like(data_last.sel(field='close'))
    features_all = get_features(data_last)
    for asset_name in asset_name_all:
        features_cur = features_all.sel(asset=asset_name).dropna('time', 'any')
        if len(features_cur.time) < 1:
            continue
        feature_data = features_cur.transpose('time', 'feature').values
        in_ = torch.tensor(feature_data, dtype=torch.float32).unsqueeze(0)
        out = models[asset_name](in_)
        prediction = out.detach()[0]
        weights.loc[dict(asset=asset_name, time=features_cur.time.values)] = prediction
    weights = weights * data_last.sel(field="is_liquid")
    # state may be None, so define a default value
    if state is None:
        default = xr.zeros_like(data_last.sel(field='close')).isel(time=-1)
        state = {
            "previous_weights": default,
        }
    previous_weights = state['previous_weights']
    # align the arrays to prevent problems in case the asset list changes
    previous_weights, weights = xr.align(previous_weights, weights, join='right')
    # smooth the output: average today's weights with the previous ones
    weights_avg = (previous_weights + weights) / 2
    next_state = {
        "previous_weights": weights_avg.isel(time=-1),
    }
    return weights_avg, next_state
weights = qnbt.backtest_ml(
    load_data=load_data,
    train=train_model,
    predict=predict,
    train_period=train_period,            # calendar days of data used for training
    retrain_interval=360,                 # retrain the models every 360 days
    retrain_interval_after_submit=1,      # after submission, retrain every day
    predict_each_day=True,                # call predict separately for each day
    competition_type='stocks_nasdaq100',
    lookback_period=lookback_period,      # calendar days of data passed to predict
    start_date='2006-01-01',
    build_plots=True
)
I recommend not using state at all and instead using the approach I mentioned above, because it is faster.
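For illustration, a stateless variant of the prediction function could look like this. It is a minimal sketch that simply drops the state logic from the example above (the backtester also accepts a predict function without the state argument):

def predict(models, data):
    # same as the predict above, but without the state-based smoothing
    last_time = data.time.values[-1]
    data_last = data.sel(time=slice(last_time, None))
    weights = xr.zeros_like(data_last.sel(field='close'))
    features_all = get_features(data_last)
    for asset_name in asset_name_all:
        features_cur = features_all.sel(asset=asset_name).dropna('time', 'any')
        if len(features_cur.time) < 1:
            continue
        feature_data = features_cur.transpose('time', 'feature').values
        in_ = torch.tensor(feature_data, dtype=torch.float32).unsqueeze(0)
        out = models[asset_name](in_)
        weights.loc[dict(asset=asset_name, time=features_cur.time.values)] = out.detach()[0]
    return weights * data_last.sel(field="is_liquid")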
If you need a single-pass version, it is better to load more data, calculate the weight values for the previous days as well, and then combine them; that way the weights for the previous days are already available.
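A single-pass script along those lines could look like this. This is a sketch, not a tested strategy: the 365-day tail and the 2-day moving average are my assumptions, and the moving average only approximates the recursive averaging used in the state-based predict above:

# reuse the helper functions defined above
data = qndata.stocks.load_ndx_data(tail=365, assets=asset_name_all)
models = train_model(data)
features_all = get_features(data)
# calculate weights for every day in the loaded window, not just the last one
weights = xr.zeros_like(data.sel(field='close'))
for asset_name in asset_name_all:
    features_cur = features_all.sel(asset=asset_name).dropna('time', 'any')
    if len(features_cur.time) < 1:
        continue
    feature_data = features_cur.transpose('time', 'feature').values
    in_ = torch.tensor(feature_data, dtype=torch.float32).unsqueeze(0)
    out = models[asset_name](in_)
    weights.loc[dict(asset=asset_name, time=features_cur.time.values)] = out.detach()[0]
weights = weights * data.sel(field="is_liquid")
# combine each day's weights with the previous day's weights
weights = qnta.sma(weights, 2)
weights = qnout.clean(weights, data, 'stocks_nasdaq100')
qnout.write(weights)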