# Path: blob/master/finrl/meta/paper_trading/alpaca.py
# 732 views
# Disclaimer: Nothing herein is financial advice, and NOT a recommendation to trade real money. Many platforms exist for simulated trading (paper trading) which can be used for building and developing the methods discussed. Please use common sense and always first consult a professional before trading or investing.
# Setup Alpaca Paper trading environment
from __future__ import annotations

import datetime
import threading
import time

import alpaca_trade_api as tradeapi
import gym
import numpy as np
import pandas as pd
import torch

from finrl.meta.data_processors.processor_alpaca import AlpacaProcessor
from finrl.meta.paper_trading.common import AgentPPO


class PaperTradingAlpaca:
    """Drive a trained PPO agent against an Alpaca (paper) trading account.

    The constructor loads the agent for the requested DRL library, connects
    to the Alpaca REST API and initialises the local account bookkeeping.
    ``run()`` then loops forever: it waits for the market to open, trades once
    every ``time_interval`` and closes every position shortly before the
    market closes.
    """

    # Supported ``time_interval`` strings mapped to their length in seconds.
    _INTERVAL_SECONDS = {
        "1s": 1,
        "5s": 5,
        "1Min": 60,
        "5Min": 60 * 5,
        "15Min": 60 * 15,
    }

    def __init__(
        self,
        ticker_list,
        time_interval,
        drl_lib,
        agent,
        cwd,
        net_dim,
        state_dim,
        action_dim,
        API_KEY,
        API_SECRET,
        API_BASE_URL,
        tech_indicator_list,
        turbulence_thresh=30,
        max_stock=1e2,
        latency=None,
    ):
        """Load the agent, connect to Alpaca and initialise account state.

        Args:
            ticker_list: Symbols to trade; their order defines the index of
                each stock in the state and action vectors.
            time_interval: One of "1s", "5s", "1Min", "5Min", "15Min".
            drl_lib: "elegantrl", "rllib" or "stable_baselines3".
            agent: Agent name; only "ppo" is supported.
            cwd: Checkpoint location. For elegantrl this is a directory that
                contains ``actor.pth``; for the other libraries it is passed
                straight to the library's restore/load call.
            net_dim, state_dim, action_dim: Network and space dimensions.
            API_KEY, API_SECRET, API_BASE_URL: Alpaca credentials/endpoint.
            tech_indicator_list: Technical indicators requested from the
                data processor when building the state.
            turbulence_thresh: Turbulence level at or above which all
                positions are liquidated instead of following the agent.
            max_stock: Multiplier applied to the elegantrl action output.
            latency: Accepted for interface compatibility; not used.

        Raises:
            ValueError: Unsupported agent, library or time interval, a failed
                agent load, or a failed Alpaca connection.
        """
        # load agent
        self.drl_lib = drl_lib
        if agent != "ppo":
            raise ValueError("Agent input is NOT supported yet.")
        self._load_agent(drl_lib, cwd, net_dim, state_dim, action_dim)

        # connect to Alpaca trading API
        try:
            self.alpaca = tradeapi.REST(API_KEY, API_SECRET, API_BASE_URL, "v2")
        except Exception as err:
            raise ValueError(
                "Fail to connect Alpaca. Please check account info and internet connection."
            ) from err

        # read trading time interval
        if time_interval not in self._INTERVAL_SECONDS:
            raise ValueError("Time interval input is NOT supported yet.")
        self.time_interval = self._INTERVAL_SECONDS[time_interval]

        # read trading settings
        self.tech_indicator_list = tech_indicator_list
        self.turbulence_thresh = turbulence_thresh
        self.max_stock = max_stock

        # initialize account
        self.stocks = np.asarray([0] * len(ticker_list))  # stocks holding
        self.stocks_cd = np.zeros_like(self.stocks)
        self.cash = None  # cash record
        self.stocks_df = pd.DataFrame(
            self.stocks, columns=["stocks"], index=ticker_list
        )
        self.asset_list = []
        self.price = np.asarray([0] * len(ticker_list))
        self.stockUniverse = ticker_list
        self.turbulence_bool = 0
        self.equities = []

    def _load_agent(self, drl_lib, cwd, net_dim, state_dim, action_dim):
        """Load the PPO policy for ``drl_lib`` and store it on ``self``."""
        if drl_lib == "elegantrl":
            ppo_agent = AgentPPO(net_dim, state_dim, action_dim)
            actor = ppo_agent.act
            try:
                actor_path = cwd + "/actor.pth"
                print(f"| load actor from: {actor_path}")
                actor.load_state_dict(
                    torch.load(actor_path, map_location=lambda storage, loc: storage)
                )
                self.act = actor
                self.device = ppo_agent.device
            except Exception as err:
                raise ValueError("Fail to load agent!") from err

        elif drl_lib == "rllib":
            from ray.rllib.agents import ppo
            from ray.rllib.agents.ppo.ppo import PPOTrainer

            config = ppo.DEFAULT_CONFIG.copy()
            config["env"] = StockEnvEmpty
            config["log_level"] = "WARN"
            config["env_config"] = {
                "state_dim": state_dim,
                "action_dim": action_dim,
            }
            trainer = PPOTrainer(env=StockEnvEmpty, config=config)
            # Restore exactly once, inside the try, so that a bad checkpoint
            # is reported as the documented ValueError.
            try:
                trainer.restore(cwd)
                self.agent = trainer
                print("Restoring from checkpoint path", cwd)
            except Exception as err:
                raise ValueError("Fail to load agent!") from err

        elif drl_lib == "stable_baselines3":
            from stable_baselines3 import PPO

            try:
                self.model = PPO.load(cwd)
                print("Successfully load model", cwd)
            except Exception as err:
                raise ValueError("Fail to load agent!") from err

        else:
            raise ValueError(
                "The DRL library input is NOT supported yet. Please check your input."
            )

    def test_latency(self, test_times=10):
        """Return the mean wall-clock seconds of ``test_times`` get_state() calls."""
        total_time = 0
        for _ in range(test_times):
            started = time.time()
            self.get_state()
            total_time += time.time() - started
        latency = total_time / test_times
        print("latency for data processing: ", latency)
        return latency

    def run(self):
        """Cancel open orders, wait for the open, then trade in an endless loop."""
        for order in self.alpaca.list_orders(status="open"):
            self.alpaca.cancel_order(order.id)

        # Wait for market to open.
        print("Waiting for market to open...")
        self.awaitMarketOpen()
        print("Market opened.")
        while True:
            # Figure out when the market will close so we can prepare to sell beforehand.
            clock = self.alpaca.get_clock()
            closingTime = clock.next_close.replace(
                tzinfo=datetime.timezone.utc
            ).timestamp()
            currTime = clock.timestamp.replace(tzinfo=datetime.timezone.utc).timestamp()
            self.timeToClose = closingTime - currTime

            if self.timeToClose < (60 * 2):
                # Close all positions when 2 minutes til market close. Any less
                # and it will be in danger of not closing positions in time.
                print("Market closing soon. Closing positions.")
                self._liquidate_all_positions()

                # Run script again after market close for next trading day.
                print("Sleeping until market close (15 minutes).")
                time.sleep(60 * 15)
            else:
                self.trade()
                # NOTE(review): Alpaca documents `last_equity` as the equity at
                # the previous trading day's close; if live equity is intended
                # here, the `equity` field may be the right one - confirm.
                last_equity = float(self.alpaca.get_account().last_equity)
                cur_time = time.time()
                self.equities.append([cur_time, last_equity])
                time.sleep(self.time_interval)

    def awaitMarketOpen(self):
        """Block, polling the Alpaca clock once a minute, until the market is open."""
        isOpen = self.alpaca.get_clock().is_open
        while not isOpen:
            clock = self.alpaca.get_clock()
            openingTime = clock.next_open.replace(
                tzinfo=datetime.timezone.utc
            ).timestamp()
            currTime = clock.timestamp.replace(tzinfo=datetime.timezone.utc).timestamp()
            timeToOpen = int((openingTime - currTime) / 60)
            print(f"{timeToOpen} minutes til market open.")
            time.sleep(60)
            isOpen = self.alpaca.get_clock().is_open

    def _start_order_thread(self, qty, stock, side):
        """Submit one order on a new, already started thread and return the thread."""
        resp = []
        # Pass the callable and its arguments separately. Writing
        # `target=self.submitOrder(...)` would run the order on the calling
        # thread and hand Thread a target of None.
        worker = threading.Thread(
            target=self.submitOrder, args=(qty, stock, side, resp)
        )
        worker.start()
        return worker

    def _liquidate_all_positions(self):
        """Submit a closing market order for every open position and wait for all."""
        workers = []
        for position in self.alpaca.list_positions():
            orderSide = "sell" if position.side == "long" else "buy"
            qty = abs(int(float(position.qty)))
            workers.append(self._start_order_thread(qty, position.symbol, orderSide))

        for worker in workers:  # wait for all threads to complete
            worker.join()

    def trade(self):
        """Query the agent for one action and turn it into market orders.

        When the turbulence flag set by ``get_state()`` is 0, entries of the
        action below -10 are sold and entries above 10 are bought; otherwise
        every open position is liquidated.

        Raises:
            ValueError: ``self.drl_lib`` is not a supported library.
        """
        state = self.get_state()

        if self.drl_lib == "elegantrl":
            with torch.no_grad():
                s_tensor = torch.as_tensor((state,), device=self.device)
                a_tensor = self.act(s_tensor)
                action = a_tensor.detach().cpu().numpy()[0]
            action = (action * self.max_stock).astype(int)

        elif self.drl_lib == "rllib":
            # NOTE(review): unlike the elegantrl branch, this action is not
            # scaled by max_stock before being compared with the threshold
            # below - confirm the policy's output range.
            action = self.agent.compute_single_action(state)

        elif self.drl_lib == "stable_baselines3":
            # NOTE(review): same unscaled-action caveat as the rllib branch.
            action = self.model.predict(state)[0]

        else:
            raise ValueError(
                "The DRL library input is NOT supported yet. Please check your input."
            )

        self.stocks_cd += 1
        if self.turbulence_bool == 0:
            min_action = 10  # stock_cd

            # Sell sizes depend only on current holdings and the action, so
            # the sell orders can be submitted concurrently.
            workers = []
            for index in np.where(action < -min_action)[0]:  # sell_index:
                sell_num_shares = min(self.stocks[index], -action[index])
                qty = abs(int(sell_num_shares))
                workers.append(
                    self._start_order_thread(qty, self.stockUniverse[index], "sell")
                )
                self.stocks_cd[index] = 0

            for worker in workers:  # wait for all threads to complete
                worker.join()
            if workers:
                # Refresh cash only after every sell has been submitted.
                self.cash = float(self.alpaca.get_account().cash)

            # Each buy is sized from the cash left after the previous order,
            # so buys are submitted one at a time with a cash refresh between.
            for index in np.where(action > min_action)[0]:  # buy_index:
                tmp_cash = 0 if self.cash < 0 else self.cash
                buy_num_shares = min(
                    tmp_cash // self.price[index], abs(int(action[index]))
                )
                if np.isnan(buy_num_shares):
                    qty = 0  # set to 0 quantity
                else:
                    qty = abs(int(buy_num_shares))
                self.submitOrder(qty, self.stockUniverse[index], "buy", [])
                self.cash = float(self.alpaca.get_account().cash)
                self.stocks_cd[index] = 0

        else:  # sell all when turbulence
            self._liquidate_all_positions()
            self.stocks_cd[:] = 0

    def get_state(self):
        """Fetch market and account data and return the agent's observation.

        Side effects: updates ``self.cash``, ``self.stocks``, ``self.price``
        and ``self.turbulence_bool``.

        Returns:
            A 1-D float32 array: scaled cash, scaled turbulence, turbulence
            flag, scaled prices, scaled holdings, the per-stock cooldown
            counters and the scaled technical indicators. NaN and infinite
            entries are replaced by 0.
        """
        alpaca = AlpacaProcessor(api=self.alpaca)
        price, tech, turbulence = alpaca.fetch_latest_data(
            ticker_list=self.stockUniverse,
            time_interval="1Min",
            tech_indicator_list=self.tech_indicator_list,
        )
        turbulence_bool = 1 if turbulence >= self.turbulence_thresh else 0

        turbulence = (
            self.sigmoid_sign(turbulence, self.turbulence_thresh) * 2**-5
        ).astype(np.float32)

        tech = tech * 2**-7
        stocks = [0] * len(self.stockUniverse)
        for position in self.alpaca.list_positions():
            ind = self.stockUniverse.index(position.symbol)
            stocks[ind] = abs(int(float(position.qty)))

        stocks = np.asarray(stocks, dtype=float)
        cash = float(self.alpaca.get_account().cash)
        self.cash = cash
        self.stocks = stocks
        self.turbulence_bool = turbulence_bool
        self.price = price

        amount = np.array(self.cash * (2**-12), dtype=np.float32)
        scale = np.array(2**-6, dtype=np.float32)
        state = np.hstack(
            (
                amount,
                turbulence,
                self.turbulence_bool,
                price * scale,
                self.stocks * scale,
                self.stocks_cd,
                tech,
            )
        ).astype(np.float32)
        state[np.isnan(state)] = 0.0
        state[np.isinf(state)] = 0.0
        return state

    def submitOrder(self, qty, stock, side, resp):
        """Submit a day market order and append the outcome to ``resp``.

        Args:
            qty: Number of shares; nothing is submitted unless it is positive.
            stock: Ticker symbol.
            side: "buy" or "sell".
            resp: List that receives ``True`` on success (or when there was
                nothing to submit) and ``False`` when the submission raised.
        """
        if qty <= 0:
            # Nothing to submit; report success so callers need no special case.
            resp.append(True)
            return

        try:
            self.alpaca.submit_order(stock, qty, side, "market", "day")
        except Exception:
            # Best effort: one rejected order must not stop the trading loop.
            print(f"Order of | {qty} {stock} {side} | did not go through.")
            resp.append(False)
        else:
            print(f"Market order of | {qty} {stock} {side} | completed.")
            resp.append(True)

    @staticmethod
    def sigmoid_sign(ary, thresh):
        """Return ``thresh * (1 / (1 + exp(-e * ary / thresh)) - 0.5)``."""

        def sigmoid(x):
            return 1 / (1 + np.exp(-x * np.e)) - 0.5

        return sigmoid(ary / thresh) * thresh


class StockEnvEmpty(gym.Env):
    """Empty Env used for loading rllib agent.

    It only declares the observation and action spaces built from
    ``config["state_dim"]`` and ``config["action_dim"]``; ``reset`` and
    ``step`` return ``None``.
    """

    def __init__(self, config):
        state_dim = config["state_dim"]
        action_dim = config["action_dim"]
        self.env_num = 1
        self.max_step = 10000
        self.env_name = "StockEnvEmpty"
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.if_discrete = False
        self.target_return = 9999
        self.observation_space = gym.spaces.Box(
            low=-3000, high=3000, shape=(state_dim,), dtype=np.float32
        )
        self.action_space = gym.spaces.Box(
            low=-1, high=1, shape=(action_dim,), dtype=np.float32
        )

    def reset(
        self,
        *,
        seed=None,
        options=None,
    ):
        """No-op placeholder; returns None."""
        return

    def step(self, actions):
        """No-op placeholder; returns None."""
        return