This example uses a Long Short-Term Memory (LSTM) neural network to predict whether the price is going up or down. This example was built for the Q18 NASDAQ-100 Stock Long-Short contest.
Important! When developing further you need to run the ./init.py file once in order to install the pytorch dependency.
Strategy idea: We will go long on NASDAQ-100 stocks depending on the predictions of the LSTM NN, i.e. on how sure the NN is that the price is moving up.
Feature for learning - logarithm of closing price
To have a look at all the technical indicators we offer, go to Technical Indicators
We will use a specialized version of the Quantiacs backtester for this purpose, which dramatically speeds up the backtesting process when the models should be retrained on a regular basis.
Need help? Check the Documentation and find solutions/report problems in the Forum section.
More help with Jupyter? Check the official Jupyter page.
Once you are done, click on Submit to the contest and take part in our competitions.
Learn more about LSTM: PyTorch
API reference:
data: check how to work with data;
backtesting: read how to run the simulation and check the results.
%%javascript
window.IPython && (IPython.OutputArea.prototype._should_scroll = function(lines) { return false; })
// disable widget scrolling
# Multi-pass implementation of an LSTM-based strategy using the Quantiacs built-in ML backtester (qnbt.backtest_ml).
import xarray as xr # xarray for data manipulation
import qnt.data as qndata # functions for loading data
import qnt.backtester as qnbt # built-in backtester
import qnt.ta as qnta # technical analysis library
import numpy as np
import logging
import pandas as pd
import torch
from torch import nn, optim
def get_features(data):
    """
    Build the model input: the natural logarithm of the closing price.

    Missing closes are forward-filled and then backward-filled along 'time';
    anything still missing afterwards is replaced by 1, whose logarithm is 0.
    """
    close = data.sel(field="close")
    close = close.ffill('time')
    close = close.bfill('time')
    close = close.fillna(1)
    return np.log(close)
def get_target_classes(data):
    """
    Label every time step with the direction of the next closing price.

    Returns an array shaped like the 'close' field of `data` holding
    1 where the next close is strictly higher than the current close and
    0 where it is not (lower or unchanged). Steps where the current or the
    next close is missing are NaN, so callers can remove them with `dropna`
    instead of training on a fabricated "down" label.
    """
    class_positive = 1  # price goes up
    class_negative = 0  # price goes down (or stays flat)

    price_current = data.sel(field='close')
    # shift by -1 period: each step sees the following step's close
    price_future = qnta.shift(price_current, -1)

    target_price_up = xr.where(price_future > price_current, class_positive, class_negative)

    # A comparison against NaN is False, which would silently turn every step
    # with an unknown price (presumably including the last one, which has no
    # next close) into class 0. Mask those steps out explicitly.
    is_known = price_current.notnull() & price_future.notnull()
    return target_price_up.where(is_known)
class LSTM(nn.Module):
    """
    Two stacked LSTM cells followed by a linear read-out, applied one time
    step at a time to a univariate sequence.

    Args:
        hidden_layers: size of the hidden and cell state of both LSTM cells
            (despite the name, this is a width, not a number of layers).
    """

    def __init__(self, hidden_layers=64):
        super().__init__()
        self.hidden_layers = hidden_layers
        # lstm1 consumes one scalar per step, lstm2 consumes lstm1's hidden
        # state, linear maps lstm2's hidden state back to one scalar
        self.lstm1 = nn.LSTMCell(1, self.hidden_layers)
        self.lstm2 = nn.LSTMCell(self.hidden_layers, self.hidden_layers)
        self.linear = nn.Linear(self.hidden_layers, 1)

    def forward(self, y, future_preds=0):
        """
        Run the network over a batch of sequences.

        Args:
            y: 2-D tensor; dim 0 indexes the samples, dim 1 the time steps.
            future_preds: number of additional steps to generate after the
                end of `y` by feeding each output back in as the next input.

        Returns:
            Tensor with one output per input step followed by `future_preds`
            generated steps, concatenated along dim 1.
        """
        n_samples = y.size(0)

        # new_zeros creates the initial states with the dtype and device of
        # the input, so the model also works after .double() or .to(device)
        h_t = y.new_zeros(n_samples, self.hidden_layers)
        c_t = y.new_zeros(n_samples, self.hidden_layers)
        h_t2 = y.new_zeros(n_samples, self.hidden_layers)
        c_t2 = y.new_zeros(n_samples, self.hidden_layers)

        outputs = []
        for time_step in y.split(1, dim=1):
            h_t, c_t = self.lstm1(time_step, (h_t, c_t))
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
            output = self.linear(h_t2)  # output from the last FC layer
            outputs.append(output)

        # autoregressive continuation: the previous output is the next input
        for _ in range(future_preds):
            h_t, c_t = self.lstm1(output, (h_t, c_t))
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
            output = self.linear(h_t2)
            outputs.append(output)

        # transform list of per-step outputs into a single tensor
        return torch.cat(outputs, dim=1)
def get_model():
    """Create a fresh, untrained LSTM network with default settings."""
    return LSTM()
def train_model(data):
    """
    Train one LSTM network per asset.

    For each asset in the hard-coded list, the features (see `get_features`)
    and targets (see `get_target_classes`) are aligned on their common time
    stamps and a new model is fitted on the whole aligned series as a single
    sequence, using an MSE loss and the LBFGS optimiser.

    Returns:
        dict mapping asset name -> trained model.
    """
    asset_name_all = ['NAS:AAPL', 'NAS:AMZN', 'NAS:MSFT']
    epochs = 1  # number of optimiser steps per asset

    features_all = get_features(data)
    target_all = get_target_classes(data)

    models = dict()
    for asset_name in asset_name_all:
        model = get_model()

        # drop missing values (`how` passed by keyword: newer xarray
        # versions deprecate passing it positionally)
        target_cur = target_all.sel(asset=asset_name).dropna('time', how='any')
        features_cur = features_all.sel(asset=asset_name).dropna('time', how='any')

        # keep only the time stamps present in both series
        target_for_learn, feature_for_learn = xr.align(target_cur, features_cur, join='inner')

        # The tensors are constant during optimisation, so build them once
        # here rather than on every closure evaluation. Shape: (1, n_steps).
        in_ = torch.tensor(np.array(feature_for_learn.values), dtype=torch.float32).unsqueeze(0)
        target = torch.tensor(np.array(target_for_learn.values), dtype=torch.float32).unsqueeze(0)

        criterion = nn.MSELoss()  # loss function
        optimiser = optim.LBFGS(model.parameters(), lr=0.08)  # LBFGS solver as optimiser

        def closure():
            # LBFGS re-evaluates the model several times per step; the
            # closure runs the forward and backward pass and returns the loss
            optimiser.zero_grad()
            loss = criterion(model(in_), target)
            loss.backward()
            return loss

        for _ in range(epochs):
            optimiser.step(closure)  # updates weights

        models[asset_name] = model
    return models
def predict(models, data):
    """
    Turn the per-asset model outputs into portfolio weights.

    For each asset in the hard-coded list, the feature series is fed through
    that asset's model and the raw network output for every time step is
    written into the weights; all other assets keep a weight of zero.

    Args:
        models: dict mapping asset name -> trained model (from `train_model`).
        data: market data containing at least the 'close' field.

    Returns:
        Weights shaped like the 'close' field of `data`.
    """
    asset_name_all = ['NAS:AAPL', 'NAS:AMZN', 'NAS:MSFT']
    weights = xr.zeros_like(data.sel(field='close'))

    # the features do not depend on the asset being processed: compute once
    features_all = get_features(data)

    for asset_name in asset_name_all:
        # `how` passed by keyword: newer xarray versions deprecate passing
        # it positionally
        features_cur = features_all.sel(asset=asset_name).dropna('time', how='any')
        if len(features_cur.time) < 1:
            continue

        # input tensor, shape (1, n_steps)
        in_ = torch.tensor(np.array(features_cur.values), dtype=torch.float32).unsqueeze(0)

        # inference only: no autograd graph is needed
        with torch.no_grad():
            out = models[asset_name](in_)
        prediction = out[0]

        weights.loc[dict(asset=asset_name, time=features_cur.time.values)] = prediction
    return weights
weights = qnbt.backtest_ml(
    # callbacks supplied by this notebook
    train=train_model,
    predict=predict,
    # which competition to simulate and where the backtest starts
    competition_type='stocks_nasdaq100',
    start_date='2006-01-01',
    # data windows, in calendar days
    train_period=3 * 365,             # length of the data passed to train
    lookback_period=365,              # history the predict function needs to produce its output
    # retraining schedule, in calendar days
    retrain_interval=365,             # during the backtest
    retrain_interval_after_submit=1,  # during evaluation, after submission
    # Call predict for every single day of the backtest?
    # Set it to True if you suspect that get_features is looking forward.
    predict_each_day=False,
    build_plots=True,                 # draw the charts
)
What libraries are available?
Our library makes extensive use of xarray:
pandas:
and numpy:
Function definitions can be found in the qnt folder in your private root directory.
# Import basic libraries.
import xarray as xr
import pandas as pd
import numpy as np
# Import quantnet libraries.
import qnt.data as qndata # load and manipulate data
import qnt.output as output # manage output
import qnt.backtester as qnbt # backtester
import qnt.stats as qnstats # statistical functions for analysis
import qnt.graph as qngraph # graphical tools
import qnt.ta as qnta # indicators library
May I import libraries?
Yes, please refer to the file init.ipynb in your home directory. You can for example use:
! conda install -y scikit-learn
How to load data?
Futures:
data= qndata.futures.load_data(tail = 15*365, dims = ("time", "field", "asset"))
BTC Futures:
data= qndata.cryptofutures.load_data(tail = 15*365, dims = ("time", "field", "asset"))
Cryptocurrencies:
data= qndata.crypto.load_data(tail = 15*365, dims = ("time", "field", "asset"))
How to view a list of all tickers?
data.asset.to_pandas().to_list()
How to see which fields are available?
data.field.to_pandas().to_list()
How to load specific tickers?
data = qndata.futures.load_data(tail=15 * 365, assets=['F_O', 'F_DX', 'F_GC'])
How to select specific tickers after loading all data?
def get_data_filter(data, assets):
    """Return the part of `data` restricted to the given asset labels."""
    return data.sel(asset=assets)
get_data_filter(data, ["F_O", "F_DX", "F_GC"])
How to get the prices for the previous day?
qnta.shift(data.sel(field="open"), periods=1)
or:
data.sel(field="open").shift(time=1)
How do I get a list of the top 10 assets ranked by Sharpe ratio?
import qnt.stats as qnstats
data= qndata.futures.load_data(tail=16 * 365)
def get_best_instruments(data, weights, top_size, top_period=300):
    """
    Pick the `top_size` assets with the best Sharpe ratio and show their stats.

    Args:
        data: market data passed to `qnstats.calc_stat`.
        weights: strategy weights passed to `qnstats.calc_stat`.
        top_size: how many of the best-ranked assets to keep.
        top_period: the ranking is taken `top_period` time steps before the
            end of the statistics (default 300, the previously hard-coded
            value).

    Returns:
        Array with the names of the selected assets.
    """
    # compute statistics:
    stats_per_asset = qnstats.calc_stat(data, weights, per_asset=True)

    # calculate ranks of assets by "sharpe_ratio"
    # (negated so that the highest Sharpe ratio gets rank 1):
    ranks = (-stats_per_asset.sel(field="sharpe_ratio")).rank("asset")

    # select top assets by rank "top_period" steps ago:
    rank = ranks.isel(time=-top_period)
    top = rank.where(rank <= top_size).dropna("asset").asset

    # select top stats:
    top_stats = stats_per_asset.sel(asset=top.values)

    # print results:
    print("SR tail of the top assets:")
    display(top_stats.sel(field="sharpe_ratio").to_pandas().tail())

    print("avg SR = ", top_stats[-top_period:].sel(field="sharpe_ratio").mean("asset")[-1].item())
    display(top_stats)

    return top_stats.coords["asset"].values
get_best_instruments(data, weights, 10)
How can I check the results for only the top 10 assets ranked by Sharpe ratio?
Select the top assets and then load their data:
best_assets= get_best_instruments(data, weights, 10)
data= qndata.futures.load_data(tail=15 * 365, assets=best_assets)
...
How can prices be processed?
Simply import standard libraries, for example numpy:
import numpy as np
high= np.log(data.sel(field="high"))
How can you reduce slippage impact when trading?
Just apply some technique to reduce turnover:
def get_lower_slippage(weights, rolling_time=6):
    """
    Smooth the weights with a rolling maximum over `rolling_time` steps
    along 'time', which reduces how often positions change.
    """
    window = weights.rolling(time=rolling_time)
    return window.max()
improved_weights = get_lower_slippage(weights, rolling_time=6)
How to use technical analysis indicators?
For available indicators see the source code of the library: /qnt/ta
ATR
def get_atr(data, days=14):
    """
    Compute the ATR indicator over `days` periods from the 'high', 'low'
    and 'close' fields of `data` (each multiplied by 1.0 before the call).
    """
    high, low, close = (
        data.sel(field=field_name) * 1.0
        for field_name in ("high", "low", "close")
    )
    return qnta.atr(high, low, close, days)
atr= get_atr(data, days=14)
EMA
prices= data.sel(field="high")
prices_ema= qnta.ema(prices, 15)
TRIX
prices= data.sel(field="high")
prices_trix= qnta.trix(prices, 15)
ADL and EMA
adl= qnta.ad_line(data.sel(field="close")) * 1.0
adl_ema= qnta.ema(adl, 18)
How can you check the quality of your strategy?
import qnt.output as qnout
qnout.check(weights, data)
or
stat= qnstats.calc_stat(data, weights)
display(stat.to_pandas().tail())
or
import qnt.graph as qngraph
statistics= qnstats.calc_stat(data, weights)
display(statistics.to_pandas().tail())
performance= statistics.to_pandas()["equity"]
qngraph.make_plot_filled(performance.index, performance, name="PnL (Equity)", type="log")
display(statistics[-1:].sel(field = ["sharpe_ratio"]).transpose().to_pandas())
qnstats.print_correlation(weights, data)
An example using pandas
One can work with pandas DataFrames at intermediate steps and at the end convert them to xarray data structures:
def get_price_pct_change(prices):
    """
    Return the step-to-step percentage change of every asset's price.

    Args:
        prices: 2-D price array convertible with `to_pandas()` into a
            DataFrame that has one column per asset.

    Returns:
        pandas DataFrame with the same index and columns as
        `prices.to_pandas()`, holding the percentage change of each column.
    """
    # Work only on the argument: the asset list is taken from the converted
    # frame's own columns, not from a global `data` variable, and
    # DataFrame.pct_change already operates column by column.
    prices_pandas = prices.to_pandas()
    return prices_pandas.pct_change()
prices= data.sel(field="close") * 1.0
prices_pct_change= get_price_pct_change(prices).unstack().to_xarray()
How to submit a strategy to the competition?
Check that weights are fine:
import qnt.output as qnout
qnout.check(weights, data)
If everything is ok, write the weights to file:
qnout.write(weights)
In your personal account:
- choose a strategy;
- click on the Submit button;
- select the type of competition.
At the beginning you will find the strategy under the Checking area (Competition > Checking). If Sharpe ratio is larger than 1 and technical checks are successful, the strategy will go under the Running area (Competition > Running). Otherwise it will be Filtered (Competition > Filtered) and you should inspect error and warning messages.