# Path: blob/master/finrl/meta/data_processors/processor_yahoofinance.py
# (732 views)
"""Reference: https://github.com/AI4Finance-LLC/FinRL"""12from __future__ import annotations34import datetime5import time6from datetime import date7from datetime import timedelta8from sqlite3 import Timestamp9from typing import Any10from typing import Dict11from typing import List12from typing import Optional13from typing import Type14from typing import TypeVar15from typing import Union1617import numpy as np18import pandas as pd19import pandas_market_calendars as tc20import pytz21import yfinance as yf22from bs4 import BeautifulSoup23from selenium import webdriver24from selenium.webdriver.chrome.options import Options25from selenium.webdriver.chrome.service import Service26from selenium.webdriver.common.action_chains import ActionChains27from selenium.webdriver.common.by import By28from stockstats import StockDataFrame as Sdf29from webdriver_manager.chrome import ChromeDriverManager3031### Added by aymeric75 for scrap_data function323334class YahooFinanceProcessor:35"""Provides methods for retrieving daily stock data from36Yahoo Finance API37"""3839def __init__(self):40pass4142"""43Param44----------45start_date : str46start date of the data47end_date : str48end date of the data49ticker_list : list50a list of stock tickers51Example52-------53input:54ticker_list = config_tickers.DOW_30_TICKER55start_date = '2009-01-01'56end_date = '2021-10-31'57time_interval == "1D"5859output:60date tic open high low close volume610 2009-01-02 AAPL 3.067143 3.251429 3.041429 2.767330 746015200.0621 2009-01-02 AMGN 58.590000 59.080002 57.750000 44.523766 6547900.0632 2009-01-02 AXP 18.570000 19.520000 18.400000 15.477426 10955700.0643 2009-01-02 BA 42.799999 45.560001 42.779999 33.941093 7010200.065...66"""6768######## ADDED BY aymeric75 ###################6970def date_to_unix(self, date_str) -> int:71"""Convert a date string in yyyy-mm-dd format to Unix timestamp."""72dt = datetime.datetime.strptime(date_str, "%Y-%m-%d")73return int(dt.timestamp())7475def fetch_stock_data(self, 
stock_name, period1, period2) -> pd.DataFrame:76# Base URL77url = f"https://finance.yahoo.com/quote/{stock_name}/history/?period1={period1}&period2={period2}&filter=history"7879# Selenium WebDriver Setup80options = Options()81options.add_argument("--headless") # Headless for performance82options.add_argument("--disable-gpu") # Disable GPU for compatibility83driver = webdriver.Chrome(84service=Service(ChromeDriverManager().install()), options=options85)8687# Navigate to the URL88driver.get(url)89driver.maximize_window()90time.sleep(5) # Wait for redirection and page load9192# Handle potential popup93try:94RejectAll = driver.find_element(95By.XPATH, '//button[@class="btn secondary reject-all"]'96)97action = ActionChains(driver)98action.click(on_element=RejectAll)99action.perform()100time.sleep(5)101102except Exception as e:103print("Popup not found or handled:", e)104105# Parse the page for the table106soup = BeautifulSoup(driver.page_source, "html.parser")107table = soup.find("table")108if not table:109raise Exception("No table found after handling redirection and popup.")110111# Extract headers112headers = [th.text.strip() for th in table.find_all("th")]113headers[4] = "Close"114headers[5] = "Adj Close"115headers = ["date", "open", "high", "low", "close", "adjcp", "volume"]116# , 'tic', 'day'117118# Extract rows119rows = []120for tr in table.find_all("tr")[1:]: # Skip header row121cells = [td.text.strip() for td in tr.find_all("td")]122if len(cells) == len(headers): # Only add rows with correct column count123rows.append(cells)124125# Create DataFrame126df = pd.DataFrame(rows, columns=headers)127128# Convert columns to appropriate data types129def safe_convert(value, dtype):130try:131return dtype(value.replace(",", ""))132except ValueError:133return value134135df["open"] = df["open"].apply(lambda x: safe_convert(x, float))136df["high"] = df["high"].apply(lambda x: safe_convert(x, float))137df["low"] = df["low"].apply(lambda x: safe_convert(x, float))138df["close"] 
= df["close"].apply(lambda x: safe_convert(x, float))139df["adjcp"] = df["adjcp"].apply(lambda x: safe_convert(x, float))140df["volume"] = df["volume"].apply(lambda x: safe_convert(x, int))141142# Add 'tic' column143df["tic"] = stock_name144145# Add 'day' column146start_date = datetime.datetime.fromtimestamp(period1)147df["date"] = pd.to_datetime(df["date"])148df["day"] = (df["date"] - start_date).dt.days149df = df[df["day"] >= 0] # Exclude rows with days before the start date150151# Reverse the DataFrame rows152df = df.iloc[::-1].reset_index(drop=True)153154return df155156def scrap_data(self, stock_names, start_date, end_date) -> pd.DataFrame:157"""Fetch and combine stock data for multiple stock names."""158period1 = self.date_to_unix(start_date)159period2 = self.date_to_unix(end_date)160161all_dataframes = []162total_stocks = len(stock_names)163164for i, stock_name in enumerate(stock_names):165try:166print(167f"Processing {stock_name} ({i + 1}/{total_stocks})... {(i + 1) / total_stocks * 100:.2f}% complete."168)169df = self.fetch_stock_data(stock_name, period1, period2)170all_dataframes.append(df)171except Exception as e:172print(f"Error fetching data for {stock_name}: {e}")173174combined_df = pd.concat(all_dataframes, ignore_index=True)175combined_df = combined_df.sort_values(by=["day", "tick"]).reset_index(drop=True)176177return combined_df178179######## END ADDED BY aymeric75 ###################180181def convert_interval(self, time_interval: str) -> str:182# Convert FinRL 'standardised' time periods to Yahoo format: 1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo183yahoo_intervals = [184"1m",185"2m",186"5m",187"15m",188"30m",189"60m",190"90m",191"1h",192"1d",193"5d",194"1wk",195"1mo",196"3mo",197]198if time_interval in yahoo_intervals:199return time_interval200if time_interval in [201"1Min",202"2Min",203"5Min",204"15Min",205"30Min",206"60Min",207"90Min",208]:209time_interval = time_interval.replace("Min", "m")210elif time_interval in ["1H", "1D", 
"5D", "1h", "1d", "5d"]:211time_interval = time_interval.lower()212elif time_interval == "1W":213time_interval = "1wk"214elif time_interval in ["1M", "3M"]:215time_interval = time_interval.replace("M", "mo")216else:217raise ValueError("wrong time_interval")218219return time_interval220221def download_data(222self,223ticker_list: list[str],224start_date: str,225end_date: str,226time_interval: str,227proxy: str | dict = None,228) -> pd.DataFrame:229time_interval = self.convert_interval(time_interval)230231self.start = start_date232self.end = end_date233self.time_interval = time_interval234235# Download and save the data in a pandas DataFrame236start_date = pd.Timestamp(start_date)237end_date = pd.Timestamp(end_date)238delta = timedelta(days=1)239data_df = pd.DataFrame()240for tic in ticker_list:241current_tic_start_date = start_date242while (243current_tic_start_date <= end_date244): # downloading daily to workaround yfinance only allowing max 7 calendar (not trading) days of 1 min data per single download245temp_df = yf.download(246tic,247start=current_tic_start_date,248end=current_tic_start_date + delta,249interval=self.time_interval,250proxy=proxy,251)252if temp_df.columns.nlevels != 1:253temp_df.columns = temp_df.columns.droplevel(1)254255temp_df["tic"] = tic256data_df = pd.concat([data_df, temp_df])257current_tic_start_date += delta258259data_df = data_df.reset_index().drop(columns=["Adj Close"])260# convert the column names to match processor_alpaca.py as far as poss261data_df.columns = [262"timestamp",263"open",264"high",265"low",266"close",267"volume",268"tic",269]270271return data_df272273def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:274tic_list = np.unique(df.tic.values)275NY = "America/New_York"276277trading_days = self.get_trading_days(start=self.start, end=self.end)278# produce full timestamp index279if self.time_interval == "1d":280times = trading_days281elif self.time_interval == "1m":282times = []283for day in trading_days:284# NY = 
"America/New_York"285current_time = pd.Timestamp(day + " 09:30:00").tz_localize(NY)286for i in range(390): # 390 minutes in trading day287times.append(current_time)288current_time += pd.Timedelta(minutes=1)289else:290raise ValueError(291"Data clean at given time interval is not supported for YahooFinance data."292)293294# create a new dataframe with full timestamp series295new_df = pd.DataFrame()296for tic in tic_list:297tmp_df = pd.DataFrame(298columns=["open", "high", "low", "close", "volume"], index=times299)300tic_df = df[301df.tic == tic302] # extract just the rows from downloaded data relating to this tic303for i in range(tic_df.shape[0]): # fill empty DataFrame using original data304tmp_timestamp = tic_df.iloc[i]["timestamp"]305if tmp_timestamp.tzinfo is None:306tmp_timestamp = tmp_timestamp.tz_localize(NY)307else:308tmp_timestamp = tmp_timestamp.tz_convert(NY)309tmp_df.loc[tmp_timestamp] = tic_df.iloc[i][310["open", "high", "low", "close", "volume"]311]312# print("(9) tmp_df\n", tmp_df.to_string()) # print ALL dataframe to check for missing rows from download313314# if close on start date is NaN, fill data with first valid close315# and set volume to 0.316if str(tmp_df.iloc[0]["close"]) == "nan":317print("NaN data on start date, fill using first valid data.")318for i in range(tmp_df.shape[0]):319if str(tmp_df.iloc[i]["close"]) != "nan":320first_valid_close = tmp_df.iloc[i]["close"]321tmp_df.iloc[0] = [322first_valid_close,323first_valid_close,324first_valid_close,325first_valid_close,3260.0,327]328break329330# if the close price of the first row is still NaN (All the prices are NaN in this case)331if str(tmp_df.iloc[0]["close"]) == "nan":332print(333"Missing data for ticker: ",334tic,335" . The prices are all NaN. 
Fill with 0.",336)337tmp_df.iloc[0] = [3380.0,3390.0,3400.0,3410.0,3420.0,343]344345# fill NaN data with previous close and set volume to 0.346for i in range(tmp_df.shape[0]):347if str(tmp_df.iloc[i]["close"]) == "nan":348previous_close = tmp_df.iloc[i - 1]["close"]349if str(previous_close) == "nan":350raise ValueError351tmp_df.iloc[i] = [352previous_close,353previous_close,354previous_close,355previous_close,3560.0,357]358# print(tmp_df.iloc[i], " Filled NaN data with previous close and set volume to 0. ticker: ", tic)359360# merge single ticker data to new DataFrame361tmp_df = tmp_df.astype(float)362tmp_df["tic"] = tic363new_df = pd.concat([new_df, tmp_df])364365# print(("Data clean for ") + tic + (" is finished."))366367# reset index and rename columns368new_df = new_df.reset_index()369new_df = new_df.rename(columns={"index": "timestamp"})370371# print("Data clean all finished!")372373return new_df374375def add_technical_indicator(376self, data: pd.DataFrame, tech_indicator_list: list[str]377):378"""379calculate technical indicators380use stockstats package to add technical inidactors381:param data: (df) pandas dataframe382:return: (df) pandas dataframe383"""384df = data.copy()385df = df.sort_values(by=["tic", "timestamp"])386stock = Sdf.retype(df.copy())387unique_ticker = stock.tic.unique()388389for indicator in tech_indicator_list:390indicator_df = pd.DataFrame()391for i in range(len(unique_ticker)):392try:393temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]394temp_indicator = pd.DataFrame(temp_indicator)395temp_indicator["tic"] = unique_ticker[i]396temp_indicator["timestamp"] = df[df.tic == unique_ticker[i]][397"timestamp"398].to_list()399indicator_df = pd.concat(400[indicator_df, temp_indicator], ignore_index=True401)402except Exception as e:403print(e)404df = df.merge(405indicator_df[["tic", "timestamp", indicator]],406on=["tic", "timestamp"],407how="left",408)409df = df.sort_values(by=["timestamp", "tic"])410return df411412def add_vix(self, 
data: pd.DataFrame) -> pd.DataFrame:413"""414add vix from yahoo finance415:param data: (df) pandas dataframe416:return: (df) pandas dataframe417"""418vix_df = self.download_data(["VIXY"], self.start, self.end, self.time_interval)419cleaned_vix = self.clean_data(vix_df)420print("cleaned_vix\n", cleaned_vix)421vix = cleaned_vix[["timestamp", "close"]]422print('cleaned_vix[["timestamp", "close"]\n', vix)423vix = vix.rename(columns={"close": "VIXY"})424print('vix.rename(columns={"close": "VIXY"}\n', vix)425426df = data.copy()427print("df\n", df)428df = df.merge(vix, on="timestamp")429df = df.sort_values(["timestamp", "tic"]).reset_index(drop=True)430return df431432def calculate_turbulence(433self, data: pd.DataFrame, time_period: int = 252434) -> pd.DataFrame:435# can add other market assets436df = data.copy()437df_price_pivot = df.pivot(index="timestamp", columns="tic", values="close")438# use returns to calculate turbulence439df_price_pivot = df_price_pivot.pct_change()440441unique_date = df.timestamp.unique()442# start after a fixed timestamp period443start = time_period444turbulence_index = [0] * start445# turbulence_index = [0]446count = 0447for i in range(start, len(unique_date)):448current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]449# use one year rolling window to calcualte covariance450hist_price = df_price_pivot[451(df_price_pivot.index < unique_date[i])452& (df_price_pivot.index >= unique_date[i - time_period])453]454# Drop tickers which has number missing values more than the "oldest" ticker455filtered_hist_price = hist_price.iloc[456hist_price.isna().sum().min() :457].dropna(axis=1)458459cov_temp = filtered_hist_price.cov()460current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(461filtered_hist_price, axis=0462)463temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(464current_temp.values.T465)466if temp > 0:467count += 1468if count > 2:469turbulence_temp = temp[0][0]470else:471# avoid large outlier because 
of the calculation just begins472turbulence_temp = 0473else:474turbulence_temp = 0475turbulence_index.append(turbulence_temp)476477turbulence_index = pd.DataFrame(478{"timestamp": df_price_pivot.index, "turbulence": turbulence_index}479)480return turbulence_index481482def add_turbulence(483self, data: pd.DataFrame, time_period: int = 252484) -> pd.DataFrame:485"""486add turbulence index from a precalcualted dataframe487:param data: (df) pandas dataframe488:return: (df) pandas dataframe489"""490df = data.copy()491turbulence_index = self.calculate_turbulence(df, time_period=time_period)492df = df.merge(turbulence_index, on="timestamp")493df = df.sort_values(["timestamp", "tic"]).reset_index(drop=True)494return df495496def df_to_array(497self, df: pd.DataFrame, tech_indicator_list: list[str], if_vix: bool498) -> list[np.ndarray]:499df = df.copy()500unique_ticker = df.tic.unique()501if_first_time = True502for tic in unique_ticker:503if if_first_time:504price_array = df[df.tic == tic][["close"]].values505tech_array = df[df.tic == tic][tech_indicator_list].values506if if_vix:507turbulence_array = df[df.tic == tic]["VIXY"].values508else:509turbulence_array = df[df.tic == tic]["turbulence"].values510if_first_time = False511else:512price_array = np.hstack(513[price_array, df[df.tic == tic][["close"]].values]514)515tech_array = np.hstack(516[tech_array, df[df.tic == tic][tech_indicator_list].values]517)518# print("Successfully transformed into array")519return price_array, tech_array, turbulence_array520521def get_trading_days(self, start: str, end: str) -> list[str]:522nyse = tc.get_calendar("NYSE")523df = nyse.date_range_htf("1D", pd.Timestamp(start), pd.Timestamp(end))524trading_days = []525for day in df:526trading_days.append(str(day)[:10])527return trading_days528529# ****** NB: YAHOO FINANCE DATA MAY BE IN REAL-TIME OR DELAYED BY 15 MINUTES OR MORE, DEPENDING ON THE EXCHANGE ******530def fetch_latest_data(531self,532ticker_list: list[str],533time_interval: 
str,534tech_indicator_list: list[str],535limit: int = 100,536) -> pd.DataFrame:537time_interval = self.convert_interval(time_interval)538539end_datetime = datetime.datetime.now()540start_datetime = end_datetime - datetime.timedelta(541minutes=limit + 1542) # get the last rows up to limit543544data_df = pd.DataFrame()545for tic in ticker_list:546barset = yf.download(547tic, start_datetime, end_datetime, interval=time_interval548) # use start and end datetime to simulate the limit parameter549barset["tic"] = tic550data_df = pd.concat([data_df, barset])551552data_df = data_df.reset_index().drop(553columns=["Adj Close"]554) # Alpaca data does not have 'Adj Close'555556data_df.columns = [ # convert to Alpaca column names lowercase557"timestamp",558"open",559"high",560"low",561"close",562"volume",563"tic",564]565566start_time = data_df.timestamp.min()567end_time = data_df.timestamp.max()568times = []569current_time = start_time570end = end_time + pd.Timedelta(minutes=1)571while current_time != end:572times.append(current_time)573current_time += pd.Timedelta(minutes=1)574575df = data_df.copy()576new_df = pd.DataFrame()577for tic in ticker_list:578tmp_df = pd.DataFrame(579columns=["open", "high", "low", "close", "volume"], index=times580)581tic_df = df[df.tic == tic]582for i in range(tic_df.shape[0]):583tmp_df.loc[tic_df.iloc[i]["timestamp"]] = tic_df.iloc[i][584["open", "high", "low", "close", "volume"]585]586587if str(tmp_df.iloc[0]["close"]) == "nan":588for i in range(tmp_df.shape[0]):589if str(tmp_df.iloc[i]["close"]) != "nan":590first_valid_close = tmp_df.iloc[i]["close"]591tmp_df.iloc[0] = [592first_valid_close,593first_valid_close,594first_valid_close,595first_valid_close,5960.0,597]598break599if str(tmp_df.iloc[0]["close"]) == "nan":600print(601"Missing data for ticker: ",602tic,603" . The prices are all NaN. 
Fill with 0.",604)605tmp_df.iloc[0] = [6060.0,6070.0,6080.0,6090.0,6100.0,611]612613for i in range(tmp_df.shape[0]):614if str(tmp_df.iloc[i]["close"]) == "nan":615previous_close = tmp_df.iloc[i - 1]["close"]616if str(previous_close) == "nan":617previous_close = 0.0618tmp_df.iloc[i] = [619previous_close,620previous_close,621previous_close,622previous_close,6230.0,624]625tmp_df = tmp_df.astype(float)626tmp_df["tic"] = tic627new_df = pd.concat([new_df, tmp_df])628629new_df = new_df.reset_index()630new_df = new_df.rename(columns={"index": "timestamp"})631632df = self.add_technical_indicator(new_df, tech_indicator_list)633df["VIXY"] = 0634635price_array, tech_array, turbulence_array = self.df_to_array(636df, tech_indicator_list, if_vix=True637)638latest_price = price_array[-1]639latest_tech = tech_array[-1]640start_datetime = end_datetime - datetime.timedelta(minutes=1)641turb_df = yf.download("VIXY", start_datetime, limit=1)642latest_turb = turb_df["Close"].values643return latest_price, latest_tech, latest_turb644645646