Notebook

Q17 Neural Networks

This template shows how to make a submission to the Q17 cryptocurrency contest using a neural-network based approach.

You can clone and edit this example from the Examples tab.


This example uses a Long Short-Term Memory (LSTM) neural network to predict whether the price is going up or down. It was built for the Q17 crypto Top-10 Long-Short contest.

Important! When developing further, you need to run the ./init.py file once to install the PyTorch dependency.

Strategy idea: We go long on cryptocurrencies based on the LSTM predictions, that is, on how confident the network is that the price will move up.

Feature for learning: the logarithm of the closing price.

To have a look at all the technical indicators we offer, go to Technical Indicators


We will use a specialized version of the Quantiacs backtester for this purpose, which dramatically speeds up backtesting when models have to be retrained on a regular basis.

Need help? Check the Documentation and find solutions/report problems in the Forum section.

More help with Jupyter? Check the official Jupyter page.

Once you are done, click on Submit to the contest and take part in our competitions.

Learn more about LSTM: PyTorch

API reference:

  • data: check how to work with data;

  • backtesting: read how to run the simulation and check the results.

In [1]:
%%javascript
window.IPython && (IPython.OutputArea.prototype._should_scroll = function(lines) { return false; })
// disable widget scrolling
In [2]:
# LSTM-based strategy using the Quantiacs built-in backtester with periodic retraining.
import xarray as xr           # xarray for data manipulation
import qnt.data as qndata     # functions for loading data
import qnt.backtester as qnbt # built-in backtester
import qnt.ta as qnta         # technical analysis library
import numpy as np
import logging
import pandas as pd

import torch
from torch import nn, optim

torch.manual_seed(24)
np.random.seed(24)
In [3]:
def get_features(data):
    """
        get the features: for this example we only use the logarithm of the closing price
    
    """
    price = data.sel(field="close").ffill('time').bfill('time').fillna(1)
    price = np.log(price)
    return price
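A small sketch of why the logarithm is a convenient feature. The prices below are made up for illustration, not market data: equal percentage moves become equal steps in log space, and differences of log prices are log returns.

```python
import numpy as np

# Hypothetical closing prices: each step is a +10% move.
close = np.array([100.0, 110.0, 121.0, 133.1])

log_close = np.log(close)         # the feature used in this notebook
log_returns = np.diff(log_close)  # day-over-day change of the feature

# Every +10% move maps to the same step, log(1.1), regardless of the price level.
print(log_returns)
```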
In [4]:
def get_target_classes(data):
    price_current = data.sel(field='close')
    price_future = qnta.shift(price_current, -1)

    class_positive = 1 # price goes up
    class_negative = 0 # price goes down or stays flat

    target_price_up = xr.where(price_future > price_current, class_positive, class_negative)

    return target_price_up
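The labelling rule can be illustrated with a pandas analogue on made-up prices. This is a sketch that assumes shifting by -1 aligns tomorrow's price with today's row, as the name price_future suggests. Note that the last row has no future price, and because comparisons with NaN are False it receives class 0.

```python
import pandas as pd

# Hypothetical closing prices for one asset.
close = pd.Series([10.0, 11.0, 10.5, 10.5, 12.0])

# Tomorrow's price aligned with today's row (the last row has no future value).
future = close.shift(-1)

# 1 if tomorrow's close is strictly higher than today's, else 0.
# Comparisons with NaN are False, so the last row is labelled 0 as well.
target = (future > close).astype(int)
print(target.tolist())  # [1, 0, 0, 1, 0]
```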
In [5]:
class LSTM(nn.Module):
    """
        class to define our LSTM network
    """
    def __init__(self, hidden_layers=64):
        super(LSTM, self).__init__()
        self.hidden_layers = hidden_layers
        # lstm1, lstm2, linear are all layers in the network
        self.lstm1 = nn.LSTMCell(1, self.hidden_layers)
        self.lstm2 = nn.LSTMCell(self.hidden_layers, self.hidden_layers)
        self.linear = nn.Linear(self.hidden_layers, 1)
        
    # define the forward function
    def forward(self, y, future_preds=0):
        outputs, n_samples = [], y.size(0)
        h_t = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
        c_t = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
        h_t2 = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
        c_t2 = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
        
        for time_step in y.split(1, dim=1):
            h_t, c_t = self.lstm1(time_step, (h_t, c_t)) # first LSTM layer: update hidden and cell states
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2)) # second LSTM layer: update hidden and cell states
            output = self.linear(h_t2) # output of the final fully connected layer
            outputs.append(output)
            
        for i in range(future_preds):
            h_t, c_t = self.lstm1(output, (h_t, c_t))
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
            output = self.linear(h_t2)
            outputs.append(output)
            
        # transform list to tensor    
        outputs = torch.cat(outputs, dim=1)
        return outputs
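To make the per-time-step update more concrete, here is a minimal NumPy sketch of the standard LSTM cell equations, applied step by step to a short series in the same way the forward function walks over time. The weights are random placeholders, the helper lstm_cell_step is hypothetical, and the two bias vectors PyTorch keeps are merged into a single vector b for brevity, so this illustrates the technique rather than reproducing nn.LSTMCell exactly.

```python
import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def lstm_cell_step(x, h, c, w_ih, w_hh, b):
    """One LSTM cell update for a batch.

    x: (batch, input_size); h, c: (batch, hidden);
    w_ih: (4*hidden, input_size); w_hh: (4*hidden, hidden); b: (4*hidden,)
    """
    gates = x @ w_ih.T + h @ w_hh.T + b
    i, f, g, o = np.split(gates, 4, axis=1)  # input, forget, candidate, output
    c_next = sigmoid(f) * c + sigmoid(i) * np.tanh(g)
    h_next = sigmoid(o) * np.tanh(c_next)
    return h_next, c_next

rng = np.random.default_rng(0)
hidden = 4
w_ih = rng.normal(size=(4 * hidden, 1))       # one input feature per time step
w_hh = rng.normal(size=(4 * hidden, hidden))
b = np.zeros(4 * hidden)

series = np.array([[0.1, 0.2, 0.3]])          # shape (batch=1, time=3)
h = np.zeros((1, hidden))                     # initial hidden state
c = np.zeros((1, hidden))                     # initial cell state
for t in range(series.shape[1]):
    x_t = series[:, t:t + 1]                  # shape (1, 1), like y.split(1, dim=1)
    h, c = lstm_cell_step(x_t, h, c, w_ih, w_hh, b)

print(h.shape)
```

In the network above, the hidden state of the first cell is fed into a second cell, and a linear layer maps the second hidden state to a single output per time step.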
In [6]:
def get_model():
    torch.manual_seed(42)
    np.random.seed(42)
    
    model = LSTM()

    return model
In [7]:
def train_model(data):
    """
        train the LSTM network
    """
    
    asset_name_all = data.coords['asset'].values

    features_all = get_features(data)
    target_all = get_target_classes(data)

    model = get_model()
    
    for asset_name in asset_name_all:
        
        # drop missing values:
        target_cur = target_all.sel(asset=asset_name).dropna('time', 'any')
        features_cur = features_all.sel(asset=asset_name).dropna('time', 'any')

        # align features and targets:
        target_for_learn_df, feature_for_learn_df = xr.align(target_cur, features_cur, join='inner')
        
    # NOTE: the training code below runs after the loop above has finished, so it
    # uses only the aligned series of the last asset in asset_name_all.
    criterion = nn.MSELoss() # define loss function
    
    optimiser = optim.LBFGS(model.parameters(), lr=0.08) # we use an LBFGS solver as optimiser
    epochs = 1 # number of training epochs
    for i in range(epochs):
        def closure(): # re-evaluates the model and returns the loss (forward and backward pass)
            optimiser.zero_grad()

            # input tensor
            in_ = torch.zeros(1, len(feature_for_learn_df.values))
            in_[0, :] = torch.tensor(np.array(feature_for_learn_df.values))

            # output
            out = model(in_)

            # target tensor
            target = torch.zeros(1, len(target_for_learn_df.values))
            target[0, :] = torch.tensor(np.array(target_for_learn_df.values))

            # evaluate loss
            loss = criterion(out, target)
            loss.backward()

            return loss
        optimiser.step(closure) # updates weights
            
    return model
In [8]:
def predict(model,data):
    """
        predict if price is going up or down and go long depending on it
    
    """
    
    
    asset_name_all = data.coords['asset'].values
    weights = xr.zeros_like(data.sel(field='close'))
    
    features_all = get_features(data) # compute the features once for all assets

    for asset_name in asset_name_all:
        features_cur = features_all.sel(asset=asset_name).dropna('time','any')
        if len(features_cur.time) < 1:
            continue
            
        #input tensor
        in_ = torch.zeros(1,len(features_cur.values))
        in_[0,:]=torch.tensor(np.array(features_cur.values))
        
        #output
        out = model(in_)
        
        prediction = out.detach()[0]
        
        weights.loc[dict(asset=asset_name,time=features_cur.time.values)] = prediction
                
    return weights
In [ ]:
weights = qnbt.backtest_ml(
    train=train_model,
    predict=predict,
    train_period=10*365,   # the data length for training in calendar days
    retrain_interval=365,  # how often we have to retrain models (calendar days)
    retrain_interval_after_submit=1, # how often retrain models after submission during evaluation (calendar days)
    predict_each_day=False,  # Is it necessary to call prediction for every day during backtesting?
                             # Set it to true if you suspect that get_features is looking forward.
    competition_type='crypto_daily_long_short',  # competition type
    lookback_period=365,      # how many calendar days are needed by the predict function to generate the output
    start_date='2014-01-01',  # backtest start date
    build_plots=True          # do you need the chart?
)
Run the last iteration...
100% (7020808 of 7020808) |##############| Elapsed Time: 0:00:00 Time:  0:00:00
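The retraining parameters can be pictured as a walk-forward schedule: at each retraining date the model is fitted on a window of past data and then used until the next retraining date. The sketch below only illustrates that idea with a hypothetical helper, retrain_schedule. It is not how qnbt.backtest_ml is implemented, and the exact window boundaries used by the backtester may differ.

```python
from datetime import date, timedelta

def retrain_schedule(start, end, train_period, retrain_interval):
    """Hypothetical helper: list (retrain_date, train_start, train_end) tuples."""
    schedule = []
    current = start
    while current <= end:
        train_end = current - timedelta(days=1)             # train only on past data
        train_start = current - timedelta(days=train_period)
        schedule.append((current, train_start, train_end))
        current += timedelta(days=retrain_interval)
    return schedule

plan = retrain_schedule(date(2014, 1, 1), date(2017, 1, 1),
                        train_period=10 * 365, retrain_interval=365)
for retrain_date, train_start, train_end in plan:
    print(retrain_date, "trained on", train_start, "to", train_end)
```

A shorter retrain_interval means more frequent retraining and therefore a slower backtest.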

What libraries are available?

Our library makes extensive use of xarray, pandas and numpy.

Function definitions can be found in the qnt folder in your private root directory.

# Import basic libraries.
import xarray as xr
import pandas as pd
import numpy as np

# Import quantnet libraries.
import qnt.data    as qndata  # load and manipulate data
import qnt.output  as qnout   # manage output
import qnt.backtester as qnbt # backtester
import qnt.stats   as qnstats # statistical functions for analysis
import qnt.graph   as qngraph # graphical tools
import qnt.ta      as qnta    # indicators library

May I import libraries?

Yes, please refer to the file init.ipynb in your home directory. You can, for example, use:

! conda install -y scikit-learn

How to load data?

Futures:

data= qndata.futures.load_data(tail = 15*365, dims = ("time", "field", "asset"))

BTC Futures:

data= qndata.cryptofutures.load_data(tail = 15*365, dims = ("time", "field", "asset"))

Cryptocurrencies:

data= qndata.crypto.load_data(tail = 15*365, dims = ("time", "field", "asset"))

How to view a list of all tickers?

data.asset.to_pandas().to_list()

How to see which fields are available?

data.field.to_pandas().to_list()

How to load specific tickers?

data = qndata.futures.load_data(tail=15 * 365, assets=['F_O', 'F_DX', 'F_GC'])

How to select specific tickers after loading all data?

def get_data_filter(data, assets):
    filler= data.sel(asset=assets)
    return filler

get_data_filter(data, ["F_O", "F_DX", "F_GC"])

How to get the prices for the previous day?

qnta.shift(data.sel(field="open"), periods=1)

or:

data.sel(field="open").shift(time=1)
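What a shift by one period does can be seen on a tiny pandas analogue with made-up prices (a sketch, not Quantiacs data). Each row receives the value of the previous row, and the first row has no predecessor, so it becomes NaN.

```python
import pandas as pd

# Hypothetical open prices indexed by date.
open_price = pd.Series(
    [100.0, 101.0, 103.0],
    index=pd.to_datetime(["2021-01-01", "2021-01-02", "2021-01-03"]),
)

previous_open = open_price.shift(1)  # value of the previous row
print(previous_open)
```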

How do I get a list of the top 10 assets ranked by Sharpe ratio?

import qnt.stats as qnstats

data= qndata.futures.load_data(tail=16 * 365)

def get_best_instruments(data, weights, top_size):
    # compute statistics:
    stats_per_asset= qnstats.calc_stat(data, weights, per_asset=True)
    # calculate ranks of assets by "sharpe_ratio":
    ranks= (-stats_per_asset.sel(field="sharpe_ratio")).rank("asset")
    # select top assets by rank "top_period" days ago:
    top_period= 300
    rank= ranks.isel(time=-top_period)
    top= rank.where(rank <= top_size).dropna("asset").asset

    # select top stats:
    top_stats= stats_per_asset.sel(asset=top.values)

    # print results:
    print("SR tail of the top assets:")
    display(top_stats.sel(field="sharpe_ratio").to_pandas().tail())

    print("avg SR = ", top_stats[-top_period:].sel(field="sharpe_ratio").mean("asset")[-1].item())
    display(top_stats)
    return top_stats.coords["asset"].values

get_best_instruments(data, weights, 10)
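The ranking trick used above, negating the Sharpe ratio so that rank 1 is the best asset, can be checked on a small pandas example. The asset names and Sharpe ratios are invented for illustration.

```python
import pandas as pd

# Hypothetical Sharpe ratios per asset on one date.
sharpe = pd.Series({"A": 0.4, "B": 1.3, "C": -0.2, "D": 0.9})

top_size = 2
ranks = (-sharpe).rank()  # rank 1 = highest Sharpe ratio
top_assets = ranks[ranks <= top_size].index.tolist()
print(top_assets)  # ['B', 'D']
```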

How can I check the results for only the top 10 assets ranked by Sharpe ratio?

Select the top assets and then load their data:

best_assets= get_best_instruments(data, weights, 10)

data= qndata.futures.load_data(tail=15 * 365, assets=best_assets)
...

How can prices be processed?

Simply import standard libraries, for example numpy:

import numpy as np

high= np.log(data.sel(field="high"))

How can you reduce slippage impact when trading?

Apply a technique that reduces turnover, for example a rolling maximum of the weights:

def get_lower_slippage(weights, rolling_time=6):
    return weights.rolling({"time": rolling_time}).max()

improved_weights = get_lower_slippage(weights, rolling_time=6)
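The effect of a rolling maximum on turnover can be seen on a toy weight series, using pandas instead of xarray. Turnover is measured here simply as the sum of absolute day-to-day weight changes, which is a simplification and an assumption of this sketch.

```python
import pandas as pd

# Hypothetical weights of one asset that flip on and off from day to day.
weights = pd.Series([0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0])

def turnover(w):
    """Sum of absolute day-to-day weight changes (NaN values are skipped)."""
    return w.diff().abs().sum()

smoothed = weights.rolling(3).max()  # NaN until the window is full

print(turnover(weights), turnover(smoothed))
```

The smoothed series trades far less often, at the price of holding positions longer than the raw signal suggests.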

How to use technical analysis indicators?

For available indicators see the source code of the library: /qnt/ta

ATR

def get_atr(data, days=14):
    high = data.sel(field="high") * 1.0 
    low  = data.sel(field="low") * 1.0 
    close= data.sel(field="close") * 1.0

    return qnta.atr(high, low, close, days)

atr= get_atr(data, days=14)

EMA

prices= data.sel(field="high")
prices_ema= qnta.ema(prices, 15)

TRIX

prices= data.sel(field="high")
prices_trix= qnta.trix(prices, 15)

ADL and EMA

adl= qnta.ad_line(data.sel(field="close")) * 1.0 
adl_ema= qnta.ema(adl, 18)
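For intuition, here is a textbook exponential moving average in plain NumPy. This is a sketch with smoothing factor 2 / (periods + 1), seeded with the first value. The function qnta.ema may use a different warm-up, so its first values can differ from this version.

```python
import numpy as np

def ema(values, periods):
    """Textbook EMA: smoothing factor 2 / (periods + 1), seeded with the first value."""
    alpha = 2.0 / (periods + 1)
    out = np.empty(len(values))
    out[0] = values[0]
    for t in range(1, len(values)):
        out[t] = alpha * values[t] + (1 - alpha) * out[t - 1]
    return out

prices = np.array([10.0, 11.0, 12.0, 11.0, 13.0])  # made-up prices
smoothed = ema(prices, 3)
print(smoothed)
```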

How can you check the quality of your strategy?

import qnt.output as qnout
qnout.check(weights, data)

or

stat= qnstats.calc_stat(data, weights)
display(stat.to_pandas().tail())

or

import qnt.graph   as qngraph
statistics= qnstats.calc_stat(data, weights)
display(statistics.to_pandas().tail())

performance= statistics.to_pandas()["equity"]
qngraph.make_plot_filled(performance.index, performance, name="PnL (Equity)", type="log")

display(statistics[-1:].sel(field = ["sharpe_ratio"]).transpose().to_pandas())
qnstats.print_correlation(weights, data)
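The Sharpe ratio reported in the statistics is, in essence, the mean return divided by its volatility, scaled to one year. The sketch below shows that idea on plain arrays. The helper annualized_sharpe and the periods_per_year default are assumptions of this example, and qnstats.calc_stat may use a different convention.

```python
import numpy as np

def annualized_sharpe(daily_returns, periods_per_year=252):
    """Mean over standard deviation of daily returns, scaled to one year."""
    daily_returns = np.asarray(daily_returns, dtype=float)
    std = daily_returns.std()
    if std == 0:
        return float("nan")  # undefined when returns never vary
    return daily_returns.mean() / std * np.sqrt(periods_per_year)

returns = [0.01, -0.005, 0.02, 0.0, 0.015]  # made-up daily returns
print(annualized_sharpe(returns))
```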

An example using pandas

One can work with pandas DataFrames at intermediate steps and at the end convert them to xarray data structures:

def get_price_pct_change(prices):
    prices_pandas = prices.to_pandas()
    # use the assets of the passed-in prices rather than the global data
    assets = prices.coords["asset"].values
    for asset in assets:
        prices_pandas[asset] = prices_pandas[asset].pct_change()
    return prices_pandas


prices= data.sel(field="close") * 1.0
prices_pct_change= get_price_pct_change(prices).unstack().to_xarray()

How to submit a strategy to the competition?

Check that weights are fine:

import qnt.output as qnout
qnout.check(weights, data)

If everything is ok, write the weights to file:

qnout.write(weights)

In your personal account:

  • choose a strategy;
  • click on the Submit button;
  • select the type of competition.

Initially you will find the strategy in the Checking area (Competition > Checking). If its Sharpe ratio is larger than 1 and the technical checks are successful, the strategy moves to the Running area (Competition > Running). Otherwise it is moved to Filtered (Competition > Filtered), and you should inspect the error and warning messages.