AI4Finance-Foundation
GitHub Repository: AI4Finance-Foundation/FinRL
Path: blob/master/finrl/meta/data_processors/processor_eodhd.py
from __future__ import annotations

import datetime
import os
import time

import numpy as np
import pandas as pd
import pandas_market_calendars as tc
import pytz
import requests
from stockstats import StockDataFrame as Sdf


class EodhdProcessor:
    def __init__(self, csv_folder="./"):
        self.csv_folder = csv_folder

    def download(self, api_token, dl_vix=True):
        """Fetch 1-minute historical data from the EODHD API.

        Parameters
        ----------
        api_token: token for the EODHD API (All-in-one plan)
        dl_vix: if True, also download the VIX index

        One CSV file per ticker is saved into self.csv_folder.

        Returns
        -------
        None
        """
        API_TOKEN = api_token

        # Step 1: Get the NASDAQ 100 components
        nasdaq_100_ticker_url = f"https://eodhd.com/api/fundamentals/NDX.INDX?api_token={API_TOKEN}&fmt=json&filter=Components"

        response = requests.get(nasdaq_100_ticker_url).json()

        # Extract tickers
        tickers = [data["Code"] for data in response.values()]

        print(f"{len(tickers)} tickers data to be downloaded")

        # Step 2: Fetch historical minute data for each ticker
        start_date = datetime.datetime(
            2016, 1, 1
        )  # Earliest available data for NASDAQ at EODHD
        # end_date = datetime.datetime.now()  # Today
        end_date = datetime.datetime(2016, 1, 5)  # fixed short window
        interval = "1m"  # 1-minute interval

        if dl_vix:
            tickers.append("VIX")

        for ticker in tickers:
            all_ticker_data = []
            print(f"Fetching data for {ticker}...")

            start_timestamp = int(time.mktime(start_date.timetuple()))
            end_timestamp = int(time.mktime(end_date.timetuple()))

            while start_timestamp < end_timestamp:
                next_timestamp = start_timestamp + (
                    120 * 24 * 60 * 60
                )  # 120 days max per request
                if next_timestamp > end_timestamp:
                    next_timestamp = end_timestamp

                if ticker == "VIX":
                    url = f"https://eodhd.com/api/intraday/VIX.INDX?interval={interval}&from={start_timestamp}&to={next_timestamp}&api_token={API_TOKEN}&fmt=json"
                else:
                    url = f"https://eodhd.com/api/intraday/{ticker}.US?interval={interval}&api_token={API_TOKEN}&fmt=json&from={start_timestamp}&to={next_timestamp}"

                response = requests.get(url)

                if response.status_code == 200:
                    data = response.json()
                    if data:
                        df = pd.DataFrame(data)
                        df["ticker"] = ticker  # Add ticker column
                        all_ticker_data.append(df)
                else:
                    print(f"Error fetching data for {ticker}: {response.text}")

                # Move to the next 120-day period
                start_timestamp = next_timestamp
                time.sleep(1)  # Respect API rate limits

            # Step 3: Save the data
            if all_ticker_data:
                final_df = pd.concat(all_ticker_data)
                final_df.to_csv(
                    self.csv_folder + "nasdaq_100_minute_data_" + ticker + ".csv",
                    index=False,
                )
                print("Data saved to nasdaq_100_minute_data_" + ticker + ".csv")
            else:
                print("No data retrieved.")

        return
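
    # --- Usage sketch (illustrative, not part of the original file) ---
    # The token and folder below are placeholders. download() writes one CSV
    # per ticker, named nasdaq_100_minute_data_<TICKER>.csv, into csv_folder:
    #
    #     processor = EodhdProcessor(csv_folder="./data/")
    #     processor.download(api_token="YOUR_EODHD_TOKEN", dl_vix=True)
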
    def add_day_column(self):
        """Add a 'Day' index column to every CSV in csv_folder.

        Returns
        -------
        The total number of unique trading dates found across all files.
        """
        # Step 1: First pass to collect all unique dates
        all_dates = []

        for filename in os.listdir(self.csv_folder):
            if filename.endswith(".csv"):
                file_path = os.path.join(self.csv_folder, filename)
                df = pd.read_csv(file_path)

                if "datetime" in df.columns:
                    converted = pd.to_datetime(df["datetime"], errors="coerce")
                    dates = converted.dt.date.dropna().tolist()
                    all_dates.extend(dates)

        # Sort the dates and build a date -> index dictionary
        unique_dates = sorted(set(all_dates))
        date_index_dict = {str(date): idx for idx, date in enumerate(unique_dates)}

        max_days = len(date_index_dict)

        print("Date index dictionary created.\n")

        # Step 2: Second pass to update each file with the 'Day' column
        for filename in os.listdir(self.csv_folder):
            if filename.endswith(".csv"):
                file_path = os.path.join(self.csv_folder, filename)
                df = pd.read_csv(file_path)

                if "datetime" in df.columns:
                    # Convert the datetime column to datetime objects
                    converted = pd.to_datetime(df["datetime"], errors="coerce")

                    # Map each date to its day index
                    df["Day"] = converted.dt.date.map(
                        lambda d: date_index_dict.get(str(d), None)
                    )

                    # Save the updated file back
                    df.to_csv(file_path, index=False)

                    print(f"Saved updated file: {filename}")

        return max_days
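
    # Illustrative example (hypothetical dates): if the CSVs only cover
    # 2016-01-04 and 2016-01-05, then date_index_dict is
    # {"2016-01-04": 0, "2016-01-05": 1}, every row gets Day 0 or 1, and
    # add_day_column() returns 2.
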
    def tics_in_more_than_90perc_days(self, max_days):
        """Return the tickers whose data cover more than 90% of all days."""
        tics_present_in_more_than_90_per = []

        for filename in os.listdir(self.csv_folder):
            if filename.endswith(".csv"):
                file_path = os.path.join(self.csv_folder, filename)
                try:
                    df = pd.read_csv(file_path)

                    if "Day" not in df.columns:
                        print(f"{filename}: 'Day' column not found.")
                        continue

                    # Drop NA values, ensure integer type, and count unique days
                    unique_days = (
                        pd.to_numeric(df["Day"], errors="coerce")
                        .dropna()
                        .astype(int)
                        .nunique()
                    )
                    percentage = (unique_days / max_days) * 100

                    if percentage > 90:
                        tics_present_in_more_than_90_per.append(str(df["ticker"][0]))

                    print(f"{filename}: {unique_days} unique days ({percentage:.2f}%)")

                except Exception as e:
                    print(f"{filename}: Error reading file - {e}")

        return tics_present_in_more_than_90_per
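
    # Illustrative example (made-up numbers): with max_days = 100, a ticker
    # whose CSV spans 95 unique Day values scores 95% and is kept, while one
    # spanning 80 days is dropped; the returned list holds the kept symbols.
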
    def nber_present_tics_per_day(self, max_days, tics_in_more_than_90perc_days):
        """Count, for each day index, how many of the kept tickers have data."""
        dico_ints = {i: 0 for i in range(max_days + 1)}

        for filename in os.listdir(self.csv_folder):
            if filename.endswith(".csv"):
                file_path = os.path.join(self.csv_folder, filename)
                try:
                    df = pd.read_csv(file_path)
                    if str(df["ticker"][0]) in tics_in_more_than_90perc_days:
                        if "Day" not in df.columns:
                            print(f"{filename}: 'Day' column not found.")
                            continue
                        unique_days = set(df["Day"].tolist())
                        for num in unique_days:
                            dico_ints[num] += 1
                except Exception as e:
                    print(f"{filename}: Error reading file - {e}")
                    raise

        return dico_ints
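
    # Illustrative example (hypothetical values): with kept tickers
    # ["AAPL", "MSFT"], a result like {0: 2, 1: 1, 2: 2} means both tickers
    # trade on days 0 and 2 but only one on day 1, so process_after_dl()
    # keeps only days 0 and 2.
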
    def process_after_dl(self):
        """Post-download processing: index days, filter tickers and days, merge."""
        # add a day column
        max_days = self.add_day_column()

        # find the tics that are present in more than 90% of the days
        tics_in_more_than_90perc_days = self.tics_in_more_than_90perc_days(max_days)
        # e.g. ['WDAY', 'ADP', 'XEL', 'VRTX', 'AAPL', 'VRSK', 'ADBE', 'ADI']

        # create the dict of days (keys) and number of present tics (values)
        dico_ints = self.nber_present_tics_per_day(
            max_days, tics_in_more_than_90perc_days
        )

        # keep only the days on which every kept ticker is present
        num_of_good_tics = len(tics_in_more_than_90perc_days)
        days_to_keep = [k for k, v in dico_ints.items() if v == num_of_good_tics]

        # loop over each tic CSV and remove the unwanted days
        df_list = []
        for filename in os.listdir(self.csv_folder):
            if filename.endswith(".csv"):
                print(f"removed incomplete days from {filename}")
                file_path = os.path.join(self.csv_folder, filename)
                try:
                    df = pd.read_csv(file_path)
                    if df["ticker"].iloc[0] in tics_in_more_than_90perc_days:
                        filtered_df = df[df["Day"].isin(days_to_keep)]
                        df_list.append(filtered_df.sort_values(by="Day"))
                except Exception as e:
                    print(f"{filename}: Error reading file - {e}")

        df = pd.concat(df_list, ignore_index=True)

        # volume is an integer count for stocks; the VIX index has no volume
        df.loc[df["ticker"] != "VIX", "volume"] = df.loc[
            df["ticker"] != "VIX", "volume"
        ].astype(int)

        # Reset the Day integers so they are consecutive again
        unique_days = df["Day"].unique()
        mapping = {old: new for new, old in enumerate(sorted(unique_days))}
        df["Day"] = df["Day"].map(mapping)

        return df
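
    # --- Usage sketch (illustrative; the folder path is a placeholder) ---
    # After download() has filled csv_folder, the whole filtering step is:
    #
    #     processor = EodhdProcessor(csv_folder="./data/")
    #     df = processor.process_after_dl()
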
    def clean_data(self, df, min_24=True):
        """Align every ticker on a common minute grid and fill missing rows."""
        df.rename(columns={"ticker": "tic", "datetime": "time"}, inplace=True)
        df["time"] = pd.to_datetime(df["time"])

        df = df[["time", "open", "high", "low", "close", "volume", "tic", "Day"]]

        # remove 16:00 data
        df.drop(df[df["time"].astype(str).str.endswith("16:00:00")].index, inplace=True)

        df.sort_values(by=["tic", "time"], inplace=True)
        df.reset_index(drop=True, inplace=True)

        tics = df["tic"].unique()
        days = df["Day"].unique()

        start_time = df["time"].min().replace(hour=0, minute=0, second=0)
        end_time = df["time"].max().replace(hour=23, minute=59, second=0)
        time_range = pd.date_range(start=start_time, end=end_time, freq="min")
        minute_df = pd.DataFrame({"time": time_range})

        # ADDING MISSING ROWS (see the note on min_24 after this method)
        for tic in tics:
            print(f"Adding Missing Rows for tic {tic}")

            for day in days:
                # 0) Build the sub-df of the missing times for this tic and day
                times_for_this_tic_and_day = df.loc[
                    (df["Day"] == day) & (df["tic"] == tic), "time"
                ]
                times_for_this_tic_and_day = pd.to_datetime(times_for_this_tic_and_day)

                if min_24:
                    # pad against the full minute grid of this calendar day
                    specific_day = (
                        df.loc[(df["Day"] == day) & (df["tic"] == tic), "time"]
                        .iloc[0]
                        .date()
                    )
                    filtered_minute_df = minute_df[
                        minute_df["time"].dt.date == specific_day
                    ]
                    missing_times = filtered_minute_df[
                        ~filtered_minute_df["time"].isin(times_for_this_tic_and_day)
                    ].copy()
                else:
                    # pad against all times seen across all tics for the day
                    existing_time_values = df[df["Day"] == day]["time"].unique()
                    existing_time_df = pd.DataFrame(
                        {"time": pd.to_datetime(existing_time_values)}
                    )
                    missing_times = existing_time_df[
                        ~existing_time_df["time"].isin(times_for_this_tic_and_day)
                    ].copy()

                missing_times["open"] = np.nan
                missing_times["high"] = np.nan
                missing_times["low"] = np.nan
                missing_times["close"] = np.nan
                missing_times["volume"] = np.nan
                missing_times["tic"] = tic
                missing_times["Day"] = day
                missing_times = missing_times.astype(
                    {
                        "open": "float64",
                        "high": "float64",
                        "low": "float64",
                        "close": "float64",
                        "volume": "float64",
                        "tic": "object",
                        "Day": "int64",
                    }
                )

                # 1) Insert the missing rows right after the last row for this
                #    tic and day, then rebuild the index
                mask = (df["Day"] == day) & (df["tic"] == tic)
                insert_index = df[mask].index.max()

                df_before = df.iloc[: insert_index + 1]
                df_after = df.iloc[insert_index + 1 :]

                df = pd.concat([df_before, missing_times, df_after], ignore_index=True)

        df.sort_values(by=["tic", "time"], inplace=True)
        df.reset_index(drop=True, inplace=True)

        # Replace all 0 volumes with NaN (so that ffill and bfill can work)
        df.loc[df["volume"] == 0, "volume"] = np.nan

        # FILLING THE MISSING ROWS
        for tic in tics:
            print(f"Filling Missing Rows for tic {tic}")
            cols_to_ffill = ["close", "open", "high", "low", "volume"]
            df.loc[df["tic"] == tic, cols_to_ffill] = (
                df.loc[df["tic"] == tic, cols_to_ffill].ffill().bfill()
            )

        return df
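
    # Note on min_24 (describes the branches above): with min_24=True each
    # (tic, Day) pair is padded to the full 1440-minute grid of its calendar
    # day; with min_24=False it is only padded to the union of timestamps
    # observed for that day across all tickers. Either way the inserted rows
    # start as NaN and are then forward/backward filled per ticker.
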
    def add_technical_indicator(
        self,
        df,
        tech_indicator_list=[
            "macd",
            "boll_ub",
            "boll_lb",
            "rsi_30",
            "dx_30",
            "close_30_sma",
            "close_60_sma",
        ],
    ):
        """Compute each indicator per ticker with stockstats and merge it back in."""
        df = df.rename(columns={"time": "date"})
        df = df.sort_values(by=["tic", "date"])
        stock = Sdf.retype(df.copy())
        unique_ticker = stock.tic.unique()

        for indicator in tech_indicator_list:
            print(f"doing indicator {indicator}")
            indicator_df = pd.DataFrame()
            for i in range(len(unique_ticker)):
                temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
                temp_indicator = pd.DataFrame(temp_indicator)
                temp_indicator["tic"] = unique_ticker[i]
                temp_indicator["date"] = df[df.tic == unique_ticker[i]][
                    "date"
                ].to_list()
                indicator_df = pd.concat(
                    [indicator_df, temp_indicator], axis=0, ignore_index=True
                )

            df = df.merge(
                indicator_df[["tic", "date", indicator]], on=["tic", "date"], how="left"
            )
        df = df.sort_values(by=["date", "tic"])
        print("Successfully added technical indicators")
        return df
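
    # Minimal sketch of the stockstats pattern used above (toy data; all
    # values are made up). Accessing a key such as "macd" on the retyped
    # frame triggers the computation of that indicator:
    #
    #     toy = pd.DataFrame({
    #         "date": pd.date_range("2016-01-04", periods=5, freq="min"),
    #         "tic": "AAPL",
    #         "open": 100.0, "high": 101.0, "low": 99.0,
    #         "close": [100.0, 100.5, 100.2, 100.8, 101.0],
    #         "volume": 1000,
    #     })
    #     s = Sdf.retype(toy.copy())
    #     macd = s["macd"]
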
    def calculate_turbulence(self, data, time_period=252):
        """Compute the turbulence index from close prices (see note below)."""
        # can add other market assets
        df = data.copy()
        df_price_pivot = df.pivot(index="date", columns="tic", values="close")
        # use returns to calculate turbulence
        df_price_pivot = df_price_pivot.pct_change()

        unique_date = df.date.unique()
        # start after a fixed time period
        start = time_period
        turbulence_index = [0] * start
        count = 0
        for i in range(start, len(unique_date)):
            current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
            # use a one-year rolling window to calculate the covariance
            hist_price = df_price_pivot[
                (df_price_pivot.index < unique_date[i])
                & (df_price_pivot.index >= unique_date[i - time_period])
            ]
            # Drop tickers that have more missing values than the "oldest" ticker
            filtered_hist_price = hist_price.iloc[
                hist_price.isna().sum().min() :
            ].dropna(axis=1)

            cov_temp = filtered_hist_price.cov()
            current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(
                filtered_hist_price, axis=0
            )
            temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
                current_temp.values.T
            )
            if temp > 0:
                count += 1
                if count > 2:
                    turbulence_temp = temp[0][0]
                else:
                    # avoid a large outlier while the calculation is just beginning
                    turbulence_temp = 0
            else:
                turbulence_temp = 0
            turbulence_index.append(turbulence_temp)

        turbulence_index = pd.DataFrame(
            {"date": df_price_pivot.index, "turbulence": turbulence_index}
        )
        return turbulence_index
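
    # The quantity computed above is the Mahalanobis distance of the current
    # return vector y_t from the trailing-window mean mu under the window
    # covariance Sigma (with pinv standing in for the inverse):
    #
    #     d_t = (y_t - mu) @ pinv(Sigma) @ (y_t - mu).T
    #
    # Large d_t flags days whose cross-sectional returns are unusual relative
    # to the past time_period days.
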
    def add_turbulence(self, data, time_period=252):
        """
        Add the turbulence index from a precalculated dataframe.
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        turbulence_index = self.calculate_turbulence(df, time_period=time_period)
        df = df.merge(turbulence_index, on="date")
        df = df.sort_values(["date", "tic"]).reset_index(drop=True)
        return df
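

# --- End-to-end usage sketch (illustrative; the folder path and token are
# placeholders, and the data files must already exist in csv_folder) ---
if __name__ == "__main__":
    processor = EodhdProcessor(csv_folder="./data/")
    # processor.download(api_token="YOUR_EODHD_TOKEN", dl_vix=True)  # network call
    df = processor.process_after_dl()
    df = processor.clean_data(df, min_24=True)
    df = processor.add_technical_indicator(df)
    df = processor.add_turbulence(df, time_period=252)
    print(df.head())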