Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
packtpublishing
GitHub Repository: packtpublishing/machine-learning-for-algorithmic-trading-second-edition
Path: blob/master/utils.py
2909 views
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
__author__ = 'Stefan Jansen'
4
5
import numpy as np
6
7
# Seed NumPy's global random number generator at import time; this is the
# generator that MultipleTimeSeriesCV.split draws from via np.random when
# shuffling is requested.
np.random.seed(42)
8
9
10
def format_time(t):
    """Return a formatted time string 'HH:MM:SS'.

    Args:
        t: Elapsed time in seconds (int or float), e.g. a difference of two
            ``time()`` values.

    Returns:
        The duration as zero-padded hours, minutes and seconds. Hours are not
        wrapped, so durations of 100 hours or more use more than two digits.
    """
    # Round to whole seconds *before* splitting into components. Rounding
    # each component separately (as float formatting would) can produce
    # invalid output such as '00:00:60' for t=59.6.
    total_seconds = int(round(t))
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f'{hours:02d}:{minutes:02d}:{seconds:02d}'
16
17
18
class MultipleTimeSeriesCV:
    """Generate (train_idx, test_idx) pairs for walk-forward cross-validation.

    Assumes the index of ``X`` is a MultiIndex that contains the date level
    named by ``date_idx`` (by default 'date', next to 'symbol').

    Splits are built backwards from the most recent date: the first split
    tests on the latest ``test_period_length`` unique dates, the next split
    on the ``test_period_length`` dates before those, and so on. To purge
    overlapping outcomes, the latest training date lies ``lookahead`` unique
    dates before the earliest test date, and each training window spans
    ``train_period_length + lookahead - 1`` unique dates.

    Args:
        n_splits: Number of train/test pairs to generate.
        train_period_length: Base number of unique dates per training window.
        test_period_length: Number of unique dates per test window.
        lookahead: Forecast horizon in periods; must be set to an integer
            before calling ``split``.
        date_idx: Name of the index level that holds the dates.
        shuffle: If True, the training indices of each split are returned in
            random order (drawn from NumPy's global random state).
    """

    def __init__(self,
                 n_splits=3,
                 train_period_length=126,
                 test_period_length=21,
                 lookahead=None,
                 date_idx='date',
                 shuffle=False):
        self.n_splits = n_splits
        self.lookahead = lookahead
        self.test_length = test_period_length
        self.train_length = train_period_length
        self.shuffle = shuffle
        self.date_idx = date_idx

    def split(self, X, y=None, groups=None):
        """Yield ``(train_idx, test_idx)`` arrays of positional row indices.

        Args:
            X: Object whose index has a level named ``self.date_idx``.
            y: Ignored; accepted for compatibility with the CV interface.
            groups: Ignored; accepted for compatibility with the CV interface.

        Yields:
            Tuples of two integer NumPy arrays holding row positions in ``X``.

        Raises:
            TypeError: If ``lookahead`` was left as None.
            IndexError: If ``X`` has too few unique dates for a split.
        """
        if self.lookahead is None:
            # Fail with a clear message instead of an opaque
            # "unsupported operand" error from the arithmetic below.
            raise TypeError('lookahead must be an integer number of periods, '
                            'not None')

        date_values = X.index.get_level_values(self.date_idx)
        # Most recent date first, so positions count backwards in time.
        days = sorted(date_values.unique(), reverse=True)

        split_idx = []
        for i in range(self.n_splits):
            test_end_idx = i * self.test_length
            test_start_idx = test_end_idx + self.test_length
            train_end_idx = test_start_idx + self.lookahead - 1
            train_start_idx = (train_end_idx + self.train_length
                               + self.lookahead - 1)
            split_idx.append([train_start_idx, train_end_idx,
                              test_start_idx, test_end_idx])

        for train_start, train_end, test_start, test_end in split_idx:
            # Each window is the half-open date interval (start, end].
            train_mask = ((date_values > days[train_start])
                          & (date_values <= days[train_end]))
            test_mask = ((date_values > days[test_start])
                         & (date_values <= days[test_end]))
            train_idx = np.flatnonzero(train_mask)
            test_idx = np.flatnonzero(test_mask)
            if self.shuffle:
                # permutation returns a shuffled copy; shuffling a temporary
                # list (as done previously) had no effect on the result.
                train_idx = np.random.permutation(train_idx)
            yield train_idx, test_idx

    def get_n_splits(self, X=None, y=None, groups=None):
        """Return the number of splits; all arguments are ignored."""
        return self.n_splits
62
63