Path: blob/master/finrl/meta/data_processors/processor_alpaca.py
from __future__ import annotations

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from datetime import timedelta as td

import numpy as np
import pandas as pd
import pandas_market_calendars as tc
import pytz
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame
from stockstats import StockDataFrame as Sdf

# import alpaca_trade_api as tradeapi


class AlpacaProcessor:
    def __init__(self, API_KEY=None, API_SECRET=None, API_BASE_URL=None, client=None):
        if client is None:
            try:
                self.client = StockHistoricalDataClient(API_KEY, API_SECRET)
            except BaseException:
                raise ValueError("Wrong Account Info!")
        else:
            self.client = client

    def _fetch_data_for_ticker(self, ticker, start_date, end_date, time_interval):
        # Note: bars are always requested at minute resolution, regardless of time_interval.
        request_params = StockBarsRequest(
            symbol_or_symbols=ticker,
            timeframe=TimeFrame.Minute,
            start=start_date,
            end=end_date,
        )
        bars = self.client.get_stock_bars(request_params).df

        return bars

    def download_data(
        self, ticker_list, start_date, end_date, time_interval
    ) -> pd.DataFrame:
        """
        Downloads data using Alpaca's StockHistoricalDataClient.

        Parameters:
        - ticker_list : list of strings, each string is a ticker
        - start_date : string in the format 'YYYY-MM-DD'
        - end_date : string in the format 'YYYY-MM-DD'
        - time_interval: string representing the interval ('1D', '1Min', etc.)

        Returns:
        - pd.DataFrame with the requested data
        """
        self.start = start_date
        self.end = end_date
        self.time_interval = time_interval

        NY = "America/New_York"
        start_date = pd.Timestamp(start_date + " 09:30:00", tz=NY)
        end_date = pd.Timestamp(end_date + " 15:59:00", tz=NY)
        data_list = []
        # Use ThreadPoolExecutor to fetch data for multiple tickers concurrently
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [
                executor.submit(
                    self._fetch_data_for_ticker,
                    ticker,
                    start_date,
                    end_date,
                    time_interval,
                )
                for ticker in ticker_list
            ]
            for future in futures:
                bars = future.result()
                # Reorganize the dataframes to match the original alpaca_trade_api structure
                if not bars.empty:
                    # Reset the (symbol, timestamp) MultiIndex into columns
                    bars.reset_index(inplace=True)

                    # Set 'timestamp' as the new index
                    if "level_1" in bars.columns:
                        bars.rename(columns={"level_1": "timestamp"}, inplace=True)
                    if "level_0" in bars.columns:
                        bars.rename(columns={"level_0": "symbol"}, inplace=True)

                    bars.set_index("timestamp", inplace=True)

                    # Reorder columns as needed
                    bars = bars[
                        [
                            "close",
                            "high",
                            "low",
                            "trade_count",
                            "open",
                            "volume",
                            "vwap",
                            "symbol",
                        ]
                    ]

                    data_list.append(bars)
                else:
                    print("empty")

        # Combine the data
        data_df = pd.concat(data_list, axis=0)

        # Convert the timezone
        data_df = data_df.tz_convert(NY)

        # If time_interval is less than a day, filter out the times outside of NYSE trading hours
        if pd.Timedelta(time_interval) < pd.Timedelta(days=1):
            data_df = data_df.between_time("09:30", "15:59")

        # Reset the index and rename the columns for consistency
        data_df = data_df.reset_index().rename(
            columns={"index": "timestamp", "symbol": "tic"}
        )

        # Sort the data by both timestamp and tic for consistent ordering
        data_df = data_df.sort_values(by=["tic", "timestamp"])

        # Reset the index and drop the old index column
        data_df = data_df.reset_index(drop=True)

        return data_df
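
    # The frame returned by download_data() has one row per (tic, minute), with columns:
    # timestamp, close, high, low, trade_count, open, volume, vwap, tic.
    # A minimal, hypothetical usage sketch (the tickers, dates, and credentials below are
    # placeholders, not taken from this module):
    #
    #   processor = AlpacaProcessor(API_KEY="...", API_SECRET="...")
    #   raw = processor.download_data(["AAPL", "MSFT"], "2023-01-03", "2023-01-06", "1Min")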

    @staticmethod
    def clean_individual_ticker(args):
        tic, df, times = args
        tmp_df = pd.DataFrame(index=times)
        tic_df = df[df.tic == tic].set_index("timestamp")

        # Step 1: Merge the ticker's bars onto the full timestamp index to avoid a loop
        tmp_df = tmp_df.merge(
            tic_df[["open", "high", "low", "close", "volume"]],
            left_index=True,
            right_index=True,
            how="left",
        )

        # Step 2: Handle NaN values efficiently
        if pd.isna(tmp_df.iloc[0]["close"]):
            first_valid_index = tmp_df["close"].first_valid_index()
            if first_valid_index is not None:
                first_valid_price = tmp_df.loc[first_valid_index, "close"]
                print(
                    f"The price of the first row for ticker {tic} is NaN. It will be filled with the first valid price."
                )
                tmp_df.iloc[0] = [first_valid_price] * 4 + [0.0]  # Set volume to zero
            else:
                print(
                    f"Missing data for ticker: {tic}. The prices are all NaN. Fill with 0."
                )
                tmp_df.iloc[0] = [0.0] * 5

        # Forward-fill remaining gaps with the previous close and zero volume
        for i in range(1, tmp_df.shape[0]):
            if pd.isna(tmp_df.iloc[i]["close"]):
                previous_close = tmp_df.iloc[i - 1]["close"]
                tmp_df.iloc[i] = [previous_close] * 4 + [0.0]

        # Setting the volume for the market opening timestamp to zero - Not needed
        # tmp_df.loc[tmp_df.index.time == pd.Timestamp("09:30:00").time(), 'volume'] = 0.0

        # Step 3: Data type conversion
        tmp_df = tmp_df.astype(float)

        tmp_df["tic"] = tic

        return tmp_df

    def clean_data(self, df):
        print("Data cleaning started")
        tic_list = np.unique(df.tic.values)
        n_tickers = len(tic_list)

        print("align start and end dates")
        # Keep only timestamps for which every ticker has a bar
        grouped = df.groupby("timestamp")
        filter_mask = grouped.transform("count")["tic"] >= n_tickers
        df = df[filter_mask]

        trading_days = self.get_trading_days(start=self.start, end=self.end)

        # Produce the full timestamp index: 390 one-minute bars per NYSE session (09:30-15:59)
        print("produce full timestamp index")
        times = []
        for day in trading_days:
            NY = "America/New_York"
            current_time = pd.Timestamp(day + " 09:30:00").tz_localize(NY)
            for i in range(390):
                times.append(current_time)
                current_time += pd.Timedelta(minutes=1)

        print("Start processing tickers")

        future_results = []
        for tic in tic_list:
            result = self.clean_individual_ticker((tic, df.copy(), times))
            future_results.append(result)

        print("ticker list complete")

        print("Start concat and rename")
        new_df = pd.concat(future_results)
        new_df = new_df.reset_index()
        new_df = new_df.rename(columns={"index": "timestamp"})

        print("Data clean finished!")

        return new_df

    def add_technical_indicator(
        self,
        df,
        tech_indicator_list=[
            "macd",
            "boll_ub",
            "boll_lb",
            "rsi_30",
            "dx_30",
            "close_30_sma",
            "close_60_sma",
        ],
    ):
        print("Started adding Indicators")

        # Store the original data type of the 'timestamp' column
        original_timestamp_dtype = df["timestamp"].dtype

        # Convert df to stock data format just once
        stock = Sdf.retype(df)
        unique_ticker = stock.tic.unique()

        # Convert timestamp to a consistent datatype (timezone-naive) before entering the loop
        df["timestamp"] = df["timestamp"].dt.tz_convert(None)

        print("Running Loop")
        for indicator in tech_indicator_list:
            indicator_dfs = []
            for tic in unique_ticker:
                tic_data = stock[stock.tic == tic]
                indicator_series = tic_data[indicator]

                tic_timestamps = df.loc[df.tic == tic, "timestamp"]

                indicator_df = pd.DataFrame(
                    {
                        "tic": tic,
                        "date": tic_timestamps.values,
                        indicator: indicator_series.values,
                    }
                )
                indicator_dfs.append(indicator_df)
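
            # stockstats computes each indicator lazily when its column is accessed, so
            # tic_data[indicator] above triggers the per-ticker calculation; collecting the
            # per-ticker frames and merging once below avoids a merge inside the inner loop.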
            # Concatenate all intermediate dataframes at once
            indicator_df = pd.concat(indicator_dfs, ignore_index=True)

            # Merge the indicator data frame
            df = df.merge(
                indicator_df[["tic", "date", indicator]],
                left_on=["tic", "timestamp"],
                right_on=["tic", "date"],
                how="left",
            ).drop(columns="date")

        print("Restore Timestamps")
        # Restore the original data type of the 'timestamp' column
        if isinstance(original_timestamp_dtype, pd.DatetimeTZDtype):
            if df["timestamp"].dt.tz is None:
                df["timestamp"] = df["timestamp"].dt.tz_localize("UTC")
            df["timestamp"] = df["timestamp"].dt.tz_convert(original_timestamp_dtype.tz)
        else:
            df["timestamp"] = df["timestamp"].astype(original_timestamp_dtype)

        print("Finished adding Indicators")
        return df

    # Lets add_vix run its download and cleaning in a separate thread for quicker execution
    def download_and_clean_data(self):
        vix_df = self.download_data(["VIXY"], self.start, self.end, self.time_interval)
        return self.clean_data(vix_df)

    def add_vix(self, data):
        with ThreadPoolExecutor() as executor:
            future = executor.submit(self.download_and_clean_data)
            cleaned_vix = future.result()

        vix = cleaned_vix[["timestamp", "close"]]

        merge_column = "date" if "date" in data.columns else "timestamp"

        vix = vix.rename(
            columns={"timestamp": merge_column, "close": "VIXY"}
        )  # Change column name dynamically

        data = data.copy()
        data = data.merge(
            vix, on=merge_column
        )  # Use the dynamic column name for merging
        data = data.sort_values([merge_column, "tic"]).reset_index(drop=True)

        return data

    def calculate_turbulence(self, data, time_period=252):
        # can add other market assets
        df = data.copy()
        df_price_pivot = df.pivot(index="timestamp", columns="tic", values="close")
        # use returns to calculate turbulence
        df_price_pivot = df_price_pivot.pct_change()

        unique_date = df.timestamp.unique()
        # start after a fixed timestamp period
        start = time_period
        turbulence_index = [0] * start
        # turbulence_index = [0]
        count = 0
        for i in range(start, len(unique_date)):
            current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
            # use a one-year rolling window to calculate covariance
            hist_price = df_price_pivot[
                (df_price_pivot.index < unique_date[i])
                & (df_price_pivot.index >= unique_date[i - time_period])
            ]
            # Drop tickers with more missing values than the "oldest" ticker
            filtered_hist_price = hist_price.iloc[
                hist_price.isna().sum().min() :
            ].dropna(axis=1)

            cov_temp = filtered_hist_price.cov()
            current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(
                filtered_hist_price, axis=0
            )
            # Mahalanobis-style distance of current returns: (r_t - mu) @ pinv(Sigma) @ (r_t - mu)^T
            temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
                current_temp.values.T
            )
            if temp > 0:
                count += 1
                if count > 2:
                    turbulence_temp = temp[0][0]
                else:
                    # avoid large outliers when the calculation has just begun
                    turbulence_temp = 0
            else:
                turbulence_temp = 0
            turbulence_index.append(turbulence_temp)

        turbulence_index = pd.DataFrame(
            {"timestamp": df_price_pivot.index, "turbulence": turbulence_index}
        )

        # print("turbulence_index\n", turbulence_index)

        return turbulence_index

    def add_turbulence(self, data, time_period=252):
        """
        add turbulence index from a precalculated dataframe
        :param data: (df) pandas dataframe
        :return: (df) pandas dataframe
        """
        df = data.copy()
        turbulence_index = self.calculate_turbulence(df, time_period=time_period)
        df = df.merge(turbulence_index, on="timestamp")
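        # The first `time_period` timestamps carry a turbulence value of 0 (set in
        # calculate_turbulence), so the merge above keeps every row of the input frame.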
df.sort_values(["timestamp", "tic"]).reset_index(drop=True)374return df375376def df_to_array(self, df, tech_indicator_list, if_vix):377df = df.copy()378unique_ticker = df.tic.unique()379if_first_time = True380for tic in unique_ticker:381if if_first_time:382price_array = df[df.tic == tic][["close"]].values383tech_array = df[df.tic == tic][tech_indicator_list].values384if if_vix:385turbulence_array = df[df.tic == tic]["VIXY"].values386else:387turbulence_array = df[df.tic == tic]["turbulence"].values388if_first_time = False389else:390price_array = np.hstack(391[price_array, df[df.tic == tic][["close"]].values]392)393tech_array = np.hstack(394[tech_array, df[df.tic == tic][tech_indicator_list].values]395)396# print("Successfully transformed into array")397return price_array, tech_array, turbulence_array398399def get_trading_days(self, start, end):400nyse = tc.get_calendar("NYSE")401# df = nyse.sessions_in_range(402# pd.Timestamp(start).tz_localize(None), pd.Timestamp(end).tz_localize(None)403# )404df = nyse.date_range_htf("1D", pd.Timestamp(start), pd.Timestamp(end))405trading_days = []406for day in df:407trading_days.append(str(day)[:10])408return trading_days409410def fetch_latest_data(411self, ticker_list, time_interval, tech_indicator_list, limit=100412) -> pd.DataFrame:413data_df = pd.DataFrame()414for tic in ticker_list:415request_params = StockBarsRequest(416symbol_or_symbols=[tic], timeframe=TimeFrame.Minute, limit=limit417)418419barset = self.client.get_stock_bars(request_params).df420# Reorganize the dataframes to be in original alpaca_trade_api structure421# Rename the existing 'symbol' column if it exists422if "symbol" in barset.columns:423barset.rename(columns={"symbol": "symbol_old"}, inplace=True)424425# Now reset the index426barset.reset_index(inplace=True)427428# Set 'timestamp' as the new index429if "level_0" in barset.columns:430barset.rename(columns={"level_0": "symbol"}, inplace=True)431if "level_1" in bars.columns:432barset.rename(columns={"level_1": "timestamp"}, inplace=True)433barset.set_index("timestamp", inplace=True)434435# Reorder and rename columns as needed436barset = bars[437[438"close",439"high",440"low",441"trade_count",442"open",443"volume",444"vwap",445"symbol",446]447]448449barset["tic"] = tic450barset = barset.reset_index()451data_df = pd.concat([data_df, barset])452453data_df = data_df.reset_index(drop=True)454start_time = data_df.timestamp.min()455end_time = data_df.timestamp.max()456times = []457current_time = start_time458end = end_time + pd.Timedelta(minutes=1)459while current_time != end:460times.append(current_time)461current_time += pd.Timedelta(minutes=1)462463df = data_df.copy()464new_df = pd.DataFrame()465for tic in ticker_list:466tmp_df = pd.DataFrame(467columns=["open", "high", "low", "close", "volume"], index=times468)469tic_df = df[df.tic == tic]470for i in range(tic_df.shape[0]):471tmp_df.loc[tic_df.iloc[i]["timestamp"]] = tic_df.iloc[i][472["open", "high", "low", "close", "volume"]473]474475if str(tmp_df.iloc[0]["close"]) == "nan":476for i in range(tmp_df.shape[0]):477if str(tmp_df.iloc[i]["close"]) != "nan":478first_valid_close = tmp_df.iloc[i]["close"]479tmp_df.iloc[0] = [480first_valid_close,481first_valid_close,482first_valid_close,483first_valid_close,4840.0,485]486break487if str(tmp_df.iloc[0]["close"]) == "nan":488print(489"Missing data for ticker: ",490tic,491" . The prices are all NaN. 
Fill with 0.",492)493tmp_df.iloc[0] = [4940.0,4950.0,4960.0,4970.0,4980.0,499]500501for i in range(tmp_df.shape[0]):502if str(tmp_df.iloc[i]["close"]) == "nan":503previous_close = tmp_df.iloc[i - 1]["close"]504if str(previous_close) == "nan":505previous_close = 0.0506tmp_df.iloc[i] = [507previous_close,508previous_close,509previous_close,510previous_close,5110.0,512]513tmp_df = tmp_df.astype(float)514tmp_df["tic"] = tic515new_df = pd.concat([new_df, tmp_df])516517new_df = new_df.reset_index()518new_df = new_df.rename(columns={"index": "timestamp"})519520df = self.add_technical_indicator(new_df, tech_indicator_list)521df["VIXY"] = 0522523price_array, tech_array, turbulence_array = self.df_to_array(524df, tech_indicator_list, if_vix=True525)526latest_price = price_array[-1]527latest_tech = tech_array[-1]528request_params = StockBarsRequest(529symbol_or_symbols="VIXY", timeframe=TimeFrame.Minute, limit=1530)531turb_df = self.client.get_stock_bars(request_params).df532latest_turb = turb_df["close"].values533return latest_price, latest_tech, latest_turb534535536