Path: blob/master/finrl/meta/data_processors/processor_eodhd.py
732 views
from __future__ import annotations12import datetime3import os4import time5from datetime import datetime6from datetime import timedelta78import numpy as np9import pandas as pd10import pandas_market_calendars as tc11import pytz12import requests13from stockstats import StockDataFrame as Sdf141516class EodhdProcessor:17def __init__(self, csv_folder="./"):18self.csv_folder = csv_folder19pass2021#22def download(self, api_token, dl_vix=True):23"""Fetches data from EODHD API24Parameters25----------26api_token: token from the EODHD api (All-in-one plan)27data_path: path where to save the csv files (of each ticker)2829Returns30-------31none32"""3334API_TOKEN = api_token # Replace with your API token3536# Step 1: Get NASDAQ 100 components37nasdaq_100_ticker_url = f"https://eodhd.com/api/fundamentals/NDX.INDX?api_token={API_TOKEN}&fmt=json&filter=Components"3839response = requests.get(nasdaq_100_ticker_url).json()4041# Extract tickers42tickers = [data["Code"] for data in response.values()]4344print(f"{len(tickers)} tickers data to be downloaded")4546# Step 2: Fetch historical minute data for each ticker47start_date = datetime.datetime(482016, 1, 149) # Earliest available data for NASDAQ at EODHD50# end_date = datetime.datetime.now() # Today51end_date = datetime.datetime(2016, 1, 5) #52interval = "1m" # 1-minute interval5354if dl_vix:55tickers.append("VIX")5657for ticker in tickers:5859all_ticker_data = []60print(f"Fetching data for {ticker}...")6162start_timestamp = int(time.mktime(start_date.timetuple()))63end_timestamp = int(time.mktime(end_date.timetuple()))6465while start_timestamp < end_timestamp:66next_timestamp = start_timestamp + (67120 * 24 * 60 * 6068) # 120 days max per request69if next_timestamp > end_timestamp:70next_timestamp = end_timestamp7172if ticker == VIX:73url = f"https://eodhd.com/api/intraday/VIX.INDX?interval={interval}&from={start_timestamp}&to={next_timestamp}&api_token={API_TOKEN}&fmt=json"74else:75url = f"https://eodhd.com/api/intraday/{ticker}.US?interval={interval}&api_token={API_TOKEN}&fmt=json&from={start_timestamp}&to={next_timestamp}"7677response = requests.get(url)7879if response.status_code == 200:80data = response.json()81if data:82df = pd.DataFrame(data)83df["ticker"] = ticker # Add ticker column84all_ticker_data.append(df)85else:86print(f"Error fetching data for {ticker}: {response.text}")8788# Move to next 120-day period89start_timestamp = next_timestamp90time.sleep(1) # Respect API rate limits9192# Step 3: Save the data93if all_ticker_data:9495final_df = pd.concat(all_ticker_data)96final_df.to_csv(97self.csv_folder + "nasdaq_100_minute_data_" + ticker + ".csv",98index=False,99)100print("Data saved to nasdaq_100_minute_data_" + ticker + ".csv")101102else:103print("No data retrieved.")104105return106107def add_day_column(self):108"""add a Day column to all csv in csv_folder109Parameters110----------111112Returns113-------114the max number of days115"""116117# Step 1: First pass to collect all unique dates118all_dates = []119120max_days = 0121122for filename in os.listdir(self.csv_folder):123if filename.endswith(".csv"):124file_path = os.path.join(self.csv_folder, filename)125df = pd.read_csv(file_path)126127if "datetime" in df.columns:128converted = pd.to_datetime(df["datetime"], errors="coerce")129dates = converted.dt.date.dropna().tolist()130all_dates.extend(dates)131132# Sort and create date index dictionary133unique_dates = sorted(set(all_dates))134date_index_dict = {str(date): idx for idx, date in enumerate(unique_dates)}135136max_days = len(date_index_dict)137138print("Date index dictionary created.\n")139140# Step 2: Second pass to update each file with the 'Day' column141for filename in os.listdir(self.csv_folder):142if filename.endswith(".csv"):143file_path = os.path.join(self.csv_folder, filename)144df = pd.read_csv(file_path)145146if "datetime" in df.columns:147# Convert datetime column to datetime objects148converted = pd.to_datetime(df["datetime"], errors="coerce")149150# Map to day indices151day_indices = converted.dt.date.map(152lambda d: date_index_dict.get(str(d), None)153)154155# Add the Day column156df["Day"] = day_indices157158# Save the updated file back159df.to_csv(file_path, index=False)160161print(f"Saved updated file: {filename}")162163return max_days164165def tics_in_more_than_90perc_days(self, max_days):166167sup_to_90 = 0168tics_present_in_more_than_90_per = []169170for filename in os.listdir(self.csv_folder):171172print(f"self.csv_folder {self.csv_folder}")173print(f"filename {filename}")174175if filename.endswith(".csv"):176file_path = os.path.join(self.csv_folder, filename)177try:178df = pd.read_csv(file_path)179180if "Day" not in df.columns:181print(f"{filename}: 'Day' column not found.")182continue183184# Drop NA values and ensure the column is integer type185unique_days = (186pd.to_numeric(df["Day"], errors="coerce")187.dropna()188.astype(int)189.nunique()190)191percentage = (unique_days / max_days) * 100192193if percentage > 90:194sup_to_90 += 1195tics_present_in_more_than_90_per.append(str(df["ticker"][0]))196197print(f"{filename}: {unique_days} unique days ({percentage:.2f}%)")198199except Exception as e:200print(f"{filename}: Error reading file - {e}")201202return tics_present_in_more_than_90_per203204def nber_present_tics_per_day(self, max_days, tics_in_more_than_90perc_days):205206dico_ints = {i: 0 for i in range(max_days + 1)}207counter = 0208209for filename in os.listdir(self.csv_folder):210if filename.endswith(".csv"):211# print(counter)212# print(self.csv_folder)213# print(filename)214215file_path = os.path.join(self.csv_folder, filename)216try:217# print("in try 0")218df = pd.read_csv(file_path)219# print("in try 1")220if str(df["ticker"][0]) in tics_in_more_than_90perc_days:221# print("in try 2")222if "Day" not in df.columns:223# print("in try 3")224print(f"{filename}: 'Day' column not found.")225continue226else:227# print("in try 4")228unique_days = set(df["Day"].tolist())229230for num in unique_days:231dico_ints[num] += 1232233except Exception as e:234print(f"{filename}: Error reading file - {e}")235exit()236237counter += 1238239return dico_ints240241def process_after_dl(self):242243# add a day column244max_days = self.add_day_column()245246# find the tics that are present in more than 90% of the days247tics_in_more_than_90perc_days = self.tics_in_more_than_90perc_days(max_days)248249# print(tics_in_more_than_90perc_days) # ['WDAY', 'ADP', 'XEL', 'VRTX', 'AAPL', 'VRSK', 'ADBE', 'ADI']250251# create the dict of days (keys) and number of present tics (values)252dico_ints = self.nber_present_tics_per_day(253max_days, tics_in_more_than_90perc_days254)255256# for each key (day) if the number of present tics is = tics_in_90%, add the Day id to days_to_keep257num_of_good_ticks = len(tics_in_more_than_90perc_days)258days_to_keep = []259for k, v in dico_ints.items():260if v == num_of_good_ticks:261days_to_keep.append(k)262263# loop over each tic CSV and remove non wished days264df_list = []265for filename in os.listdir(self.csv_folder):266267if filename.endswith(".csv"):268print(f"removed uncomplete days from {filename}")269file_path = os.path.join(self.csv_folder, filename)270try:271df = pd.read_csv(file_path)272if df["ticker"].iloc[0] in tics_in_more_than_90perc_days:273filtered_df = df[df["Day"].isin(days_to_keep)]274df_list.append(filtered_df.sort_values(by="Day"))275# if str(df["ticker"][0]) in tics_present_in_more_than_90_per:276except Exception as e:277print(f"{filename}: Error reading file - {e}")278279df.loc[df["ticker"] != "VIX", "volume"] = df.loc[280df["ticker"] != "VIX", "volume"281].astype(int)282283df = pd.concat(df_list, ignore_index=True)284285# Reset the Days integers286unique_days = df["Day"].unique()287mapping = {old: new for new, old in enumerate(sorted(unique_days))}288df["Day"] = df["Day"].map(mapping)289290return df291292def clean_data(self, df, min_24=True):293294df.rename(columns={"ticker": "tic"}, inplace=True)295df.rename(columns={"datetime": "time"}, inplace=True)296df["time"] = pd.to_datetime(df["time"])297298df = df[["time", "open", "high", "low", "close", "volume", "tic", "Day"]]299300# remove 16:00 data301df.drop(df[df["time"].astype(str).str.endswith("16:00:00")].index, inplace=True)302303df.sort_values(by=["tic", "time"], inplace=True)304df.reset_index(drop=True, inplace=True)305306tics = df["tic"].unique()307days = df["Day"].unique()308309start_time = df["time"].min()310end_time = df["time"].max()311start_time = df["time"].min().replace(hour=0, minute=0, second=0)312end_time = df["time"].max().replace(hour=23, minute=59, second=0)313time_range = pd.date_range(314start=start_time, end=end_time, freq="min"315) # 'T' is minute frequency316minute_df = pd.DataFrame({"time": time_range})317318# ADDING MISSING ROWS319for tic in tics:320321print(f"Adding Missing Rows for tic {tic}")322323for day in days:324325# 0) Create the sub df of the missing times326327times_for_this_tic_and_day = df.loc[328(df["Day"] == day) & (df["tic"] == tic), "time"329]330times_for_this_tic_and_day = pd.to_datetime(times_for_this_tic_and_day)331332if min_24:333specific_day = (334df.loc[(df["Day"] == day) & (df["tic"] == tic), "time"]335.iloc[0]336.date()337)338filtered_minute_df = minute_df[339minute_df["time"].dt.date == pd.to_datetime(specific_day).date()340]341filtered_minute_df["time"] = pd.to_datetime(342filtered_minute_df["time"]343)344missing_times = filtered_minute_df[345~filtered_minute_df["time"].isin(times_for_this_tic_and_day)346]347else:348# all times across all tics, for the day349# Filter the DataFrame for the given Day (e.g., day_value = 3)350existing_time_values = df[df["Day"] == day]["time"].unique()351352existing_time_df = pd.DataFrame(353{"time": pd.to_datetime(existing_time_values)}354)355missing_times = existing_time_df[356~existing_time_df["time"].isin(times_for_this_tic_and_day)357]358359missing_times["open"] = np.nan # float64360missing_times["high"] = np.nan # float64361missing_times["low"] = np.nan # float64362missing_times["close"] = np.nan # float64363missing_times["volume"] = np.nan # float64364missing_times["tic"] = tic # object (empty string is still an object)365missing_times["Day"] = day # int64366missing_times = missing_times.astype(367{368"open": "float64",369"high": "float64",370"low": "float64",371"close": "float64",372"volume": "float64",373"tic": "object",374"Day": "int64",375}376)377378# 1) Add the sub df (missing_times) to df379# Example: insert after the last row where Day = 2 and tic = "AAPL"380mask = (df["Day"] == day) & (df["tic"] == tic)381insert_index = df[mask].index.max()382383# Split df_orig and insert df in between384df_before = df.iloc[: insert_index + 1]385df_after = df.iloc[insert_index + 1 :]386387df = pd.concat([df_before, missing_times, df_after], ignore_index=True)388389df.sort_values(by=["tic", "time"], inplace=True)390df.reset_index(drop=True, inplace=True)391392# Replace all 0 volume with a Nan (to allow for ffill and bfill to work)393df.loc[df["volume"] == 0, "volume"] = np.nan394395## FILLING THE MISSING ROWS396for tic in tics:397398print(f"Filling Missing Rows for tic {tic}")399400cols_to_ffill = ["close", "open", "high", "low", "volume"]401df.loc[df["tic"] == tic, cols_to_ffill] = (402df.loc[df["tic"] == tic, cols_to_ffill].ffill().bfill()403)404405return df406407def add_technical_indicator(408self,409df,410tech_indicator_list=[411"macd",412"boll_ub",413"boll_lb",414"rsi_30",415"dx_30",416"close_30_sma",417"close_60_sma",418],419):420df = df.rename(columns={"time": "date"})421df = df.copy()422df = df.sort_values(by=["tic", "date"])423stock = Sdf.retype(df.copy())424unique_ticker = stock.tic.unique()425tech_indicator_list = tech_indicator_list426427for indicator in tech_indicator_list:428print(f"doing indicator {indicator}")429indicator_df = pd.DataFrame()430for i in range(len(unique_ticker)):431# print(unique_ticker[i], i)432temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]433temp_indicator = pd.DataFrame(temp_indicator)434temp_indicator["tic"] = unique_ticker[i]435# print(len(df[df.tic == unique_ticker[i]]['date'].to_list()))436temp_indicator["date"] = df[df.tic == unique_ticker[i]][437"date"438].to_list()439# indicator_df = indicator_df.append(temp_indicator, ignore_index=True)440indicator_df = pd.concat(441[indicator_df, temp_indicator], axis=0, ignore_index=True442)443444df = df.merge(445indicator_df[["tic", "date", indicator]], on=["tic", "date"], how="left"446)447df = df.sort_values(by=["date", "tic"])448print("Succesfully add technical indicators")449return df450451def calculate_turbulence(self, data, time_period=252):452# can add other market assets453df = data.copy()454df_price_pivot = df.pivot(index="date", columns="tic", values="close")455# use returns to calculate turbulence456df_price_pivot = df_price_pivot.pct_change()457458unique_date = df.date.unique()459# start after a fixed time period460start = time_period461turbulence_index = [0] * start462# turbulence_index = [0]463count = 0464for i in range(start, len(unique_date)):465current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]466# use one year rolling window to calcualte covariance467hist_price = df_price_pivot[468(df_price_pivot.index < unique_date[i])469& (df_price_pivot.index >= unique_date[i - time_period])470]471# Drop tickers which has number missing values more than the "oldest" ticker472filtered_hist_price = hist_price.iloc[473hist_price.isna().sum().min() :474].dropna(axis=1)475476cov_temp = filtered_hist_price.cov()477current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(478filtered_hist_price, axis=0479)480temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(481current_temp.values.T482)483if temp > 0:484count += 1485if count > 2:486turbulence_temp = temp[0][0]487else:488# avoid large outlier because of the calculation just begins489turbulence_temp = 0490else:491turbulence_temp = 0492turbulence_index.append(turbulence_temp)493494turbulence_index = pd.DataFrame(495{"date": df_price_pivot.index, "turbulence": turbulence_index}496)497return turbulence_index498499def add_turbulence(self, data, time_period=252):500"""501add turbulence index from a precalcualted dataframe502:param data: (df) pandas dataframe503:return: (df) pandas dataframe504"""505df = data.copy()506turbulence_index = self.calculate_turbulence(df, time_period=time_period)507df = df.merge(turbulence_index, on="date")508df = df.sort_values(["date", "tic"]).reset_index(drop=True)509return df510511512