Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
AI4Finance-Foundation
GitHub Repository: AI4Finance-Foundation/FinRL
Path: blob/master/finrl/meta/paper_trading/alpaca.py
732 views
1
# Disclaimer: Nothing herein is financial advice, and NOT a recommendation to trade real money. Many platforms exist for simulated trading (paper trading) which can be used for building and developing the methods discussed. Please use common sense and always first consult a professional before trading or investing.
2
# Setup Alpaca Paper trading environment
3
from __future__ import annotations
4
5
import datetime
6
import threading
7
import time
8
9
import alpaca_trade_api as tradeapi
10
import gym
11
import numpy as np
12
import pandas as pd
13
import torch
14
15
from finrl.meta.data_processors.processor_alpaca import AlpacaProcessor
16
from finrl.meta.paper_trading.common import AgentPPO
17
18
19
class PaperTradingAlpaca:
    """Drive an Alpaca account with a trained PPO policy.

    The constructor loads the policy with one of three DRL libraries
    ("elegantrl", "rllib" or "stable_baselines3"), connects to the Alpaca
    REST API and initialises local bookkeeping.  ``run`` then loops forever:
    it waits for the market to open, calls ``trade`` once per configured
    interval, and flattens all positions shortly before the close.
    """

    # Supported ``time_interval`` values -> pause between trading steps (s).
    _INTERVAL_SECONDS = {
        "1s": 1,
        "5s": 5,
        "1Min": 60,
        "5Min": 60 * 5,
        "15Min": 60 * 15,
    }

    def __init__(
        self,
        ticker_list,
        time_interval,
        drl_lib,
        agent,
        cwd,
        net_dim,
        state_dim,
        action_dim,
        API_KEY,
        API_SECRET,
        API_BASE_URL,
        tech_indicator_list,
        turbulence_thresh=30,
        max_stock=1e2,
        latency=None,
    ):
        """Load the agent, connect to Alpaca and reset local account state.

        Args:
            ticker_list: Symbols to trade; their order defines the index of
                each stock in the state and action vectors.
            time_interval: One of "1s", "5s", "1Min", "5Min", "15Min".
            drl_lib: "elegantrl", "rllib" or "stable_baselines3".
            agent: Agent name; only "ppo" is accepted.
            cwd: Checkpoint location.  For elegantrl this is a directory and
                "/actor.pth" is appended; for the other libraries it is
                passed to the library's restore/load call unchanged.
            net_dim, state_dim, action_dim: Network/space sizes forwarded to
                the elegantrl agent or the rllib env config.
            API_KEY, API_SECRET, API_BASE_URL: Alpaca credentials/endpoint.
            tech_indicator_list: Indicators requested from the data processor.
            turbulence_thresh: Turbulence level at or above which ``trade``
                liquidates instead of following the policy.
            max_stock: Scale applied to elegantrl actions to get share counts.
            latency: Not used by this class (kept for interface
                compatibility).

        Raises:
            ValueError: Unsupported agent, library or interval, or a failure
                while loading the agent or creating the Alpaca client.
        """
        self.drl_lib = drl_lib
        if agent != "ppo":
            raise ValueError("Agent input is NOT supported yet.")

        if drl_lib == "elegantrl":
            ppo_agent = AgentPPO(net_dim, state_dim, action_dim)
            actor = ppo_agent.act
            try:
                cwd = cwd + "/actor.pth"
                print(f"| load actor from: {cwd}")
                # NOTE: torch.load unpickles the file -- only load
                # checkpoints that come from a trusted source.
                actor.load_state_dict(
                    torch.load(cwd, map_location=lambda storage, loc: storage)
                )
                self.act = actor
                self.device = ppo_agent.device
            except Exception as exc:
                raise ValueError("Fail to load agent!") from exc

        elif drl_lib == "rllib":
            from ray.rllib.agents import ppo
            from ray.rllib.agents.ppo.ppo import PPOTrainer

            config = ppo.DEFAULT_CONFIG.copy()
            config["env"] = StockEnvEmpty
            config["log_level"] = "WARN"
            config["env_config"] = {
                "state_dim": state_dim,
                "action_dim": action_dim,
            }
            trainer = PPOTrainer(env=StockEnvEmpty, config=config)
            # Restore exactly once, inside the try, so that a bad checkpoint
            # surfaces as the same ValueError as the other libraries.
            try:
                trainer.restore(cwd)
                self.agent = trainer
                print("Restoring from checkpoint path", cwd)
            except Exception as exc:
                raise ValueError("Fail to load agent!") from exc

        elif drl_lib == "stable_baselines3":
            from stable_baselines3 import PPO

            try:
                self.model = PPO.load(cwd)
                print("Successfully load model", cwd)
            except Exception as exc:
                raise ValueError("Fail to load agent!") from exc

        else:
            raise ValueError(
                "The DRL library input is NOT supported yet. Please check your input."
            )

        # connect to Alpaca trading API
        try:
            self.alpaca = tradeapi.REST(API_KEY, API_SECRET, API_BASE_URL, "v2")
        except Exception as exc:
            raise ValueError(
                "Fail to connect Alpaca. Please check account info and internet connection."
            ) from exc

        # read trading time interval (TypeError covers unhashable inputs)
        try:
            self.time_interval = self._INTERVAL_SECONDS[time_interval]
        except (KeyError, TypeError):
            raise ValueError("Time interval input is NOT supported yet.") from None

        # read trading settings
        self.tech_indicator_list = tech_indicator_list
        self.turbulence_thresh = turbulence_thresh
        self.max_stock = max_stock

        # initialize account
        self.stocks = np.asarray([0] * len(ticker_list))  # shares held
        self.stocks_cd = np.zeros_like(self.stocks)  # steps since last order
        self.cash = None  # filled in by get_state()
        self.stocks_df = pd.DataFrame(
            self.stocks, columns=["stocks"], index=ticker_list
        )
        self.asset_list = []
        self.price = np.asarray([0] * len(ticker_list))
        self.stockUniverse = ticker_list
        self.turbulence_bool = 0
        self.equities = []  # [timestamp, equity] samples appended by run()

    def test_latency(self, test_times=10):
        """Return the mean wall-clock seconds of ``test_times`` get_state calls."""
        total_time = 0
        for _ in range(test_times):
            started = time.time()
            self.get_state()
            total_time += time.time() - started
        latency = total_time / test_times
        print("latency for data processing: ", latency)
        return latency

    def run(self):
        """Cancel open orders, wait for the open, then trade in an endless loop.

        Each iteration either trades once and sleeps ``self.time_interval``
        seconds, or -- when less than two minutes remain until the next
        close -- closes every position and sleeps 15 minutes.  Never returns.
        """
        for order in self.alpaca.list_orders(status="open"):
            self.alpaca.cancel_order(order.id)

        # Wait for market to open.
        print("Waiting for market to open...")
        self.awaitMarketOpen()
        print("Market opened.")
        while True:
            # Figure out when the market will close so we can prepare to
            # sell beforehand.
            clock = self.alpaca.get_clock()
            closingTime = clock.next_close.replace(
                tzinfo=datetime.timezone.utc
            ).timestamp()
            currTime = clock.timestamp.replace(tzinfo=datetime.timezone.utc).timestamp()
            self.timeToClose = closingTime - currTime

            if self.timeToClose < (60 * 2):
                # Close all positions when 2 minutes til market close. Any
                # less and it will be in danger of not closing in time.
                print("Market closing soon. Closing positions.")
                self._close_all_positions()

                # Run script again after market close for next trading day.
                print("Sleeping until market close (15 minutes).")
                time.sleep(60 * 15)
            else:
                self.trade()
                # NOTE(review): Alpaca documents `last_equity` as the equity
                # at the previous trading day's close; `equity` may be what
                # was intended here -- confirm before relying on this series.
                last_equity = float(self.alpaca.get_account().last_equity)
                cur_time = time.time()
                self.equities.append([cur_time, last_equity])
                time.sleep(self.time_interval)

    def awaitMarketOpen(self):
        """Block, polling the Alpaca clock once a minute, until it reports open."""
        while not self.alpaca.get_clock().is_open:
            clock = self.alpaca.get_clock()
            openingTime = clock.next_open.replace(
                tzinfo=datetime.timezone.utc
            ).timestamp()
            currTime = clock.timestamp.replace(tzinfo=datetime.timezone.utc).timestamp()
            timeToOpen = int((openingTime - currTime) / 60)
            print(f"{timeToOpen} minutes til market open.")
            time.sleep(60)

    def _close_all_positions(self):
        """Submit one market order per open position to flatten it, in parallel."""
        workers = []
        for position in self.alpaca.list_positions():
            orderSide = "sell" if position.side == "long" else "buy"
            qty = abs(int(float(position.qty)))
            # Pass the callable and its arguments separately: writing
            # target=self.submitOrder(...) would run the order on this
            # thread and hand Thread a target of None.
            worker = threading.Thread(
                target=self.submitOrder,
                args=(qty, position.symbol, orderSide, []),
            )
            worker.start()
            workers.append(worker)

        for worker in workers:  # wait for all orders to be submitted
            worker.join()

    def trade(self):
        """Query the policy once and turn its action into market orders.

        With ``turbulence_bool == 0``: entries of the action below -10 are
        sold (capped by current holdings), then entries above 10 are bought
        (capped by available cash).  Otherwise every open position is closed.
        The per-stock counter ``stocks_cd`` is incremented on every call and
        reset to 0 for each stock that was ordered.

        Raises:
            ValueError: ``self.drl_lib`` is not a supported library.
        """
        state = self.get_state()

        if self.drl_lib == "elegantrl":
            with torch.no_grad():
                s_tensor = torch.as_tensor((state,), device=self.device)
                a_tensor = self.act(s_tensor)
                action = a_tensor.detach().cpu().numpy()[0]
            action = (action * self.max_stock).astype(int)

        elif self.drl_lib == "rllib":
            action = self.agent.compute_single_action(state)

        elif self.drl_lib == "stable_baselines3":
            action = self.model.predict(state)[0]

        else:
            raise ValueError(
                "The DRL library input is NOT supported yet. Please check your input."
            )

        self.stocks_cd += 1
        if self.turbulence_bool == 0:
            min_action = 10  # actions with |a| <= min_action are ignored

            # Sells are independent of one another: submit them in parallel.
            workers = []
            for index in np.where(action < -min_action)[0]:
                sell_num_shares = min(self.stocks[index], -action[index])
                qty = abs(int(sell_num_shares))
                worker = threading.Thread(
                    target=self.submitOrder,
                    args=(qty, self.stockUniverse[index], "sell", []),
                )
                worker.start()
                workers.append(worker)
                self.stocks_cd[index] = 0

            for worker in workers:  # wait for all sells to be submitted
                worker.join()
            if workers:
                # Refresh only after every sell has been submitted, so the
                # value is not read while orders are still in flight.
                self.cash = float(self.alpaca.get_account().cash)

            # Each buy is sized from the cash reported after the previous
            # one, so buys are submitted sequentially on this thread.
            for index in np.where(action > min_action)[0]:
                tmp_cash = 0 if self.cash < 0 else self.cash
                buy_num_shares = min(
                    tmp_cash // self.price[index], abs(int(action[index]))
                )
                # A NaN share count (e.g. from a NaN price) means no order.
                if np.isnan(buy_num_shares):
                    qty = 0
                else:
                    qty = abs(int(buy_num_shares))
                self.submitOrder(qty, self.stockUniverse[index], "buy", [])
                self.cash = float(self.alpaca.get_account().cash)
                self.stocks_cd[index] = 0

        else:  # sell all when turbulence
            self._close_all_positions()
            self.stocks_cd[:] = 0

    def get_state(self):
        """Fetch market/account data and assemble the policy's observation.

        Side effects: overwrites ``self.cash``, ``self.stocks``,
        ``self.turbulence_bool`` and ``self.price`` with fresh values.

        Returns:
            A float32 array built by ``np.hstack`` from, in order: scaled
            cash, scaled turbulence, the turbulence flag, scaled prices,
            scaled holdings, ``stocks_cd`` and scaled technical indicators.
            NaN and infinite entries are replaced by 0.0.
        """
        alpaca = AlpacaProcessor(api=self.alpaca)
        # NOTE(review): data is always requested at "1Min", independent of
        # the ``time_interval`` given to the constructor -- confirm intended.
        price, tech, turbulence = alpaca.fetch_latest_data(
            ticker_list=self.stockUniverse,
            time_interval="1Min",
            tech_indicator_list=self.tech_indicator_list,
        )
        turbulence_bool = 1 if turbulence >= self.turbulence_thresh else 0

        turbulence = (
            self.sigmoid_sign(turbulence, self.turbulence_thresh) * 2**-5
        ).astype(np.float32)

        tech = tech * 2**-7

        # Map each held position onto its slot in the ticker universe.
        # Positions in symbols outside the universe have no slot in the
        # state vector and are skipped instead of raising ValueError.
        slot_of = {symbol: i for i, symbol in enumerate(self.stockUniverse)}
        stocks = [0] * len(self.stockUniverse)
        for position in self.alpaca.list_positions():
            ind = slot_of.get(position.symbol)
            if ind is None:
                continue
            stocks[ind] = abs(int(float(position.qty)))

        self.cash = float(self.alpaca.get_account().cash)
        self.stocks = np.asarray(stocks, dtype=float)
        self.turbulence_bool = turbulence_bool
        self.price = price

        amount = np.array(self.cash * (2**-12), dtype=np.float32)
        scale = np.array(2**-6, dtype=np.float32)
        state = np.hstack(
            (
                amount,
                turbulence,
                self.turbulence_bool,
                price * scale,
                self.stocks * scale,
                self.stocks_cd,
                tech,
            )
        ).astype(np.float32)
        state[np.isnan(state)] = 0.0
        state[np.isinf(state)] = 0.0
        return state

    def submitOrder(self, qty, stock, side, resp):
        """Submit a market/day order and append the outcome to ``resp``.

        Args:
            qty: Number of shares; nothing is sent unless ``qty > 0``.
            stock: Ticker symbol.
            side: "buy" or "sell".
            resp: List that receives ``True`` on success (or when nothing
                had to be sent) and ``False`` if the submission raised.
        """
        if qty > 0:
            try:
                self.alpaca.submit_order(stock, qty, side, "market", "day")
            except Exception:
                # Best effort: report the failure and keep the loop running.
                print(f"Order of | {qty} {stock} {side} | did not go through.")
                resp.append(False)
            else:
                print(f"Market order of | {qty} {stock} {side} | completed.")
                resp.append(True)
        else:
            resp.append(True)

    @staticmethod
    def sigmoid_sign(ary, thresh):
        """Squash ``ary`` with a zero-centred sigmoid scaled by ``thresh``.

        Computes ``(1 / (1 + exp(-e * ary / thresh)) - 0.5) * thresh``, so
        the result is 0 at ``ary == 0`` and bounded by ``+/- thresh / 2``.
        """

        def sigmoid(x):
            return 1 / (1 + np.exp(-x * np.e)) - 0.5

        return sigmoid(ary / thresh) * thresh
392
393
394
class StockEnvEmpty(gym.Env):
    """Placeholder environment used only so an rllib agent can be loaded.

    It declares observation and action spaces of the configured sizes;
    ``reset`` and ``step`` do nothing and return ``None``.
    """

    def __init__(self, config):
        """Read "state_dim" and "action_dim" from ``config`` and build spaces."""
        n_state, n_action = config["state_dim"], config["action_dim"]

        # Descriptive attributes.
        self.env_name = "StockEnvEmpty"
        self.env_num = 1
        self.max_step = 10000
        self.target_return = 9999
        self.if_discrete = False
        self.state_dim = n_state
        self.action_dim = n_action

        # Continuous (Box) spaces sized from the config.
        obs_shape = (n_state,)
        act_shape = (n_action,)
        self.observation_space = gym.spaces.Box(
            low=-3000, high=3000, shape=obs_shape, dtype=np.float32
        )
        self.action_space = gym.spaces.Box(
            low=-1, high=1, shape=act_shape, dtype=np.float32
        )

    def reset(self, *, seed=None, options=None):
        """No-op; the arguments are ignored and ``None`` is returned."""
        return None

    def step(self, actions):
        """No-op; ``actions`` is ignored and ``None`` is returned."""
        return None
423
424