# Path: blob/master/finrl/meta/data_processors/processor_wrds.py
# (code-viewer residue: 732 views)
from __future__ import annotations

import datetime

import numpy as np
import pandas as pd
import pandas_market_calendars as tc
import pytz
import wrds
from stockstats import StockDataFrame as Sdf

pd.options.mode.chained_assignment = None


class WrdsProcessor:
    """Download WRDS trade data and turn it into model-ready arrays.

    The methods are meant to be chained: ``download_data`` ->
    ``clean_data`` -> ``add_technical_indicator`` -> ``add_turbulence`` ->
    ``df_to_array``.
    """

    def __init__(self, if_offline=False):
        """Open a WRDS connection unless ``if_offline`` is true.

        When ``if_offline`` is true no ``self.db`` attribute is created, so
        ``download_data`` (which uses ``self.db``) cannot be called.
        """
        if not if_offline:
            self.db = wrds.Connection()

    def download_data(
        self,
        start_date,
        end_date,
        ticker_list,
        time_interval,
        if_save_tempfile=False,
        filter_shares=0,
    ):
        """Fetch trades day by day and resample them into OHLCV bars.

        :param start_date: first day, parseable by ``pd.Timestamp``
        :param end_date: last day, parseable by ``pd.Timestamp``
        :param ticker_list: iterable of symbols matched against ``sym_root``
        :param time_interval: bar width in seconds (``"S"`` is appended to
            build the pandas resample rule)
        :param if_save_tempfile: when true, the bars accumulated so far are
            rewritten to ``./temp.csv`` after every successful day
        :param filter_shares: only trades with ``size`` strictly greater
            than this value are requested
        :return: DataFrame with columns ``time, open, high, low, close,
            volume, tic`` sorted by ``time`` then ``tic``
        :raises ValueError: if no day produced any bar
        """
        # Remembered so that add_vix can repeat the same request later.
        self.start = start_date
        self.end = end_date
        self.time_interval = time_interval

        def get_trading_days(start, end):
            # One "YYYY-MM-DD" string per NYSE session in [start, end].
            nyse = tc.get_calendar("NYSE")
            sessions = nyse.date_range_htf(
                "1D", pd.Timestamp(start), pd.Timestamp(end)
            )
            return [str(day)[:10] for day in sessions]

        def data_fetch_wrds(date, stock_set):
            # Returns (data, if_empty). Library and table names are derived
            # from the date, e.g. taqm_2021.ctm_20210501.
            current_date = datetime.datetime.strptime(date, "%Y-%m-%d")
            lib = "taqm_" + str(current_date.year)
            table = "ctm_" + current_date.strftime("%Y%m%d")

            parm = {"syms": stock_set, "num_shares": filter_shares}
            try:
                data = self.db.raw_sql(
                    "select * from "
                    + lib
                    + "."
                    + table
                    + " where sym_root in %(syms)s "
                    + "and time_m between '9:30:00' and '16:00:00' and size > %(num_shares)s and sym_suffix is null",
                    params=parm,
                )
            except Exception:
                # Best effort: a day that cannot be fetched is skipped.
                # Exception (not BaseException) so Ctrl-C still interrupts.
                print("Data for date: " + date + " error")
                return None, True
            return data, False

        dates = get_trading_days(start_date, end_date)
        print("Trading days: ")
        print(dates)

        stock_set = tuple(ticker_list)
        daily_bars = []
        for day in dates:
            # The helper takes exactly (date, stock_set); filter_shares is
            # captured from the enclosing scope.
            data, if_empty = data_fetch_wrds(day, stock_set)
            if if_empty:
                continue
            bars = self.preprocess_to_ohlcv(
                data, time_interval=(str(time_interval) + "S")
            )
            if bars is None:
                # The query succeeded but returned no trades for this day.
                continue
            print("Data for date: " + day + " finished")
            daily_bars.append(bars)
            if if_save_tempfile:
                pd.concat(daily_bars).to_csv("./temp.csv")

        if not daily_bars:
            raise ValueError("Empty Data under input parameters!")

        result = pd.concat(daily_bars)
        result = result.sort_values(by=["time", "tic"])
        return result.reset_index(drop=True)

    @staticmethod
    def _parse_trade_time(date, time_m):
        """Combine a date and a time-of-day value into one ``datetime``.

        The time may or may not carry a fractional-seconds part.
        """
        stamp = str(date) + " " + str(time_m)
        try:
            return datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f")
        except ValueError:
            return datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")

    def preprocess_to_ohlcv(self, df, time_interval="60S"):
        """Resample raw trades into per-ticker OHLCV bars.

        :param df: DataFrame containing at least the columns ``date``,
            ``time_m``, ``sym_root``, ``size`` and ``price``
        :param time_interval: pandas resample rule for the bar width
        :return: DataFrame with columns ``time, open, high, low, close,
            volume, tic``, or ``None`` when ``df`` contains no ticker
        """
        df = df[["date", "time_m", "sym_root", "size", "price"]]
        frames = []
        for tic in np.unique(df["sym_root"].values):
            trades = df[df["sym_root"] == tic].copy()
            trades["time"] = [
                self._parse_trade_time(date, time_m)
                for date, time_m in zip(trades["date"], trades["time_m"])
            ]
            trades = trades.set_index("time")

            bars = trades["price"].resample(time_interval).ohlc()
            # Same index and rule as above, so the bins line up one-to-one.
            bars["volume"] = trades["size"].resample(time_interval).sum().values
            bars["tic"] = tic
            frames.append(bars.reset_index())

        if not frames:
            return None
        return pd.concat(frames, ignore_index=True)

    def clean_data(self, df):
        """Drop 16:00 bars, align all tickers on one time grid, fill gaps.

        Every ticker ends up with one row per timestamp seen for any
        ticker. Rows with zero volume get open/high/low/close set to that
        ticker's most recent traded close.

        :param df: DataFrame with columns ``time, open, high, low, close,
            volume, tic``
        :return: cleaned DataFrame with the same columns, sorted by ``tic``
            then ``time``, with a fresh integer index
        :raises ValueError: if a ticker has a zero-volume row before its
            first traded bar, so there is no earlier close to carry forward
        """
        columns = ["time", "open", "high", "low", "close", "volume", "tic"]
        price_columns = ["open", "high", "low", "close"]
        df = df[columns]

        # remove 16:00 data (boolean mask, so any index labels are fine)
        is_1600 = df["time"].map(lambda t: str(t)[-8:] == "16:00:00").astype(bool)
        df = df[~is_1600]
        df = df.sort_values(by=["tic", "time"])

        # fill missing rows: one placeholder per (ticker, timestamp) absent
        all_times = np.unique(df["time"].values)
        fillers = []
        for tic, tic_df in df.groupby("tic"):
            missing_time = np.setdiff1d(all_times, tic_df["time"].values)
            if missing_time.size:
                fillers.append(
                    pd.DataFrame(
                        {
                            "time": missing_time,
                            "open": np.nan,
                            "high": np.nan,
                            "low": np.nan,
                            "close": np.nan,
                            "volume": 0,
                            "tic": tic,
                        }
                    )
                )
        if fillers:
            df = pd.concat([df, *fillers], axis=0, ignore_index=True)

        # fill nan data: carry the last traded close forward within each
        # ticker; never borrow a price from a different ticker
        df = df.sort_values(by=["tic", "time"]).reset_index(drop=True)
        no_trade = df["volume"].astype(float) == 0
        last_close = df["close"].where(~no_trade).groupby(df["tic"]).ffill()
        if last_close[no_trade].isna().any():
            raise ValueError("Error nan price")
        for column in price_columns:
            df.loc[no_trade, column] = last_close[no_trade]

        # check if nan
        ary = df[["open", "high", "low", "close", "volume"]].values
        assert not np.isnan(np.min(ary))

        # final preprocess
        df = df[columns]
        df = df.reset_index(drop=True)
        print("Data clean finished")
        return df

    def add_technical_indicator(self, df, tech_indicator_list=None):
        """Append one column per stockstats indicator, computed per ticker.

        :param df: DataFrame with a ``time`` column (renamed to ``date``),
            a ``tic`` column and the price columns stockstats needs
        :param tech_indicator_list: stockstats indicator names; defaults to
            macd, boll_ub, boll_lb, rsi_30, dx_30, close_30_sma and
            close_60_sma
        :return: DataFrame sorted by ``date`` then ``tic`` with the
            indicator columns merged in
        """
        if tech_indicator_list is None:
            tech_indicator_list = [
                "macd",
                "boll_ub",
                "boll_lb",
                "rsi_30",
                "dx_30",
                "close_30_sma",
                "close_60_sma",
            ]

        df = df.rename(columns={"time": "date"})
        df = df.sort_values(by=["tic", "date"])
        stock = Sdf.retype(df.copy())
        unique_ticker = stock.tic.unique()

        for indicator in tech_indicator_list:
            per_ticker = []
            for tic in unique_ticker:
                temp_indicator = pd.DataFrame(stock[stock.tic == tic][indicator])
                temp_indicator["tic"] = tic
                # df and stock share the same row order, so the dates of
                # this ticker line up positionally with its indicator values.
                temp_indicator["date"] = df[df.tic == tic]["date"].to_list()
                per_ticker.append(temp_indicator)
            indicator_df = pd.concat(per_ticker, axis=0, ignore_index=True)

            df = df.merge(
                indicator_df[["tic", "date", indicator]],
                on=["tic", "date"],
                how="left",
            )
        df = df.sort_values(by=["date", "tic"])
        print("Succesfully add technical indicators")
        return df

    def calculate_turbulence(self, data, time_period=252):
        """Compute a turbulence value for every timestamp in ``data``.

        For each timestamp after the first ``time_period`` ones, the value
        is ``d @ pinv(cov) @ d.T`` where ``d`` is the current return vector
        minus the mean return over the preceding ``time_period`` timestamps
        and ``cov`` is the covariance of those returns. The first
        ``time_period`` timestamps, and the first two positive results, are
        reported as 0.

        :param data: DataFrame with columns ``date``, ``tic`` and ``close``
        :param time_period: length of the look-back window, in timestamps
        :return: DataFrame with columns ``date`` and ``turbulence``
        """
        # can add other market assets
        df = data.copy()
        df_price_pivot = df.pivot(index="date", columns="tic", values="close")
        # use returns to calculate turbulence
        df_price_pivot = df_price_pivot.pct_change()

        # Take dates from the pivot index so they are in the same (sorted)
        # order as the "date" column of the returned frame.
        unique_date = df_price_pivot.index
        # start after a fixed time period; never pad beyond the data length
        start = min(time_period, len(unique_date))
        turbulence_index = [0] * start
        count = 0
        for i in range(start, len(unique_date)):
            current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
            # rolling window of the previous time_period timestamps
            hist_price = df_price_pivot[
                (df_price_pivot.index < unique_date[i])
                & (df_price_pivot.index >= unique_date[i - time_period])
            ]
            # Drop tickers which has number missing values more than the
            # "oldest" ticker
            filtered_hist_price = hist_price.iloc[
                hist_price.isna().sum().min() :
            ].dropna(axis=1)

            cov_temp = filtered_hist_price.cov()
            current_temp = current_price[
                list(filtered_hist_price.columns)
            ] - np.mean(filtered_hist_price, axis=0)
            temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
                current_temp.values.T
            )
            if temp > 0:
                count += 1
                if count > 2:
                    turbulence_temp = temp[0][0]
                else:
                    # avoid large outlier because of the calculation just begins
                    turbulence_temp = 0
            else:
                turbulence_temp = 0
            turbulence_index.append(turbulence_temp)

        turbulence_index = pd.DataFrame(
            {"date": df_price_pivot.index, "turbulence": turbulence_index}
        )
        return turbulence_index

    def add_turbulence(self, data, time_period=252):
        """
        add turbulence index computed by ``calculate_turbulence``
        :param data: (df) pandas dataframe
        :param time_period: (int) look-back window passed through
        :return: (df) pandas dataframe with a ``turbulence`` column,
            sorted by ``date`` then ``tic``
        """
        df = data.copy()
        turbulence_index = self.calculate_turbulence(df, time_period=time_period)
        df = df.merge(turbulence_index, on="date")
        df = df.sort_values(["date", "tic"]).reset_index(drop=True)
        return df

    def add_vix(self, data):
        """Merge a ``vix`` column into ``data`` on ``date``.

        Repeats the most recent ``download_data`` request (same start, end
        and interval) for the single ticker ``"vix"``, so ``download_data``
        must have been called on this instance first.

        :param data: DataFrame with ``date`` and ``tic`` columns
        :return: DataFrame with an added ``vix`` column, sorted by ``date``
            then ``tic``
        """
        # NOTE(review): the symbol is kept as "vix"; confirm it matches a
        # sym_root value in the queried tables.
        vix_df = self.download_data(
            start_date=self.start,
            end_date=self.end,
            ticker_list=["vix"],
            time_interval=self.time_interval,
        )
        cleaned_vix = self.clean_data(vix_df)
        # clean_data returns a "time" column; rename it to the merge key and
        # rename "close" so it cannot collide with the price column.
        vix = cleaned_vix[["time", "close"]].rename(
            columns={"time": "date", "close": "vix"}
        )

        df = data.copy()
        df = df.merge(vix, on="date")
        df = df.sort_values(["date", "tic"]).reset_index(drop=True)

        return df

    def df_to_array(self, df, tech_indicator_list):
        """Stack per-ticker columns side by side into NumPy arrays.

        :param df: DataFrame with columns ``tic``, ``close``,
            ``turbulence`` and every name in ``tech_indicator_list``
        :param tech_indicator_list: indicator column names to export
        :return: ``(price_array, tech_array, turbulence_array)``; the first
            two hold one column group per ticker, the last is the
            ``turbulence`` column of the first ticker
        """
        unique_ticker = df.tic.unique()
        print(unique_ticker)

        price_parts = []
        tech_parts = []
        turbulence_array = None
        for tic in unique_ticker:
            tic_df = df[df.tic == tic]
            price_parts.append(tic_df[["close"]].values)
            tech_parts.append(tic_df[tech_indicator_list].values)
            if turbulence_array is None:
                # Taken from the first ticker only.
                turbulence_array = tic_df["turbulence"].values

        price_array = np.hstack(price_parts)
        tech_array = np.hstack(tech_parts)
        print("Successfully transformed into array")
        return price_array, tech_array, turbulence_array