Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
AI4Finance-Foundation
GitHub Repository: AI4Finance-Foundation/FinRL
Path: blob/master/finrl/meta/data_processors/processor_alpaca.py
732 views
1
from __future__ import annotations
2
3
from concurrent.futures import ProcessPoolExecutor
4
from concurrent.futures import ThreadPoolExecutor
5
from datetime import datetime
6
from datetime import timedelta as td
7
8
import numpy as np
9
import pandas as pd
10
import pandas_market_calendars as tc
11
import pytz
12
from alpaca.data.historical import StockHistoricalDataClient
13
from alpaca.data.requests import StockBarsRequest
14
from alpaca.data.timeframe import TimeFrame
15
from stockstats import StockDataFrame as Sdf
16
17
# import alpaca_trade_api as tradeapi
18
19
20
class AlpacaProcessor:
21
def __init__(self, API_KEY=None, API_SECRET=None, API_BASE_URL=None, client=None):
22
if client is None:
23
try:
24
self.client = StockHistoricalDataClient(API_KEY, API_SECRET)
25
except BaseException:
26
raise ValueError("Wrong Account Info!")
27
else:
28
self.client = client
29
30
def _fetch_data_for_ticker(self, ticker, start_date, end_date, time_interval):
31
request_params = StockBarsRequest(
32
symbol_or_symbols=ticker,
33
timeframe=TimeFrame.Minute,
34
start=start_date,
35
end=end_date,
36
)
37
bars = self.client.get_stock_bars(request_params).df
38
39
return bars
40
41
    def download_data(
        self, ticker_list, start_date, end_date, time_interval
    ) -> pd.DataFrame:
        """
        Download minute bars for every ticker via the Alpaca data client.

        Parameters:
        - ticker_list : list of strings, each string is a ticker
        - start_date : string in the format 'YYYY-MM-DD'
        - end_date : string in the format 'YYYY-MM-DD'
        - time_interval: string representing the interval ('1D', '1Min', etc.)
          NOTE(review): bars are always requested at minute resolution (see
          _fetch_data_for_ticker); time_interval only drives the trading-hours
          filter below -- confirm this is intended.

        Returns:
        - pd.DataFrame with columns [timestamp, close, high, low, trade_count,
          open, volume, vwap, tic], sorted by (tic, timestamp)
        """
        # Remember the query window/interval; clean_data and add_vix read these.
        self.start = start_date
        self.end = end_date
        self.time_interval = time_interval

        # Clamp the query to regular NYSE hours, New York time.
        NY = "America/New_York"
        start_date = pd.Timestamp(start_date + " 09:30:00", tz=NY)
        end_date = pd.Timestamp(end_date + " 15:59:00", tz=NY)
        data_list = []
        # Use ThreadPoolExecutor to fetch data for multiple tickers concurrently
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [
                executor.submit(
                    self._fetch_data_for_ticker,
                    ticker,
                    start_date,
                    end_date,
                    time_interval,
                )
                for ticker in ticker_list
            ]
            for future in futures:
                # future.result() re-raises any exception from the worker.
                bars = future.result()
                # fix start
                # Reorganize the dataframes to be in original alpaca_trade_api structure
                # Rename the existing 'symbol' column if it exists
                if not bars.empty:

                    # Flatten the (symbol, timestamp) MultiIndex into columns.
                    bars.reset_index(inplace=True)

                    # After reset_index the MultiIndex levels may come back as
                    # 'level_0'/'level_1'; map them to symbol/timestamp.
                    if "level_1" in bars.columns:
                        bars.rename(columns={"level_1": "timestamp"}, inplace=True)
                    if "level_0" in bars.columns:
                        bars.rename(columns={"level_0": "symbol"}, inplace=True)

                    bars.set_index("timestamp", inplace=True)

                    # Reorder and rename columns as needed
                    bars = bars[
                        [
                            "close",
                            "high",
                            "low",
                            "trade_count",
                            "open",
                            "volume",
                            "vwap",
                            "symbol",
                        ]
                    ]

                    data_list.append(bars)
                else:
                    print("empty")

        # Combine the data
        data_df = pd.concat(data_list, axis=0)

        # Convert the timezone
        data_df = data_df.tz_convert(NY)

        # If time_interval is less than a day, filter out the times outside of NYSE trading hours
        if pd.Timedelta(time_interval) < pd.Timedelta(days=1):
            data_df = data_df.between_time("09:30", "15:59")

        # Reset the index and rename the columns for consistency
        data_df = data_df.reset_index().rename(
            columns={"index": "timestamp", "symbol": "tic"}
        )

        # Sort the data by both timestamp and tic for consistent ordering
        data_df = data_df.sort_values(by=["tic", "timestamp"])

        # Reset the index and drop the old index column
        data_df = data_df.reset_index(drop=True)

        return data_df
@staticmethod
137
def clean_individual_ticker(args):
138
tic, df, times = args
139
tmp_df = pd.DataFrame(index=times)
140
tic_df = df[df.tic == tic].set_index("timestamp")
141
142
# Step 1: Merging dataframes to avoid loop
143
tmp_df = tmp_df.merge(
144
tic_df[["open", "high", "low", "close", "volume"]],
145
left_index=True,
146
right_index=True,
147
how="left",
148
)
149
150
# Step 2: Handling NaN values efficiently
151
if pd.isna(tmp_df.iloc[0]["close"]):
152
first_valid_index = tmp_df["close"].first_valid_index()
153
if first_valid_index is not None:
154
first_valid_price = tmp_df.loc[first_valid_index, "close"]
155
print(
156
f"The price of the first row for ticker {tic} is NaN. It will be filled with the first valid price."
157
)
158
tmp_df.iloc[0] = [first_valid_price] * 4 + [0.0] # Set volume to zero
159
else:
160
print(
161
f"Missing data for ticker: {tic}. The prices are all NaN. Fill with 0."
162
)
163
tmp_df.iloc[0] = [0.0] * 5
164
165
for i in range(1, tmp_df.shape[0]):
166
if pd.isna(tmp_df.iloc[i]["close"]):
167
previous_close = tmp_df.iloc[i - 1]["close"]
168
tmp_df.iloc[i] = [previous_close] * 4 + [0.0]
169
170
# Setting the volume for the market opening timestamp to zero - Not needed
171
# tmp_df.loc[tmp_df.index.time == pd.Timestamp("09:30:00").time(), 'volume'] = 0.0
172
173
# Step 3: Data type conversion
174
tmp_df = tmp_df.astype(float)
175
176
tmp_df["tic"] = tic
177
178
return tmp_df
179
180
def clean_data(self, df):
181
print("Data cleaning started")
182
tic_list = np.unique(df.tic.values)
183
n_tickers = len(tic_list)
184
185
print("align start and end dates")
186
grouped = df.groupby("timestamp")
187
filter_mask = grouped.transform("count")["tic"] >= n_tickers
188
df = df[filter_mask]
189
190
# ... (generating 'times' series, same as in your existing code)
191
192
trading_days = self.get_trading_days(start=self.start, end=self.end)
193
194
# produce full timestamp index
195
print("produce full timestamp index")
196
times = []
197
for day in trading_days:
198
NY = "America/New_York"
199
current_time = pd.Timestamp(day + " 09:30:00").tz_localize(NY)
200
for i in range(390):
201
times.append(current_time)
202
current_time += pd.Timedelta(minutes=1)
203
204
print("Start processing tickers")
205
206
future_results = []
207
for tic in tic_list:
208
result = self.clean_individual_ticker((tic, df.copy(), times))
209
future_results.append(result)
210
211
print("ticker list complete")
212
213
print("Start concat and rename")
214
new_df = pd.concat(future_results)
215
new_df = new_df.reset_index()
216
new_df = new_df.rename(columns={"index": "timestamp"})
217
218
print("Data clean finished!")
219
220
return new_df
221
222
    def add_technical_indicator(
        self,
        df,
        tech_indicator_list=[
            "macd",
            "boll_ub",
            "boll_lb",
            "rsi_30",
            "dx_30",
            "close_30_sma",
            "close_60_sma",
        ],
    ):
        """Append stockstats technical indicator columns to ``df``.

        NOTE(review): the mutable default ``tech_indicator_list`` is shared
        across calls; it is never mutated here, but confirm before appending
        to it anywhere.

        Parameters
        ----------
        df : pd.DataFrame
            Long-format frame with 'timestamp' and 'tic' columns; 'timestamp'
            appears to be tz-aware (dt.tz_convert below requires it) -- TODO
            confirm with callers.
        tech_indicator_list : list of str
            stockstats indicator names, computed per ticker.

        Returns
        -------
        pd.DataFrame with one extra column per requested indicator and the
        original 'timestamp' dtype restored.
        """
        print("Started adding Indicators")

        # Store the original data type of the 'timestamp' column so it can be
        # restored after the tz-naive merges below.
        original_timestamp_dtype = df["timestamp"].dtype

        # Convert df to stock data format just once
        # NOTE(review): Sdf.retype appears to retype df in place as well as
        # returning it; confirm df is not shared with callers that rely on
        # the original column casing.
        stock = Sdf.retype(df)
        unique_ticker = stock.tic.unique()

        # Convert timestamp to a consistent datatype (timezone-naive) before entering the loop
        df["timestamp"] = df["timestamp"].dt.tz_convert(None)

        print("Running Loop")
        for indicator in tech_indicator_list:
            indicator_dfs = []
            for tic in unique_ticker:
                # stockstats computes the indicator lazily on column access.
                tic_data = stock[stock.tic == tic]
                indicator_series = tic_data[indicator]

                tic_timestamps = df.loc[df.tic == tic, "timestamp"]

                # Intermediate frame keyed by (tic, date) for the merge below.
                indicator_df = pd.DataFrame(
                    {
                        "tic": tic,
                        "date": tic_timestamps.values,
                        indicator: indicator_series.values,
                    }
                )
                indicator_dfs.append(indicator_df)

            # Concatenate all intermediate dataframes at once
            indicator_df = pd.concat(indicator_dfs, ignore_index=True)

            # Merge the indicator data frame
            df = df.merge(
                indicator_df[["tic", "date", indicator]],
                left_on=["tic", "timestamp"],
                right_on=["tic", "date"],
                how="left",
            ).drop(columns="date")

        print("Restore Timestamps")
        # Restore the original data type of the 'timestamp' column
        if isinstance(original_timestamp_dtype, pd.DatetimeTZDtype):
            if df["timestamp"].dt.tz is None:
                df["timestamp"] = df["timestamp"].dt.tz_localize("UTC")
            df["timestamp"] = df["timestamp"].dt.tz_convert(original_timestamp_dtype.tz)
        else:
            df["timestamp"] = df["timestamp"].astype(original_timestamp_dtype)

        print("Finished adding Indicators")
        return df
# Allows to multithread the add_vix function for quicker execution
289
def download_and_clean_data(self):
290
vix_df = self.download_data(["VIXY"], self.start, self.end, self.time_interval)
291
return self.clean_data(vix_df)
292
293
def add_vix(self, data):
294
with ThreadPoolExecutor() as executor:
295
future = executor.submit(self.download_and_clean_data)
296
cleaned_vix = future.result()
297
298
vix = cleaned_vix[["timestamp", "close"]]
299
300
merge_column = "date" if "date" in data.columns else "timestamp"
301
302
vix = vix.rename(
303
columns={"timestamp": merge_column, "close": "VIXY"}
304
) # Change column name dynamically
305
306
data = data.copy()
307
data = data.merge(
308
vix, on=merge_column
309
) # Use the dynamic column name for merging
310
data = data.sort_values([merge_column, "tic"]).reset_index(drop=True)
311
312
return data
313
314
def calculate_turbulence(self, data, time_period=252):
315
# can add other market assets
316
df = data.copy()
317
df_price_pivot = df.pivot(index="timestamp", columns="tic", values="close")
318
# use returns to calculate turbulence
319
df_price_pivot = df_price_pivot.pct_change()
320
321
unique_date = df.timestamp.unique()
322
# start after a fixed timestamp period
323
start = time_period
324
turbulence_index = [0] * start
325
# turbulence_index = [0]
326
count = 0
327
for i in range(start, len(unique_date)):
328
current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
329
# use one year rolling window to calcualte covariance
330
hist_price = df_price_pivot[
331
(df_price_pivot.index < unique_date[i])
332
& (df_price_pivot.index >= unique_date[i - time_period])
333
]
334
# Drop tickers which has number missing values more than the "oldest" ticker
335
filtered_hist_price = hist_price.iloc[
336
hist_price.isna().sum().min() :
337
].dropna(axis=1)
338
339
cov_temp = filtered_hist_price.cov()
340
current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(
341
filtered_hist_price, axis=0
342
)
343
temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
344
current_temp.values.T
345
)
346
if temp > 0:
347
count += 1
348
if count > 2:
349
turbulence_temp = temp[0][0]
350
else:
351
# avoid large outlier because of the calculation just begins
352
turbulence_temp = 0
353
else:
354
turbulence_temp = 0
355
turbulence_index.append(turbulence_temp)
356
357
turbulence_index = pd.DataFrame(
358
{"timestamp": df_price_pivot.index, "turbulence": turbulence_index}
359
)
360
361
# print("turbulence_index\n", turbulence_index)
362
363
return turbulence_index
364
365
def add_turbulence(self, data, time_period=252):
366
"""
367
add turbulence index from a precalcualted dataframe
368
:param data: (df) pandas dataframe
369
:return: (df) pandas dataframe
370
"""
371
df = data.copy()
372
turbulence_index = self.calculate_turbulence(df, time_period=time_period)
373
df = df.merge(turbulence_index, on="timestamp")
374
df = df.sort_values(["timestamp", "tic"]).reset_index(drop=True)
375
return df
376
377
def df_to_array(self, df, tech_indicator_list, if_vix):
378
df = df.copy()
379
unique_ticker = df.tic.unique()
380
if_first_time = True
381
for tic in unique_ticker:
382
if if_first_time:
383
price_array = df[df.tic == tic][["close"]].values
384
tech_array = df[df.tic == tic][tech_indicator_list].values
385
if if_vix:
386
turbulence_array = df[df.tic == tic]["VIXY"].values
387
else:
388
turbulence_array = df[df.tic == tic]["turbulence"].values
389
if_first_time = False
390
else:
391
price_array = np.hstack(
392
[price_array, df[df.tic == tic][["close"]].values]
393
)
394
tech_array = np.hstack(
395
[tech_array, df[df.tic == tic][tech_indicator_list].values]
396
)
397
# print("Successfully transformed into array")
398
return price_array, tech_array, turbulence_array
399
400
def get_trading_days(self, start, end):
401
nyse = tc.get_calendar("NYSE")
402
# df = nyse.sessions_in_range(
403
# pd.Timestamp(start).tz_localize(None), pd.Timestamp(end).tz_localize(None)
404
# )
405
df = nyse.date_range_htf("1D", pd.Timestamp(start), pd.Timestamp(end))
406
trading_days = []
407
for day in df:
408
trading_days.append(str(day)[:10])
409
return trading_days
410
411
def fetch_latest_data(
412
self, ticker_list, time_interval, tech_indicator_list, limit=100
413
) -> pd.DataFrame:
414
data_df = pd.DataFrame()
415
for tic in ticker_list:
416
request_params = StockBarsRequest(
417
symbol_or_symbols=[tic], timeframe=TimeFrame.Minute, limit=limit
418
)
419
420
barset = self.client.get_stock_bars(request_params).df
421
# Reorganize the dataframes to be in original alpaca_trade_api structure
422
# Rename the existing 'symbol' column if it exists
423
if "symbol" in barset.columns:
424
barset.rename(columns={"symbol": "symbol_old"}, inplace=True)
425
426
# Now reset the index
427
barset.reset_index(inplace=True)
428
429
# Set 'timestamp' as the new index
430
if "level_0" in barset.columns:
431
barset.rename(columns={"level_0": "symbol"}, inplace=True)
432
if "level_1" in bars.columns:
433
barset.rename(columns={"level_1": "timestamp"}, inplace=True)
434
barset.set_index("timestamp", inplace=True)
435
436
# Reorder and rename columns as needed
437
barset = bars[
438
[
439
"close",
440
"high",
441
"low",
442
"trade_count",
443
"open",
444
"volume",
445
"vwap",
446
"symbol",
447
]
448
]
449
450
barset["tic"] = tic
451
barset = barset.reset_index()
452
data_df = pd.concat([data_df, barset])
453
454
data_df = data_df.reset_index(drop=True)
455
start_time = data_df.timestamp.min()
456
end_time = data_df.timestamp.max()
457
times = []
458
current_time = start_time
459
end = end_time + pd.Timedelta(minutes=1)
460
while current_time != end:
461
times.append(current_time)
462
current_time += pd.Timedelta(minutes=1)
463
464
df = data_df.copy()
465
new_df = pd.DataFrame()
466
for tic in ticker_list:
467
tmp_df = pd.DataFrame(
468
columns=["open", "high", "low", "close", "volume"], index=times
469
)
470
tic_df = df[df.tic == tic]
471
for i in range(tic_df.shape[0]):
472
tmp_df.loc[tic_df.iloc[i]["timestamp"]] = tic_df.iloc[i][
473
["open", "high", "low", "close", "volume"]
474
]
475
476
if str(tmp_df.iloc[0]["close"]) == "nan":
477
for i in range(tmp_df.shape[0]):
478
if str(tmp_df.iloc[i]["close"]) != "nan":
479
first_valid_close = tmp_df.iloc[i]["close"]
480
tmp_df.iloc[0] = [
481
first_valid_close,
482
first_valid_close,
483
first_valid_close,
484
first_valid_close,
485
0.0,
486
]
487
break
488
if str(tmp_df.iloc[0]["close"]) == "nan":
489
print(
490
"Missing data for ticker: ",
491
tic,
492
" . The prices are all NaN. Fill with 0.",
493
)
494
tmp_df.iloc[0] = [
495
0.0,
496
0.0,
497
0.0,
498
0.0,
499
0.0,
500
]
501
502
for i in range(tmp_df.shape[0]):
503
if str(tmp_df.iloc[i]["close"]) == "nan":
504
previous_close = tmp_df.iloc[i - 1]["close"]
505
if str(previous_close) == "nan":
506
previous_close = 0.0
507
tmp_df.iloc[i] = [
508
previous_close,
509
previous_close,
510
previous_close,
511
previous_close,
512
0.0,
513
]
514
tmp_df = tmp_df.astype(float)
515
tmp_df["tic"] = tic
516
new_df = pd.concat([new_df, tmp_df])
517
518
new_df = new_df.reset_index()
519
new_df = new_df.rename(columns={"index": "timestamp"})
520
521
df = self.add_technical_indicator(new_df, tech_indicator_list)
522
df["VIXY"] = 0
523
524
price_array, tech_array, turbulence_array = self.df_to_array(
525
df, tech_indicator_list, if_vix=True
526
)
527
latest_price = price_array[-1]
528
latest_tech = tech_array[-1]
529
request_params = StockBarsRequest(
530
symbol_or_symbols="VIXY", timeframe=TimeFrame.Minute, limit=1
531
)
532
turb_df = self.client.get_stock_bars(request_params).df
533
latest_turb = turb_df["close"].values
534
return latest_price, latest_tech, latest_turb
535
536