Path: blob/master/finrl/meta/preprocessor/preprocessors.py
from __future__ import annotations

import datetime

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.preprocessing import MaxAbsScaler
from stockstats import StockDataFrame as Sdf

from finrl import config
from finrl.meta.preprocessor.yahoodownloader import YahooDownloader


def load_dataset(*, file_name: str) -> pd.DataFrame:
    """
    load csv dataset from path
    :return: (df) pandas dataframe
    """
    # _data = pd.read_csv(f"{config.DATASET_DIR}/{file_name}")
    _data = pd.read_csv(file_name)
    return _data


def data_split(df, start, end, target_date_col="date"):
    """
    split the dataset into training or testing using date
    :param df: (df) pandas dataframe
    :param start: start date (inclusive)
    :param end: end date (exclusive)
    :return: (df) pandas dataframe
    """
    data = df[(df[target_date_col] >= start) & (df[target_date_col] < end)]
    data = data.sort_values([target_date_col, "tic"], ignore_index=True)
    data.index = data[target_date_col].factorize()[0]
    return data


def convert_to_datetime(time):
    time_fmt = "%Y-%m-%dT%H:%M:%S"
    if isinstance(time, str):
        return datetime.datetime.strptime(time, time_fmt)


class GroupByScaler(BaseEstimator, TransformerMixin):
    """Sklearn-like scaler that scales considering groups of data.

    In the financial setting, this scaler can be used to normalize a DataFrame
    with time series of multiple tickers. The scaler will fit and transform
    the data of each ticker independently.
    """

    def __init__(self, by, scaler=MaxAbsScaler, columns=None, scaler_kwargs=None):
        """Initializes GroupBy scaler.

        Args:
            by: Name of the column used to group the data.
            scaler: Scikit-learn scaler class to be used.
            columns: List of columns that will be scaled.
            scaler_kwargs: Keyword arguments for the chosen scaler.
        """
        self.scalers = {}  # dictionary with scalers
        self.by = by
        self.scaler = scaler
        self.columns = columns
        self.scaler_kwargs = {} if scaler_kwargs is None else scaler_kwargs

    def fit(self, X, y=None):
        """Fits the scaler to input data.

        Args:
            X: DataFrame to fit.
            y: Not used.

        Returns:
            Fitted GroupBy scaler.
        """
        # if columns aren't specified, consider all numeric columns
        if self.columns is None:
            self.columns = X.select_dtypes(exclude=["object"]).columns
        # fit one scaler for each group
        for value in X[self.by].unique():
            X_group = X.loc[X[self.by] == value, self.columns]
            self.scalers[value] = self.scaler(**self.scaler_kwargs).fit(X_group)
        return self

    def transform(self, X, y=None):
        """Transforms unscaled data.

        Args:
            X: DataFrame to transform.
            y: Not used.

        Returns:
            Transformed DataFrame.
        """
        # apply the fitted scaler of each group
        X = X.copy()
        for value in X[self.by].unique():
            select_mask = X[self.by] == value
            X.loc[select_mask, self.columns] = self.scalers[value].transform(
                X.loc[select_mask, self.columns]
            )
        return X
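

# Usage sketch for GroupByScaler (illustrative; assumes a long-format DataFrame
# with a "tic" column and numeric feature columns):
#
#     scaler = GroupByScaler(by="tic", scaler=MaxAbsScaler)
#     df_scaled = scaler.fit_transform(df)
#
# fit_transform is inherited from sklearn's TransformerMixin, so each ticker's
# columns are fitted and scaled with its own scaler instance.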


class FeatureEngineer:
    """Provides methods for preprocessing the stock price data

    Attributes
    ----------
    use_technical_indicator : boolean
        use technical indicator or not
    tech_indicator_list : list
        a list of technical indicator names (modified from neofinrl_config.py)
    use_vix : boolean
        use VIX index or not
    use_turbulence : boolean
        use turbulence index or not
    user_defined_feature : boolean
        use user defined features or not

    Methods
    -------
    preprocess_data()
        main method to do the feature engineering

    """

    def __init__(
        self,
        use_technical_indicator=True,
        tech_indicator_list=config.INDICATORS,
        use_vix=False,
        use_turbulence=False,
        user_defined_feature=False,
    ):
        self.use_technical_indicator = use_technical_indicator
        self.tech_indicator_list = tech_indicator_list
        self.use_vix = use_vix
        self.use_turbulence = use_turbulence
        self.user_defined_feature = user_defined_feature

    def preprocess_data(self, df):
        """main method to do the feature engineering
        :param df: (df) pandas dataframe of raw price data
        :return: (df) pandas dataframe with the selected features added
        """
        # clean data
        df = self.clean_data(df)

        # add technical indicators using stockstats
        if self.use_technical_indicator:
            df = self.add_technical_indicator(df)
            print("Successfully added technical indicators")

        # add vix for multiple stock
        if self.use_vix:
            df = self.add_vix(df)
            print("Successfully added vix")

        # add turbulence index for multiple stock
        if self.use_turbulence:
            df = self.add_turbulence(df)
            print("Successfully added turbulence index")

        # add user defined feature
        if self.user_defined_feature:
            df = self.add_user_defined_feature(df)
            print("Successfully added user defined features")

        # fill the missing values at the beginning and the end
        df = df.ffill().bfill()
        return df

    def clean_data(self, data):
        """
        clean the raw data
        deal with missing values
        reasons: stocks could be delisted, not incorporated at the time step
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df = df.sort_values(["date", "tic"], ignore_index=True)
        df.index = df.date.factorize()[0]
        merged_closes = df.pivot_table(index="date", columns="tic", values="close")
        merged_closes = merged_closes.dropna(axis=1)
        tics = merged_closes.columns
        df = df[df.tic.isin(tics)]
        # df = data.copy()
        # list_ticker = df["tic"].unique().tolist()
        # only apply to daily level data, need to fix for minute level
        # list_date = list(pd.date_range(df['date'].min(), df['date'].max()).astype(str))
        # combination = list(itertools.product(list_date, list_ticker))

        # df_full = pd.DataFrame(combination, columns=["date", "tic"]).merge(df, on=["date", "tic"], how="left")
        # df_full = df_full[df_full['date'].isin(df['date'])]
        # df_full = df_full.sort_values(['date', 'tic'])
        # df_full = df_full.fillna(0)
        return df

    def add_technical_indicator(self, data):
        """
        calculate technical indicators
        use stockstats package to add technical indicators
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df = df.sort_values(by=["tic", "date"])
        stock = Sdf.retype(df.copy())
        unique_ticker = stock.tic.unique()

        for indicator in self.tech_indicator_list:
            indicator_df = pd.DataFrame()
            for i in range(len(unique_ticker)):
                try:
                    temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
                    temp_indicator = pd.DataFrame(temp_indicator)
                    temp_indicator["tic"] = unique_ticker[i]
                    temp_indicator["date"] = df[df.tic == unique_ticker[i]][
                        "date"
                    ].to_list()
                    # indicator_df = indicator_df.append(
                    #     temp_indicator, ignore_index=True
                    # )
                    indicator_df = pd.concat(
                        [indicator_df, temp_indicator], axis=0, ignore_index=True
                    )
                except Exception as e:
                    print(e)
            df = df.merge(
                indicator_df[["tic", "date", indicator]], on=["tic", "date"], how="left"
            )
        df = df.sort_values(by=["date", "tic"])
        return df
        # df = data.set_index(['date','tic']).sort_index()
        # df = df.join(df.groupby(level=0, group_keys=False).apply(lambda x, y: Sdf.retype(x)[y], y=self.tech_indicator_list))
        # return df.reset_index()

    def add_user_defined_feature(self, data):
        """
        add user defined features
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df["daily_return"] = df.close.pct_change(1)
        # df["return_lag_1"] = df.close.pct_change(2)
        # df["return_lag_2"] = df.close.pct_change(3)
        # df["return_lag_3"] = df.close.pct_change(4)
        # df["return_lag_4"] = df.close.pct_change(5)
        return df

    def add_vix(self, data):
        """
        add vix from yahoo finance
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        df_vix = YahooDownloader(
            start_date=df.date.min(), end_date=df.date.max(), ticker_list=["^VIX"]
        ).fetch_data()
        vix = df_vix[["date", "close"]]
        vix.columns = ["date", "vix"]

        df = df.merge(vix, on="date")
        df = df.sort_values(["date", "tic"]).reset_index(drop=True)
        return df

    def add_turbulence(self, data):
        """
        add turbulence index from a precalculated dataframe
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        turbulence_index = self.calculate_turbulence(df)
        df = df.merge(turbulence_index, on="date")
        df = df.sort_values(["date", "tic"]).reset_index(drop=True)
        return df

    def calculate_turbulence(self, data):
        """calculate turbulence index based on dow 30"""
        # can add other market assets
        df = data.copy()
        df_price_pivot = df.pivot(index="date", columns="tic", values="close")
        # use returns to calculate turbulence
        df_price_pivot = df_price_pivot.pct_change()

        unique_date = df.date.unique()
        # start after a year
        start = 252
        turbulence_index = [0] * start
        # turbulence_index = [0]
        count = 0
        for i in range(start, len(unique_date)):
            current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
            # use one year rolling window to calculate covariance
            hist_price = df_price_pivot[
                (df_price_pivot.index < unique_date[i])
                & (df_price_pivot.index >= unique_date[i - 252])
            ]
            # Drop tickers with more missing values than the "oldest" ticker
            filtered_hist_price = hist_price.iloc[
                hist_price.isna().sum().min() :
            ].dropna(axis=1)

            cov_temp = filtered_hist_price.cov()
            current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(
                filtered_hist_price, axis=0
            )
            # cov_temp = hist_price.cov()
            # current_temp = (current_price - np.mean(hist_price, axis=0))

            # turbulence = (y_t - mu) * pinv(Sigma) * (y_t - mu)^T, i.e. the
            # Mahalanobis distance of today's returns from the one-year mean
            temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
                current_temp.values.T
            )
            if temp > 0:
                count += 1
                if count > 2:
                    turbulence_temp = temp[0][0]
                else:
                    # avoid large outliers because the calculation just begins
                    turbulence_temp = 0
            else:
                turbulence_temp = 0
            turbulence_index.append(turbulence_temp)
        try:
            turbulence_index = pd.DataFrame(
                {"date": df_price_pivot.index, "turbulence": turbulence_index}
            )
        except ValueError:
            raise Exception("Turbulence information could not be added.")
        return turbulence_index
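

# ----------------------------------------------------------------------------
# Minimal usage sketch (illustrative; the tickers, dates and split boundaries
# below are arbitrary examples). It assumes YahooDownloader returns a
# long-format DataFrame with "date", "tic" and OHLCV columns, as relied on by
# add_vix() above.
# ----------------------------------------------------------------------------
if __name__ == "__main__":
    raw_df = YahooDownloader(
        start_date="2015-01-01",
        end_date="2021-01-01",
        ticker_list=["AAPL", "MSFT", "JPM"],
    ).fetch_data()

    fe = FeatureEngineer(
        use_technical_indicator=True,
        tech_indicator_list=config.INDICATORS,
        use_vix=True,
        use_turbulence=True,
        user_defined_feature=False,
    )
    processed = fe.preprocess_data(raw_df)

    # scale each ticker's numeric columns independently
    scaled = GroupByScaler(by="tic").fit_transform(processed)

    # chronological train/trade split (end date is exclusive)
    train = data_split(scaled, "2015-01-01", "2020-01-01")
    trade = data_split(scaled, "2020-01-01", "2021-01-01")
    print(train.shape, trade.shape)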