Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
AI4Finance-Foundation
GitHub Repository: AI4Finance-Foundation/FinRL
Path: blob/master/finrl/meta/data_processors/processor_wrds.py
732 views
1
from __future__ import annotations
2
3
import datetime
4
5
import numpy as np
6
import pandas as pd
7
import pandas_market_calendars as tc
8
import pytz
9
import wrds
10
from stockstats import StockDataFrame as Sdf
11
12
# Process-wide: turn off pandas' chained-assignment check so no
# SettingWithCopyWarning is emitted when assigning into derived frames.
pd.set_option("mode.chained_assignment", None)
13
14
15
class WrdsProcessor:
    """Build intraday OHLCV datasets for FinRL from WRDS TAQ trade data.

    Typical flow: ``download_data`` -> ``clean_data`` ->
    ``add_technical_indicator`` -> ``add_turbulence`` (optionally
    ``add_vix``) -> ``df_to_array``.  Only ``download_data`` and ``add_vix``
    query WRDS; the other methods operate purely on DataFrames and therefore
    also work on an instance created with ``if_offline=True``.
    """

    # Indicators computed by ``add_technical_indicator`` when no list is
    # given; each name is looked up as a column of a stockstats frame.
    DEFAULT_TECH_INDICATORS = (
        "macd",
        "boll_ub",
        "boll_lb",
        "rsi_30",
        "dx_30",
        "close_30_sma",
        "close_60_sma",
    )

    # Column layout produced by ``preprocess_to_ohlcv`` and ``clean_data``.
    _OHLCV_COLUMNS = ("time", "open", "high", "low", "close", "volume", "tic")

    def __init__(self, if_offline=False):
        """Open a WRDS connection unless ``if_offline`` is true.

        :param if_offline: skip connecting.  Methods that query WRDS
            (``download_data``, ``add_vix``) then fail because ``self.db``
            is never set.
        """
        if not if_offline:
            self.db = wrds.Connection()

    @staticmethod
    def _get_trading_days(start, end):
        """Return the NYSE sessions between ``start`` and ``end`` as 'YYYY-MM-DD' strings."""
        nyse = tc.get_calendar("NYSE")
        sessions = nyse.date_range_htf("1D", pd.Timestamp(start), pd.Timestamp(end))
        return [str(day)[:10] for day in sessions]

    def _fetch_trades(self, date, stock_set, filter_shares):
        """Query one day of TAQ trades between 09:30 and 16:00.

        :param date: trading day as 'YYYY-MM-DD'.
        :param stock_set: tuple of ``sym_root`` values to select.
        :param filter_shares: keep trades whose size is strictly greater.
        :return: ``(data, if_empty)``.  If the query fails, the error is
            printed and ``(None, True)`` is returned so the caller can skip
            that day.
        """
        current_date = datetime.datetime.strptime(date, "%Y-%m-%d")
        # Library name is per year, table name per day: taqm_2021.ctm_20210503
        lib = "taqm_" + str(current_date.year)
        table = "ctm_" + current_date.strftime("%Y%m%d")
        parm = {"syms": stock_set, "num_shares": filter_shares}
        query = (
            "select * from "
            + lib
            + "."
            + table
            + " where sym_root in %(syms)s "
            + "and time_m between '9:30:00' and '16:00:00' and size > %(num_shares)s and sym_suffix is null"
        )
        try:
            data = self.db.raw_sql(query, params=parm)
        except Exception:  # not BaseException: let Ctrl+C / SystemExit through
            print("Data for date: " + date + " error")
            return None, True
        return data, False

    def download_data(
        self,
        start_date,
        end_date,
        ticker_list,
        time_interval,
        if_save_tempfile=False,
        filter_shares=0,
    ):
        """Download TAQ trades day by day and aggregate them into OHLCV bars.

        :param start_date: first calendar date (anything ``pd.Timestamp`` parses).
        :param end_date: last calendar date.
        :param ticker_list: symbols matched against TAQ ``sym_root``.
        :param time_interval: bar length in seconds (e.g. ``60``).
        :param if_save_tempfile: after every successful day, write all bars
            collected so far to ``./temp.csv``.
        :param filter_shares: keep only trades whose size is strictly greater.
        :return: DataFrame with columns time, open, high, low, close, volume,
            tic, sorted by time then tic, with a fresh RangeIndex.
        :raises ValueError: if no trading day returned any data.
        """
        # Remembered so that ``add_vix`` can fetch the same window later.
        self.start = start_date
        self.end = end_date
        self.time_interval = time_interval

        dates = self._get_trading_days(start_date, end_date)
        print("Trading days: ")
        print(dates)
        stock_set = tuple(ticker_list)
        # Lower-case "s" is the seconds alias; "S" is deprecated since pandas 2.2.
        rule = str(time_interval) + "s"

        daily_bars = []
        for day in dates:
            data, if_empty = self._fetch_trades(day, stock_set, filter_shares)
            if if_empty:
                continue
            daily_bars.append(self.preprocess_to_ohlcv(data, time_interval=rule))
            print("Data for date: " + day + " finished")
            if if_save_tempfile:
                pd.concat(daily_bars).to_csv("./temp.csv")

        if not daily_bars:
            raise ValueError("Empty Data under input parameters!")
        # Concatenate once instead of growing a frame on every iteration.
        result = pd.concat(daily_bars).sort_values(by=["time", "tic"])
        return result.reset_index(drop=True)

    @staticmethod
    def _parse_trade_time(date, time_m):
        """Combine a trade date and ``time_m`` value into a naive datetime.

        The fractional-second form is tried first, then whole seconds.
        """
        stamp = str(date) + " " + str(time_m)
        try:
            return datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f")
        except ValueError:
            return datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")

    def preprocess_to_ohlcv(self, df, time_interval="60s"):
        """Resample raw trades into per-ticker OHLCV bars.

        :param df: trades with at least date, time_m, sym_root, size, price.
        :param time_interval: pandas resample rule, e.g. ``"60s"``.
        :return: DataFrame with columns time, open, high, low, close, volume,
            tic, grouped by ticker in sorted ticker order.  Bars without
            trades have NaN prices and volume 0.  Empty input yields an empty
            frame with those columns.
        """
        df = df[["date", "time_m", "sym_root", "size", "price"]]
        frames = []
        for tic in np.unique(df["sym_root"].values):
            # Copy so adding the "time" column never writes into a slice of df.
            trades = df[df["sym_root"] == tic].copy()
            trades["time"] = [
                self._parse_trade_time(d, t)
                for d, t in zip(trades["date"], trades["time_m"])
            ]
            trades = trades.set_index("time")
            bars = trades["price"].resample(time_interval).ohlc()
            bars["volume"] = trades["size"].resample(time_interval).sum()
            bars["tic"] = tic
            frames.append(bars.reset_index())
        if not frames:
            return pd.DataFrame(columns=list(self._OHLCV_COLUMNS))
        # DataFrame.append was removed in pandas 2.0; concatenate once.
        return pd.concat(frames, ignore_index=True)

    def clean_data(self, df):
        """Drop 16:00:00 bars, pad missing bars and fill no-trade prices.

        Every ticker is padded to the full set of timestamps present in
        ``df``.  Each bar with zero volume (no trades, or padded) gets
        open/high/low/close set to that same ticker's most recent close.

        :param df: bars with at least time, open, high, low, close, volume, tic.
        :return: DataFrame with exactly those columns, sorted by tic then
            time, with a fresh RangeIndex.
        :raises ValueError: if a ticker's no-trade bar has no earlier close in
            the same ticker, or if NaNs remain after filling.
        """
        cols = list(self._OHLCV_COLUMNS)
        df = df[cols]
        tic_list = np.unique(df["tic"].values)

        # Remove 16:00 bars.  A boolean mask is used so that the result does
        # not depend on the frame's index labels.
        is_1600 = np.array(
            [str(t)[-8:] == "16:00:00" for t in df["time"]], dtype=bool
        )
        df = df[~is_1600].sort_values(by=["tic", "time"])

        # Find tickers that do not have a row for every timestamp.
        normal_time = np.unique(df["time"].values)
        counts = df["tic"].value_counts()
        nan_tics = [tic for tic in tic_list if counts.get(tic, 0) != len(normal_time)]

        # Pad the missing (tic, time) pairs with NaN prices and zero volume.
        filler_rows = []
        for tic in nan_tics:
            tic_time = df.loc[df["tic"] == tic, "time"].values
            for missing in np.setdiff1d(normal_time, tic_time):
                filler_rows.append([missing, np.nan, np.nan, np.nan, np.nan, 0, tic])
        if filler_rows:
            df = pd.concat(
                [df, pd.DataFrame(filler_rows, columns=cols)], axis=0, ignore_index=True
            )

        # Forward-fill prices of no-trade bars within each ticker only.  The
        # previous row-wise fill could copy another ticker's close.
        df = df.sort_values(by=["tic", "time"]).reset_index(drop=True)
        no_trade = df["volume"].astype(float) == 0
        carried_close = df["close"].mask(no_trade).groupby(df["tic"]).ffill()
        if carried_close[no_trade].isna().any():
            raise ValueError("Error nan price")
        for col in ("open", "high", "low", "close"):
            df.loc[no_trade, col] = carried_close[no_trade]

        # An explicit check instead of `assert`, so it also runs under `python -O`.
        values = df[["open", "high", "low", "close", "volume"]].to_numpy(dtype=float)
        if np.isnan(values).any():
            raise ValueError("NaN values remain after cleaning")

        df = df[cols].reset_index(drop=True)
        print("Data clean finished")
        return df

    def add_technical_indicator(self, df, tech_indicator_list=None):
        """Append stockstats indicator columns, computed separately per ticker.

        The ``time`` column is renamed to ``date`` first.

        :param df: bars with a tic column, a time (or date) column, and the
            price/volume columns stockstats needs.
        :param tech_indicator_list: indicator names.  ``None`` means
            ``DEFAULT_TECH_INDICATORS``.
        :return: ``df`` with one column per indicator, left-joined on
            (tic, date) and sorted by date then tic.
        """
        if tech_indicator_list is None:
            tech_indicator_list = list(self.DEFAULT_TECH_INDICATORS)
        df = df.rename(columns={"time": "date"}).sort_values(by=["tic", "date"])
        stock = Sdf.retype(df.copy())
        unique_ticker = stock.tic.unique()

        for indicator in tech_indicator_list:
            pieces = []
            for tic in unique_ticker:
                temp_indicator = pd.DataFrame(stock[stock.tic == tic][indicator])
                temp_indicator["tic"] = tic
                # NOTE(review): assumes Sdf.retype keeps df's row order, so the
                # dates are attached positionally.
                temp_indicator["date"] = df[df.tic == tic]["date"].to_list()
                pieces.append(temp_indicator)
            if not pieces:
                continue
            indicator_df = pd.concat(pieces, axis=0, ignore_index=True)
            df = df.merge(
                indicator_df[["tic", "date", indicator]], on=["tic", "date"], how="left"
            )
        df = df.sort_values(by=["date", "tic"])
        print("Successfully add technical indicators")
        return df

    def calculate_turbulence(self, data, time_period=252):
        """Compute a turbulence index from close-price returns.

        For each date after the first ``time_period`` dates, the value is
        ``(r - mean) @ pinv(cov) @ (r - mean).T``.  Here ``r`` is that date's
        vector of per-ticker returns, and mean/cov come from the preceding
        ``time_period`` dates.  Non-positive values, and the first two
        positive values (to damp start-up outliers), are reported as 0.

        :param data: DataFrame with at least date, tic and close columns.
        :param time_period: look-back window length, in dates.
        :return: DataFrame with columns date (sorted ascending) and
            turbulence, with one row per unique date.  If there are fewer than
            ``time_period`` dates, every value is 0.
        """
        # Turbulence is computed on returns, not on price levels.
        price_pivot = data.pivot(index="date", columns="tic", values="close").pct_change()
        # Use the pivot's sorted index for both the windows and the output, so
        # values line up with dates even if `data` is not sorted by date.
        unique_date = price_pivot.index
        start = min(time_period, len(unique_date))
        turbulence_index = [0] * start
        count = 0
        for i in range(start, len(unique_date)):
            current_price = price_pivot[price_pivot.index == unique_date[i]]
            hist_price = price_pivot[
                (price_pivot.index < unique_date[i])
                & (price_pivot.index >= unique_date[i - time_period])
            ]
            # Skip leading rows until the ticker with the fewest gaps has data,
            # then drop the tickers that still contain missing values.
            filtered_hist_price = hist_price.iloc[
                hist_price.isna().sum().min() :
            ].dropna(axis=1)
            cov_temp = filtered_hist_price.cov()
            current_temp = current_price[list(filtered_hist_price.columns)] - (
                filtered_hist_price.mean(axis=0)
            )
            temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
                current_temp.values.T
            )
            value = temp[0][0]
            if value > 0:
                count += 1
                turbulence_temp = value if count > 2 else 0
            else:
                turbulence_temp = 0
            turbulence_index.append(turbulence_temp)

        return pd.DataFrame({"date": price_pivot.index, "turbulence": turbulence_index})

    def add_turbulence(self, data, time_period=252):
        """Compute the turbulence index and inner-join it onto ``data`` by date.

        :param data: DataFrame with at least date, tic and close columns.
        :param time_period: look-back window forwarded to ``calculate_turbulence``.
        :return: copy of ``data`` with a turbulence column, sorted by date
            then tic, with a fresh RangeIndex.
        """
        df = data.copy()
        turbulence_index = self.calculate_turbulence(df, time_period=time_period)
        df = df.merge(turbulence_index, on="date")
        return df.sort_values(["date", "tic"]).reset_index(drop=True)

    def add_vix(self, data):
        """Inner-join a ``vix`` column (close of ticker "vix") onto ``data``.

        The window of the last ``download_data`` call is downloaded again, so
        that method must have been called first on this instance.

        :param data: DataFrame with date and tic columns, where date holds bar
            timestamps (as produced by ``add_technical_indicator``).
        :return: ``data`` merged with the vix series on date, sorted by date
            then tic, with a fresh RangeIndex.
        """
        vix_df = self.download_data(self.start, self.end, ["vix"], self.time_interval)
        cleaned_vix = self.clean_data(vix_df)
        # clean_data keeps timestamps in "time".  Rename it to match `data`,
        # and rename close so it does not collide with data["close"].
        vix = cleaned_vix[["time", "close"]].rename(
            columns={"time": "date", "close": "vix"}
        )
        df = data.copy().merge(vix, on="date")
        return df.sort_values(["date", "tic"]).reset_index(drop=True)

    def df_to_array(self, df, tech_indicator_list):
        """Reshape long-format data into the arrays used by FinRL environments.

        :param df: DataFrame with tic, close, turbulence and indicator columns.
            Every ticker is expected to have the same number of rows, in the
            same date order.
        :param tech_indicator_list: indicator column names (any iterable).
        :return: ``(price_array, tech_array, turbulence_array)``.  Price and
            tech columns are stacked side by side per ticker in order of first
            appearance.  Turbulence is taken from the first ticker, because
            ``add_turbulence`` assigns it per date.
        :raises ValueError: if ``df`` contains no tickers.
        """
        unique_ticker = df.tic.unique()
        print(unique_ticker)
        if len(unique_ticker) == 0:
            raise ValueError("df contains no tickers")
        indicator_cols = list(tech_indicator_list)
        per_tic = [df[df.tic == tic] for tic in unique_ticker]
        price_array = np.hstack([sub[["close"]].values for sub in per_tic])
        tech_array = np.hstack([sub[indicator_cols].values for sub in per_tic])
        turbulence_array = per_tic[0]["turbulence"].values
        print("Successfully transformed into array")
        return price_array, tech_array, turbulence_array
340
341