Incorporating seed initialization into your PyTorch code ensures reproducibility by making the random number generation predictable. This involves setting seeds for the PyTorch engine, NumPy, and the Python random module if you're using it. Below, I'll show you how to integrate seed initialization into your existing code. Remember, while this can make your experiments more reproducible, it does not guarantee identical results across different hardware or PyTorch versions due to the inherent nondeterminism in some GPU operations.
import xarray as xr # xarray for data manipulation import qnt.data as qndata # functions for loading data import qnt.backtester as qnbt # built-in backtester import qnt.ta as qnta # technical analysis library import numpy as np import pandas as pd import torch from torch import nn, optim import random # Seed initialization function def set_seed(seed_value=42): """Set seed for reproducibility.""" random.seed(seed_value) np.random.seed(seed_value) torch.manual_seed(seed_value) torch.cuda.manual_seed(seed_value) torch.cuda.manual_seed_all(seed_value) # if you are using multi-GPU. torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False # Set the seed for reproducibility set_seed(42) asset_name_all = ['NAS:AAPL', 'NAS:AMZN', 'NAS:MSFT'] class LSTM(nn.Module): """ Class to define our LSTM network. """ def __init__(self, input_dim=3, hidden_layers=64): super(LSTM, self).__init__() self.hidden_layers = hidden_layers self.lstm1 = nn.LSTMCell(input_dim, self.hidden_layers) self.lstm2 = nn.LSTMCell(self.hidden_layers, self.hidden_layers) self.linear = nn.Linear(self.hidden_layers, 1) def forward(self, y, future_preds=0): outputs = [] n_samples = y.size(0) h_t = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32) c_t = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32) h_t2 = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32) c_t2 = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32) for time_step in range(y.size(1)): x_t = y[:, time_step, :] # Ensure x_t is [batch, input_dim] h_t, c_t = self.lstm1(x_t, (h_t, c_t)) h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2)) output = self.linear(h_t2) outputs.append(output.unsqueeze(1)) outputs = torch.cat(outputs, dim=1).squeeze(-1) return outputs def get_model(): model = LSTM(input_dim=3) return model def get_features(data): close_price = data.sel(field="close").ffill('time').bfill('time').fillna(1) open_price = data.sel(field="open").ffill('time').bfill('time').fillna(1) high_price = data.sel(field="high").ffill('time').bfill('time').fillna(1) log_close = np.log(close_price) log_open = np.log(open_price) features = xr.concat([log_close, log_open, high_price], "feature") return features def get_target_classes(data): price_current = data.sel(field='close') price_future = qnta.shift(price_current, -1) class_positive = 1 # prices goes up class_negative = 0 # price goes down target_price_up = xr.where(price_future > price_current, class_positive, class_negative) return target_price_up def load_data(period): return qndata.stocks.load_ndx_data(tail=period, assets=asset_name_all) def train_model(data): features_all = get_features(data) target_all = get_target_classes(data) models = dict() for asset_name in asset_name_all: model = get_model() target_cur = target_all.sel(asset=asset_name).dropna('time', 'any') features_cur = features_all.sel(asset=asset_name).dropna('time', 'any') target_for_learn_df, feature_for_learn_df = xr.align(target_cur, features_cur, join='inner') criterion = nn.MSELoss() optimiser = optim.LBFGS(model.parameters(), lr=0.08) epochs = 1 for i in range(epochs): def closure(): optimiser.zero_grad() feature_data = feature_for_learn_df.transpose('time', 'feature').values in_ = torch.tensor(feature_data, dtype=torch.float32).unsqueeze(0) out = model(in_) target = torch.zeros(1, len(target_for_learn_df.values)) target[0, :] = torch.tensor(np.array(target_for_learn_df.values)) loss = criterion(out, target) loss.backward() return loss optimiser.step(closure) models[asset_name] = model return models def predict(models, data): weights = xr.zeros_like(data.sel(field='close')) for asset_name in asset_name_all: features_all = get_features(data) features_cur = features_all.sel(asset=asset_name).dropna('time', 'any') if len(features_cur.time) < 1: continue feature_data = features_cur.transpose('time', 'feature').values in_ = torch.tensor(feature_data, dtype=torch.float32).unsqueeze(0) out = models[asset_name](in_) prediction = out.detach()[0] weights.loc[dict(asset=asset_name, time=features_cur.time.values)] = prediction return weights weights = qnbt.backtest_ml( load_data=load_data, train=train_model, predict=predict, train_period=55, retrain_interval=55, retrain_interval_after_submit=1, predict_each_day=False, competition_type='stocks_nasdaq100', lookback_period=55, start_date='2024-01-01', build_plots=True )I think I won't be available next week. If you have any more questions, don’t expect an answer from me next week.