Path: blob/master/finrl/meta/data_processor.py
732 views
from __future__ import annotations

import numpy as np
import pandas as pd

from finrl.meta.data_processors.processor_alpaca import AlpacaProcessor as Alpaca
from finrl.meta.data_processors.processor_wrds import WrdsProcessor as Wrds
from finrl.meta.data_processors.processor_yahoofinance import (
    YahooFinanceProcessor as YahooFinance,
)


class DataProcessor:
    """Thin facade that forwards data-preparation calls to a backend processor.

    The backend is chosen by ``data_source`` at construction time and stored
    in ``self.processor``; every public method below delegates to the method
    of the same name on that backend.
    """

    def __init__(self, data_source, tech_indicator=None, vix=None, **kwargs):
        """Select and construct the backend processor.

        Args:
            data_source: One of ``"alpaca"``, ``"wrds"`` or ``"yahoofinance"``.
            tech_indicator: Initial value for ``self.tech_indicator_list``.
            vix: Stored on ``self.vix``; not otherwise used by this class.
            **kwargs: For ``"alpaca"`` the keys ``API_KEY``, ``API_SECRET``
                and ``API_BASE_URL`` are read (missing keys become ``None``).

        Raises:
            ValueError: If ``data_source`` is not supported, or if building
                the Alpaca processor fails.
        """
        if data_source == "alpaca":
            try:
                api_key = kwargs.get("API_KEY")
                api_secret = kwargs.get("API_SECRET")
                api_base_url = kwargs.get("API_BASE_URL")
                self.processor = Alpaca(api_key, api_secret, api_base_url)
                print("Alpaca successfully connected")
            except Exception as err:
                # Catch Exception rather than BaseException so that
                # KeyboardInterrupt / SystemExit are not re-labelled as a
                # credentials problem; chain the cause to keep it debuggable.
                raise ValueError(
                    "Please input correct account info for alpaca!"
                ) from err

        elif data_source == "wrds":
            self.processor = Wrds()

        elif data_source == "yahoofinance":
            self.processor = YahooFinance()

        else:
            raise ValueError("Data source input is NOT supported yet.")

        # Set these up front so df_to_array() works even when the caller
        # starts from cached data and never calls add_technical_indicator().
        self.tech_indicator_list = tech_indicator
        self.vix = vix

    def download_data(
        self, ticker_list, start_date, end_date, time_interval
    ) -> pd.DataFrame:
        """Return the backend's ``download_data`` result for the given request."""
        return self.processor.download_data(
            ticker_list=ticker_list,
            start_date=start_date,
            end_date=end_date,
            time_interval=time_interval,
        )

    def clean_data(self, df) -> pd.DataFrame:
        """Return ``df`` after the backend's ``clean_data`` step."""
        return self.processor.clean_data(df)

    def add_technical_indicator(self, df, tech_indicator_list) -> pd.DataFrame:
        """Remember ``tech_indicator_list`` and apply the backend's indicator step.

        The list is stored on ``self.tech_indicator_list`` because
        ``df_to_array`` passes it to the backend later.
        """
        self.tech_indicator_list = tech_indicator_list
        return self.processor.add_technical_indicator(df, tech_indicator_list)

    def add_turbulence(self, df) -> pd.DataFrame:
        """Return ``df`` after the backend's ``add_turbulence`` step."""
        return self.processor.add_turbulence(df)

    def add_vix(self, df) -> pd.DataFrame:
        """Return ``df`` after the backend's ``add_vix`` step."""
        return self.processor.add_vix(df)

    def add_vixor(self, df) -> pd.DataFrame:
        """Return ``df`` after the backend's ``add_vixor`` step."""
        return self.processor.add_vixor(df)

    def df_to_array(
        self, df, if_vix
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Convert ``df`` to arrays via the backend and sanitise the indicators.

        Args:
            df: Frame handed to the backend's ``df_to_array``.
            if_vix: Flag forwarded unchanged to the backend.

        Returns:
            ``(price_array, tech_array, turbulence_array)`` as produced by the
            backend, with every NaN or infinite entry of ``tech_array``
            replaced by 0 in place. The other two arrays are returned
            untouched. (Element types are presumed to be NumPy arrays --
            confirm against the backend implementation.)
        """
        price_array, tech_array, turbulence_array = self.processor.df_to_array(
            df, self.tech_indicator_list, if_vix
        )
        # Zero out NaN and +/-inf in a single pass: an entry is non-finite
        # exactly when it is NaN or infinite.
        tech_array[~np.isfinite(tech_array)] = 0
        return price_array, tech_array, turbulence_array