In [None]:
%%javascript
// Replace OutputArea.prototype._should_scroll with a function that always
// returns false, regardless of the number of output lines passed in.
IPython.OutputArea.prototype._should_scroll = function(lines) { return false; }
// disable widget scrolling

In [None]:
import json

import xarray as xr
import xarray.ufuncs as xruf
import numpy as np
import pandas as pd

import qnt.data as qndata          # data loading and manipulation
import qnt.stats as qnstats        # key statistics
import qnt.graph as qngraph        # graphical tools
import qnt.ta as qnta              # technical analysis indicators
import qnt.output as qnout         # for writing output
import qnt.log as qnlog            # log configuration
import qnt.optimizer as qno        # optimizer

# display function for fancy displaying:
from IPython.display import display
# lib for charts
import plotly.graph_objs as go

In [None]:
# Load the futures data set used by every later cell.
# min_date is presumably the earliest date to load (confirm against qnt docs).
data = qndata.futures_load_data(min_date='2005-01-01')

In [None]:
def strategy_long(data, asset=None, ma_periods=150, roc_periods=5, sideways_threshold=2):
    """Build trend-following positions from the rate of change of a moving average.

    The close price is smoothed with ``qnta.lwma`` and the rate of change of
    that average is computed with ``qnta.roc``. A ROC above
    ``sideways_threshold`` is treated as a positive trend, a ROC below
    ``-sideways_threshold`` as a negative trend, and anything whose absolute
    value is at or below the threshold as a sideways market.

    Parameters
    ----------
    data
        xarray object with a ``field`` coordinate containing ``'close'`` and a
        ``time`` dimension (plus an ``asset`` coordinate when ``asset`` is used).
    asset
        When not ``None``, ``data`` is restricted to this single asset first.
    ma_periods
        Period count passed to ``qnta.lwma``.
    roc_periods
        Period count passed to ``qnta.roc``.
    sideways_threshold
        Absolute ROC level at or below which the market counts as sideways.

    Returns
    -------
    Positions shaped like the close prices: 1 where a buy was signalled,
    -1 where a sell was signalled, 0 where a stop fired; the last value is
    carried forward along ``time`` and points before any signal are 0.
    """
    if asset is not None:
        data = data.sel(asset=[asset])

    close = data.sel(field='close')

    # calculate MA
    ma = qnta.lwma(close, ma_periods)
    # calculate ROC of the MA
    roc = qnta.roc(ma, roc_periods)

    # positive trend direction
    positive_trend = roc > sideways_threshold
    # negative trend direction
    negative_trend = roc < -sideways_threshold
    # sideways
    sideways_trend = abs(roc) <= sideways_threshold

    # We suppose that a sideways trend after a positive trend is also positive:
    # mask out the sideways points, then carry the last directional value forward.
    side_positive_trend = positive_trend.where(~sideways_trend).ffill('time').fillna(False)
    # and a sideways trend after a negative trend is also negative
    side_negative_trend = negative_trend.where(~sideways_trend).ffill('time').fillna(False)

    # define signals
    buy_signal = positive_trend
    buy_stop_signal = side_negative_trend

    sell_signal = negative_trend
    sell_stop_signal = side_positive_trend

    # calc positions: start from "no decision" (NaN) everywhere
    position = close.copy(True)
    position[:] = np.nan
    position = xr.where(buy_signal, 1, position)
    position = xr.where(sell_signal, -1, position)
    # NumPy ufuncs operate on xarray objects directly; this replaces the
    # deprecated xarray.ufuncs.logical_and wrapper, which merely forwarded to it.
    # NOTE(review): the stop masks look like they can never coincide with an
    # open position of the opposite sign, so these two lines may be no-ops —
    # confirm the intended stop logic.
    position = xr.where(np.logical_and(buy_stop_signal, position.ffill('time') > 0), 0, position)
    position = xr.where(np.logical_and(sell_stop_signal, position.ffill('time') < 0), 0, position)
    # hold the last decided position; flat before the first signal
    position = position.ffill('time').fillna(0)

    return position


In [None]:
###DEBUG###
# evaluator will remove cells with such marks before evaluation

# Run the strategy with its default parameters on the full data set.
output = strategy_long(data)
# Compute statistics for the positions from 2006-01-01 onwards and show the last rows.
stats = qnstats.calc_stat(data, output.sel(time=slice('2006-01-01',None)))
display(stats.to_pandas().tail())

In [None]:
###DEBUG###
# evaluator will remove cells with such marks before evaluation

# Search strategy_long's parameters on the whole data set (no per-asset split).
# As written the ranges expand to ma_periods in {10, 60, 110, 160},
# roc_periods in {10, 60} and sideways_threshold in {0, 1, 2, 3, 4};
# full_range_args_generator presumably enumerates every combination of them.
result_for_all = qno.optimize_strategy(
    data,
    strategy_long,
    qno.full_range_args_generator(
        ma_periods=range(10, 200, 50), # TODO set the proper range and step
        roc_periods=range(10, 100, 50), # TODO set the proper range and step
        sideways_threshold=range(0,5,1), # TODO set the proper range and step
    ),
    workers=1 # you can set more workers on your local PC to speed up
)

In [None]:
###DEBUG###

# Separator and heading on two lines, followed by the best iteration found.
print("---", "Best iteration:", sep="\n")
display(result_for_all['best_iteration'])

In [None]:
###DEBUG###
# evaluator will remove cells with such marks before evaluation

# Same search as above, but with `asset` added as a search dimension so each
# iteration carries the asset it was evaluated for in its args.
# NOTE: as written, range(10, 100, 100) yields only 10 and range(0, 5, 5)
# yields only 0, so just ma_periods (10, 110) and the asset actually vary.
result_long = qno.optimize_strategy(
    data,
    strategy_long,
    qno.full_range_args_generator(
        ma_periods=range(10, 200, 100), # TODO set the proper range and step
        roc_periods=range(10, 100, 100), # TODO set the proper range and step
        sideways_threshold=range(0, 5, 5), # TODO set the proper range and step
        asset=data.asset.values.tolist()
    ),
    workers=1 # you can set more workers on your local PC to speed up
)

In [None]:
###DEBUG###
# evaluator will remove cells with such marks before evaluation

def find_best_parameters(result, asset_count, parameter_set_count):
    """Pick the best parameter sets of the highest-weighted assets.

    Assets are taken from the notebook-level ``data`` object, ranked by
    ``asset_weight`` (largest first), and cut to the first ``asset_count``.
    For each remaining asset its ``parameter_set_count`` best argument dicts
    are appended, in ranking order, to one flat list that is returned.
    """
    ranked_assets = sorted(
        data.asset.values.tolist(),
        key=lambda name: -asset_weight(result, name, parameter_set_count),
    )
    selected = []
    for name in ranked_assets[:asset_count]:
        selected.extend(get_best_parameters_for_asset(result, name, parameter_set_count))
    return selected


def asset_weight(result, asset, parameter_set_count):
    """Score an asset by the sum of its best Sharpe ratios.

    Parameters
    ----------
    result : dict
        Optimizer output; ``result['iterations']`` is a list of dicts with
        ``['args']['asset']`` and ``['result']['sharpe_ratio']`` entries.
    asset
        Asset identifier to score.
    parameter_set_count : int
        How many of the asset's best iterations contribute to the weight.

    Returns
    -------
    Sum of the ``parameter_set_count`` largest Sharpe ratios recorded for the
    asset. Iterations whose Sharpe ratio is NaN are ignored; an asset with no
    usable iterations weighs 0.
    """
    sharpe_ratios = [
        i['result']['sharpe_ratio']
        for i in result['iterations']
        if i['args']['asset'] == asset
    ]
    # NaN compares False against everything, which silently breaks sorting and
    # would turn the whole sum into NaN; `s == s` is False only for NaN.
    sharpe_ratios = [s for s in sharpe_ratios if s == s]
    sharpe_ratios.sort(reverse=True)
    # weight is the sum of the `parameter_set_count` best iterations
    return sum(sharpe_ratios[:parameter_set_count])


def get_best_parameters_for_asset(result, asset, parameter_set_count):
    """Return the argument dicts of an asset's best iterations.

    Parameters
    ----------
    result : dict
        Optimizer output; ``result['iterations']`` is a list of dicts with
        ``['args']`` (including ``'asset'``) and ``['result']['sharpe_ratio']``.
    asset
        Asset identifier whose iterations are considered.
    parameter_set_count : int
        Maximum number of argument dicts to return.

    Returns
    -------
    list
        Up to ``parameter_set_count`` ``args`` dicts, ordered by descending
        Sharpe ratio (ties keep their original order). Iterations whose Sharpe
        ratio is NaN are skipped, since NaN cannot be ranked.
    """
    asset_iterations = [
        i for i in result['iterations']
        if i['args']['asset'] == asset
        # NaN is the only value not equal to itself; it would corrupt the sort
        and i['result']['sharpe_ratio'] == i['result']['sharpe_ratio']
    ]
    asset_iterations.sort(key=lambda i: -i['result']['sharpe_ratio'])
    return [i['args'] for i in asset_iterations[:parameter_set_count]]


config = find_best_parameters(result=result_long, asset_count=15, parameter_set_count=3)
# If you change asset_count and/or parameter_set_count, you will get a new strategy.

# Save the selection; the context manager guarantees the file is flushed and
# closed before any later cell reads config.json back.
with open('config.json', 'w') as config_file:
    json.dump(config, config_file, indent=2)

display(config)

In [None]:
# Read the saved parameter sets; `with` closes the file handle deterministically.
with open('config.json', 'r') as config_file:
    config = json.load(config_file)

In [None]:
def optimized_strategy(data, config):
    """Average the positions of ``strategy_long`` over every parameter set.

    ``config`` is an iterable of keyword-argument dicts. ``strategy_long`` is
    run once per dict, the outputs are outer-aligned so they share the same
    coordinates, gaps introduced by the alignment are filled with 0, and the
    element-wise mean of all outputs is returned.
    """
    per_config = [strategy_long(data, **params) for params in config]
    # align and join results
    aligned = xr.align(*per_config, join='outer')
    filled = [weights.fillna(0) for weights in aligned]
    return sum(filled) / len(filled)

In [None]:
# Combine the positions of all saved parameter sets into the final output.
output = optimized_strategy(data, config)
output = qnout.clean(output, data) # fix common issues

# Validate the cleaned output against the data, then write it out.
qnout.check(output, data)
qnout.write(output)

# Statistics for the final output from 2006-01-01 onwards: last rows and plots.
stats = qnstats.calc_stat(data, output.sel(time=slice('2006-01-01',None)))
display(stats.to_pandas().tail())
qngraph.make_major_plots(stats)