GitHub Repository: AI4Finance-Foundation/FinRL
Path: blob/master/finrl/meta/preprocessor/preprocessors.py
from __future__ import annotations

import datetime

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.preprocessing import MaxAbsScaler
from stockstats import StockDataFrame as Sdf

from finrl import config
from finrl.meta.preprocessor.yahoodownloader import YahooDownloader


def load_dataset(*, file_name: str) -> pd.DataFrame:
    """
    load csv dataset from path
    :param file_name: (str) path to the csv file
    :return: (df) pandas dataframe
    """
    # _data = pd.read_csv(f"{config.DATASET_DIR}/{file_name}")
    _data = pd.read_csv(file_name)
    return _data


def data_split(df, start, end, target_date_col="date"):
    """
    split the dataset into training or testing using date
    :param df: (df) pandas dataframe
    :param start: start date (inclusive)
    :param end: end date (exclusive)
    :param target_date_col: (str) name of the date column
    :return: (df) pandas dataframe
    """
    data = df[(df[target_date_col] >= start) & (df[target_date_col] < end)]
    data = data.sort_values([target_date_col, "tic"], ignore_index=True)
    data.index = data[target_date_col].factorize()[0]
    return data


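# Example usage (a minimal sketch; the csv path and date ranges are hypothetical,
# assuming a long-format dataframe with "date" and "tic" columns):
#   df = load_dataset(file_name="dow_30_ohlcv.csv")
#   train = data_split(df, start="2009-01-01", end="2019-01-01")
#   trade = data_split(df, start="2019-01-01", end="2021-01-01")

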
def convert_to_datetime(time):
    """Convert a "%Y-%m-%dT%H:%M:%S" string to a datetime; non-string inputs return None."""
    time_fmt = "%Y-%m-%dT%H:%M:%S"
    if isinstance(time, str):
        return datetime.datetime.strptime(time, time_fmt)


class GroupByScaler(BaseEstimator, TransformerMixin):
    """Sklearn-like scaler that scales considering groups of data.

    In the financial setting, this scaler can be used to normalize a DataFrame
    with time series of multiple tickers. The scaler will fit and transform
    data for each ticker independently.
    """

    def __init__(self, by, scaler=MaxAbsScaler, columns=None, scaler_kwargs=None):
        """Initializes GroupBy scaler.

        Args:
            by: Name of column that will be used to group.
            scaler: Scikit-learn scaler class to be used.
            columns: List of columns that will be scaled.
            scaler_kwargs: Keyword arguments for chosen scaler.
        """
        self.scalers = {}  # dictionary with scalers
        self.by = by
        self.scaler = scaler
        self.columns = columns
        self.scaler_kwargs = {} if scaler_kwargs is None else scaler_kwargs

    def fit(self, X, y=None):
        """Fits the scaler to input data.

        Args:
            X: DataFrame to fit.
            y: Not used.

        Returns:
            Fitted GroupBy scaler.
        """
        # if columns aren't specified, consider all numeric columns
        if self.columns is None:
            self.columns = X.select_dtypes(exclude=["object"]).columns
        # fit one scaler for each group
        for value in X[self.by].unique():
            X_group = X.loc[X[self.by] == value, self.columns]
            self.scalers[value] = self.scaler(**self.scaler_kwargs).fit(X_group)
        return self

    def transform(self, X, y=None):
        """Transforms unscaled data.

        Args:
            X: DataFrame to transform.
            y: Not used.

        Returns:
            Transformed DataFrame.
        """
        # apply the fitted scaler for each group
        X = X.copy()
        for value in X[self.by].unique():
            select_mask = X[self.by] == value
            X.loc[select_mask, self.columns] = self.scalers[value].transform(
                X.loc[select_mask, self.columns]
            )
        return X


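# Example usage of GroupByScaler (a minimal sketch; the toy dataframe is illustrative):
#   prices = pd.DataFrame(
#       {"tic": ["AAPL", "AAPL", "MSFT", "MSFT"], "close": [150.0, 155.0, 300.0, 310.0]}
#   )
#   scaler = GroupByScaler(by="tic", scaler=MaxAbsScaler)
#   scaled = scaler.fit_transform(prices)  # each ticker scaled by its own max abs close

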
class FeatureEngineer:
    """Provides methods for preprocessing the stock price data

    Attributes
    ----------
    use_technical_indicator : boolean
        use technical indicator or not
    tech_indicator_list : list
        a list of technical indicator names (modified from neofinrl_config.py)
    use_vix : boolean
        use vix index or not
    use_turbulence : boolean
        use turbulence index or not
    user_defined_feature : boolean
        use user defined features or not

    Methods
    -------
    preprocess_data()
        main method to do the feature engineering

    """

    def __init__(
        self,
        use_technical_indicator=True,
        tech_indicator_list=config.INDICATORS,
        use_vix=False,
        use_turbulence=False,
        user_defined_feature=False,
    ):
        self.use_technical_indicator = use_technical_indicator
        self.tech_indicator_list = tech_indicator_list
        self.use_vix = use_vix
        self.use_turbulence = use_turbulence
        self.user_defined_feature = user_defined_feature

    def preprocess_data(self, df):
        """main method to do the feature engineering
        :param df: (df) source pandas dataframe
        :return: (df) preprocessed pandas dataframe
        """
        # clean data
        df = self.clean_data(df)

        # add technical indicators using stockstats
        if self.use_technical_indicator:
            df = self.add_technical_indicator(df)
            print("Successfully added technical indicators")

        # add vix for multiple stock
        if self.use_vix:
            df = self.add_vix(df)
            print("Successfully added vix")

        # add turbulence index for multiple stock
        if self.use_turbulence:
            df = self.add_turbulence(df)
            print("Successfully added turbulence index")

        # add user defined feature
        if self.user_defined_feature:
            df = self.add_user_defined_feature(df)
            print("Successfully added user defined features")

        # fill the missing values at the beginning and the end
        df = df.ffill().bfill()
        return df

    def clean_data(self, data):
        """
        clean the raw data
        deal with missing values
        reasons: stocks could be delisted, or not incorporated at the time step
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df = df.sort_values(["date", "tic"], ignore_index=True)
        df.index = df.date.factorize()[0]
        merged_closes = df.pivot_table(index="date", columns="tic", values="close")
        merged_closes = merged_closes.dropna(axis=1)
        tics = merged_closes.columns
        df = df[df.tic.isin(tics)]
        # df = data.copy()
        # list_ticker = df["tic"].unique().tolist()
        # only apply to daily level data, need to fix for minute level
        # list_date = list(pd.date_range(df['date'].min(),df['date'].max()).astype(str))
        # combination = list(itertools.product(list_date,list_ticker))

        # df_full = pd.DataFrame(combination,columns=["date","tic"]).merge(df,on=["date","tic"],how="left")
        # df_full = df_full[df_full['date'].isin(df['date'])]
        # df_full = df_full.sort_values(['date','tic'])
        # df_full = df_full.fillna(0)
        return df

    def add_technical_indicator(self, data):
        """
        calculate technical indicators
        use stockstats package to add technical indicators
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
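        # Note (assumption about the configured list): indicator names are
        # expected to follow the stockstats column convention, e.g. "macd",
        # "rsi_30", "cci_30", "close_30_sma"; accessing such a column on the
        # retyped dataframe below triggers its computation.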
        df = data.copy()
        df = df.sort_values(by=["tic", "date"])
        stock = Sdf.retype(df.copy())
        unique_ticker = stock.tic.unique()

        for indicator in self.tech_indicator_list:
            indicator_df = pd.DataFrame()
            for i in range(len(unique_ticker)):
                try:
                    temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
                    temp_indicator = pd.DataFrame(temp_indicator)
                    temp_indicator["tic"] = unique_ticker[i]
                    temp_indicator["date"] = df[df.tic == unique_ticker[i]][
                        "date"
                    ].to_list()
                    # indicator_df = indicator_df.append(
                    #     temp_indicator, ignore_index=True
                    # )
                    indicator_df = pd.concat(
                        [indicator_df, temp_indicator], axis=0, ignore_index=True
                    )
                except Exception as e:
                    print(e)
            df = df.merge(
                indicator_df[["tic", "date", indicator]], on=["tic", "date"], how="left"
            )
        df = df.sort_values(by=["date", "tic"])
        return df
        # df = data.set_index(['date','tic']).sort_index()
        # df = df.join(df.groupby(level=0, group_keys=False).apply(lambda x, y: Sdf.retype(x)[y], y=self.tech_indicator_list))
        # return df.reset_index()

    def add_user_defined_feature(self, data):
        """
        add user defined features
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df["daily_return"] = df.close.pct_change(1)
        # df['return_lag_1']=df.close.pct_change(2)
        # df['return_lag_2']=df.close.pct_change(3)
        # df['return_lag_3']=df.close.pct_change(4)
        # df['return_lag_4']=df.close.pct_change(5)
        return df

    def add_vix(self, data):
        """
        add vix from yahoo finance
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df_vix = YahooDownloader(
            start_date=df.date.min(), end_date=df.date.max(), ticker_list=["^VIX"]
        ).fetch_data()
        vix = df_vix[["date", "close"]]
        vix.columns = ["date", "vix"]

        df = df.merge(vix, on="date")
        df = df.sort_values(["date", "tic"]).reset_index(drop=True)
        return df

    def add_turbulence(self, data):
        """
        add turbulence index from a precalculated dataframe
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        turbulence_index = self.calculate_turbulence(df)
        df = df.merge(turbulence_index, on="date")
        df = df.sort_values(["date", "tic"]).reset_index(drop=True)
        return df

    def calculate_turbulence(self, data):
        """calculate turbulence index based on dow 30"""
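        # The turbulence index computed below is the Mahalanobis distance of
        # the current day's returns r_t from their trailing one-year history
        # (a sketch of the formula, with mu and Sigma the mean vector and
        # covariance of the past 252 trading days):
        #   turbulence_t = (r_t - mu) @ pinv(Sigma) @ (r_t - mu).T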
        # can add other market assets
        df = data.copy()
        df_price_pivot = df.pivot(index="date", columns="tic", values="close")
        # use returns to calculate turbulence
        df_price_pivot = df_price_pivot.pct_change()

        unique_date = df.date.unique()
        # start after a year
        start = 252
        turbulence_index = [0] * start
        # turbulence_index = [0]
        count = 0
        for i in range(start, len(unique_date)):
            current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
            # use one year rolling window to calculate covariance
            hist_price = df_price_pivot[
                (df_price_pivot.index < unique_date[i])
                & (df_price_pivot.index >= unique_date[i - 252])
            ]
            # Drop tickers which have more missing values than the "oldest" ticker
            filtered_hist_price = hist_price.iloc[
                hist_price.isna().sum().min() :
            ].dropna(axis=1)

            cov_temp = filtered_hist_price.cov()
            current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(
                filtered_hist_price, axis=0
            )
            # cov_temp = hist_price.cov()
            # current_temp=(current_price - np.mean(hist_price,axis=0))

            temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
                current_temp.values.T
            )
            if temp > 0:
                count += 1
                if count > 2:
                    turbulence_temp = temp[0][0]
                else:
                    # avoid large outliers when the calculation just begins
                    turbulence_temp = 0
            else:
                turbulence_temp = 0
            turbulence_index.append(turbulence_temp)
        try:
            turbulence_index = pd.DataFrame(
                {"date": df_price_pivot.index, "turbulence": turbulence_index}
            )
        except ValueError:
            raise Exception("Turbulence information could not be added.")
        return turbulence_index
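

# Example end-to-end usage (a minimal sketch; the date range, ticker list and
# indicator names are illustrative, and YahooDownloader is assumed to return a
# long-format dataframe with "date", "tic" and OHLCV columns):
#   raw_df = YahooDownloader(
#       start_date="2009-01-01", end_date="2021-01-01", ticker_list=["AAPL", "MSFT"]
#   ).fetch_data()
#   fe = FeatureEngineer(
#       use_technical_indicator=True,
#       tech_indicator_list=["macd", "rsi_30", "close_30_sma"],
#       use_vix=True,
#       use_turbulence=True,
#   )
#   processed = fe.preprocess_data(raw_df)
#   train = data_split(processed, "2009-01-01", "2019-01-01")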