Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
AI4Finance-Foundation
GitHub Repository: AI4Finance-Foundation/FinRL
Path: blob/master/finrl/meta/data_processor.py
732 views
1
from __future__ import annotations
2
3
import numpy as np
4
import pandas as pd
5
6
from finrl.meta.data_processors.processor_alpaca import AlpacaProcessor as Alpaca
7
from finrl.meta.data_processors.processor_wrds import WrdsProcessor as Wrds
8
from finrl.meta.data_processors.processor_yahoofinance import (
9
YahooFinanceProcessor as YahooFinance,
10
)
11
12
13
class DataProcessor:
    """Facade that forwards data operations to a source-specific processor.

    The backing processor is selected by ``data_source`` ("alpaca", "wrds" or
    "yahoofinance") and stored on ``self.processor``. Every public method
    delegates to the method of the same name on that processor.
    """

    def __init__(self, data_source, tech_indicator=None, vix=None, **kwargs):
        """Construct the processor that matches ``data_source``.

        Args:
            data_source: One of "alpaca", "wrds" or "yahoofinance".
            tech_indicator: Initial value of ``self.tech_indicator_list``;
                it is what ``df_to_array`` uses if
                ``add_technical_indicator`` is never called.
            vix: Stored unchanged on ``self.vix``.
            **kwargs: For "alpaca", the values under "API_KEY", "API_SECRET"
                and "API_BASE_URL" are passed to the Alpaca processor
                (a missing key is passed as ``None``).

        Raises:
            ValueError: If the Alpaca processor cannot be constructed, or if
                ``data_source`` is not one of the supported names.
        """
        if data_source == "alpaca":
            api_key = kwargs.get("API_KEY")
            api_secret = kwargs.get("API_SECRET")
            api_base_url = kwargs.get("API_BASE_URL")
            try:
                self.processor = Alpaca(api_key, api_secret, api_base_url)
            except Exception as err:
                # Catch Exception, not BaseException, so that Ctrl-C and
                # SystemExit are not reported as a credentials problem.
                # Chaining keeps the underlying failure visible.
                raise ValueError(
                    "Please input correct account info for alpaca!"
                ) from err
            else:
                print("Alpaca successfully connected")

        elif data_source == "wrds":
            self.processor = Wrds()

        elif data_source == "yahoofinance":
            self.processor = YahooFinance()

        else:
            raise ValueError("Data source input is NOT supported yet.")

        # Initialize variable in case it is using cache and does not use
        # download_data() method
        self.tech_indicator_list = tech_indicator
        self.vix = vix

    def download_data(
        self, ticker_list, start_date, end_date, time_interval
    ) -> pd.DataFrame:
        """Forward the download request to the processor and return its result."""
        return self.processor.download_data(
            ticker_list=ticker_list,
            start_date=start_date,
            end_date=end_date,
            time_interval=time_interval,
        )

    def clean_data(self, df) -> pd.DataFrame:
        """Return ``df`` after the processor's ``clean_data`` step."""
        return self.processor.clean_data(df)

    def add_technical_indicator(self, df, tech_indicator_list) -> pd.DataFrame:
        """Delegate to the processor's ``add_technical_indicator``.

        ``tech_indicator_list`` is also remembered on the instance because
        ``df_to_array`` passes it on to the processor later.
        """
        self.tech_indicator_list = tech_indicator_list
        return self.processor.add_technical_indicator(df, tech_indicator_list)

    def add_turbulence(self, df) -> pd.DataFrame:
        """Delegate to the processor's ``add_turbulence``."""
        return self.processor.add_turbulence(df)

    def add_vix(self, df) -> pd.DataFrame:
        """Delegate to the processor's ``add_vix``."""
        return self.processor.add_vix(df)

    def add_vixor(self, df) -> pd.DataFrame:
        """Delegate to the processor's ``add_vixor``."""
        return self.processor.add_vixor(df)

    def df_to_array(
        self, df, if_vix
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Convert ``df`` to arrays via the processor and sanitize indicators.

        Args:
            df: Data passed through to the processor's ``df_to_array``.
            if_vix: Flag passed through to the processor's ``df_to_array``.

        Returns:
            The ``(price_array, tech_array, turbulence_array)`` tuple produced
            by the processor, with every NaN and +/-inf entry of
            ``tech_array`` replaced by 0 (in place).
        """
        price_array, tech_array, turbulence_array = self.processor.df_to_array(
            df, self.tech_indicator_list, if_vix
        )
        # fill nan and inf values with 0 for technical indicators;
        # ~isfinite selects exactly the NaN and +/-inf positions.
        tech_array[~np.isfinite(tech_array)] = 0
        return price_array, tech_array, turbulence_array
88
89