# finrl/meta/env_stock_trading/env_stock_papertrading.py
from __future__ import annotations12import datetime3import threading4import time56import alpaca_trade_api as tradeapi7import gymnasium as gym8import numpy as np9import pandas as pd10import torch1112from finrl.meta.data_processors.processor_alpaca import AlpacaProcessor131415class AlpacaPaperTrading:16def __init__(17self,18ticker_list,19time_interval,20drl_lib,21agent,22cwd,23net_dim,24state_dim,25action_dim,26API_KEY,27API_SECRET,28API_BASE_URL,29tech_indicator_list,30turbulence_thresh=30,31max_stock=1e2,32latency=None,33):34# load agent35self.drl_lib = drl_lib36if agent == "ppo":37if drl_lib == "elegantrl":38from elegantrl.agents import AgentPPO39from elegantrl.train.run import init_agent40from elegantrl.train.config import (41Arguments,42) # bug fix:ModuleNotFoundError: No module named 'elegantrl.run'4344# load agent45config = {46"state_dim": state_dim,47"action_dim": action_dim,48}49args = Arguments(agent_class=AgentPPO, env=StockEnvEmpty(config))50args.cwd = cwd51args.net_dim = net_dim52# load agent53try:54agent = init_agent(args, gpu_id=0)55self.act = agent.act56self.device = agent.device57except BaseException:58raise ValueError("Fail to load agent!")5960elif drl_lib == "rllib":61from ray.rllib.agents import ppo62from ray.rllib.agents.ppo.ppo import PPOTrainer6364config = ppo.DEFAULT_CONFIG.copy()65config["env"] = StockEnvEmpty66config["log_level"] = "WARN"67config["env_config"] = {68"state_dim": state_dim,69"action_dim": action_dim,70}71trainer = PPOTrainer(env=StockEnvEmpty, config=config)72trainer.restore(cwd)73try:74trainer.restore(cwd)75self.agent = trainer76print("Restoring from checkpoint path", cwd)77except:78raise ValueError("Fail to load agent!")7980elif drl_lib == "stable_baselines3":81from stable_baselines3 import PPO8283try:84# load agent85self.model = PPO.load(cwd)86print("Successfully load model", cwd)87except:88raise ValueError("Fail to load agent!")8990else:91raise ValueError(92"The DRL library input is NOT supported yet. 
Please check your input."93)9495else:96raise ValueError("Agent input is NOT supported yet.")9798# connect to Alpaca trading API99try:100self.alpaca = tradeapi.REST(API_KEY, API_SECRET, API_BASE_URL, "v2")101except:102raise ValueError(103"Fail to connect Alpaca. Please check account info and internet connection."104)105106# read trading time interval107if time_interval == "1s":108self.time_interval = 1109elif time_interval == "5s":110self.time_interval = 5111elif time_interval == "1Min":112self.time_interval = 60113elif time_interval == "5Min":114self.time_interval = 60 * 5115elif time_interval == "15Min":116self.time_interval = 60 * 15117elif (118time_interval == "1D"119): # bug fix:1D ValueError: Time interval input is NOT supported yet. Maybe any other better ways120self.time_interval = 24 * 60 * 60121else:122raise ValueError("Time interval input is NOT supported yet.")123124# read trading settings125self.tech_indicator_list = tech_indicator_list126self.turbulence_thresh = turbulence_thresh127self.max_stock = max_stock128129# initialize account130self.stocks = np.asarray([0] * len(ticker_list)) # stocks holding131self.stocks_cd = np.zeros_like(self.stocks)132self.cash = None # cash record133self.stocks_df = pd.DataFrame(134self.stocks, columns=["stocks"], index=ticker_list135)136self.asset_list = []137self.price = np.asarray([0] * len(ticker_list))138self.stockUniverse = ticker_list139self.turbulence_bool = 0140self.equities = []141142def test_latency(self, test_times=10):143total_time = 0144for i in range(0, test_times):145time0 = time.time()146self.get_state()147time1 = time.time()148temp_time = time1 - time0149total_time += temp_time150latency = total_time / test_times151print("latency for data processing: ", latency)152return latency153154def run(self):155orders = self.alpaca.list_orders(status="open")156for order in orders:157self.alpaca.cancel_order(order.id)158159# Wait for market to open.160print("Waiting for market to open...")161tAMO = 
threading.Thread(target=self.awaitMarketOpen)162tAMO.start()163tAMO.join()164print("Market opened.")165while True:166# Figure out when the market will close so we can prepare to sell beforehand.167clock = self.alpaca.get_clock()168closingTime = clock.next_close.replace(169tzinfo=datetime.timezone.utc170).timestamp()171currTime = clock.timestamp.replace(tzinfo=datetime.timezone.utc).timestamp()172self.timeToClose = closingTime - currTime173174if self.timeToClose < (60):175# Close all positions when 1 minutes til market close.176print("Market closing soon. Stop trading.")177break178179"""# Close all positions when 1 minutes til market close.180print("Market closing soon. Closing positions.")181182positions = self.alpaca.list_positions()183for position in positions:184if(position.side == 'long'):185orderSide = 'sell'186else:187orderSide = 'buy'188qty = abs(int(float(position.qty)))189respSO = []190tSubmitOrder = threading.Thread(target=self.submitOrder(qty, position.symbol, orderSide, respSO))191tSubmitOrder.start()192tSubmitOrder.join()193194# Run script again after market close for next trading day.195print("Sleeping until market close (15 minutes).")196time.sleep(60 * 15)"""197198else:199trade = threading.Thread(target=self.trade)200trade.start()201trade.join()202last_equity = float(self.alpaca.get_account().last_equity)203cur_time = time.time()204self.equities.append([cur_time, last_equity])205time.sleep(self.time_interval)206207def awaitMarketOpen(self):208isOpen = self.alpaca.get_clock().is_open209while not isOpen:210clock = self.alpaca.get_clock()211openingTime = clock.next_open.replace(212tzinfo=datetime.timezone.utc213).timestamp()214currTime = clock.timestamp.replace(tzinfo=datetime.timezone.utc).timestamp()215timeToOpen = int((openingTime - currTime) / 60)216print(str(timeToOpen) + " minutes til market open.")217time.sleep(60)218isOpen = self.alpaca.get_clock().is_open219220def trade(self):221state = self.get_state()222223if self.drl_lib == 
"elegantrl":224with torch.no_grad():225s_tensor = torch.as_tensor((state,), device=self.device)226a_tensor = self.act(s_tensor)227action = a_tensor.detach().cpu().numpy()[0]228229action = (action * self.max_stock).astype(int)230231elif self.drl_lib == "rllib":232action = self.agent.compute_single_action(state)233234elif self.drl_lib == "stable_baselines3":235action = self.model.predict(state)[0]236237else:238raise ValueError(239"The DRL library input is NOT supported yet. Please check your input."240)241242self.stocks_cd += 1243if self.turbulence_bool == 0:244min_action = 10 # stock_cd245for index in np.where(action < -min_action)[0]: # sell_index:246sell_num_shares = min(self.stocks[index], -action[index])247qty = abs(int(sell_num_shares))248respSO = []249tSubmitOrder = threading.Thread(250target=self.submitOrder(251qty, self.stockUniverse[index], "sell", respSO252)253)254tSubmitOrder.start()255tSubmitOrder.join()256self.cash = float(self.alpaca.get_account().cash)257self.stocks_cd[index] = 0258259for index in np.where(action > min_action)[0]: # buy_index:260if self.cash < 0:261tmp_cash = 0262else:263tmp_cash = self.cash264buy_num_shares = min(265tmp_cash // self.price[index], abs(int(action[index]))266)267qty = abs(int(buy_num_shares))268respSO = []269tSubmitOrder = threading.Thread(270target=self.submitOrder(271qty, self.stockUniverse[index], "buy", respSO272)273)274tSubmitOrder.start()275tSubmitOrder.join()276self.cash = float(self.alpaca.get_account().cash)277self.stocks_cd[index] = 0278279else: # sell all when turbulence280positions = self.alpaca.list_positions()281for position in positions:282if position.side == "long":283orderSide = "sell"284else:285orderSide = "buy"286qty = abs(int(float(position.qty)))287respSO = []288tSubmitOrder = threading.Thread(289target=self.submitOrder(qty, position.symbol, orderSide, respSO)290)291tSubmitOrder.start()292tSubmitOrder.join()293294self.stocks_cd[:] = 0295296def get_state(self):297alpaca = 
AlpacaProcessor(api=self.alpaca)298price, tech, turbulence = alpaca.fetch_latest_data(299ticker_list=self.stockUniverse,300time_interval="1Min",301tech_indicator_list=self.tech_indicator_list,302)303turbulence_bool = 1 if turbulence >= self.turbulence_thresh else 0304305turbulence = (306self.sigmoid_sign(turbulence, self.turbulence_thresh) * 2**-5307).astype(np.float32)308309tech = tech * 2**-7310positions = self.alpaca.list_positions()311stocks = [0] * len(self.stockUniverse)312for position in positions:313ind = self.stockUniverse.index(position.symbol)314stocks[ind] = abs(int(float(position.qty)))315316stocks = np.asarray(stocks, dtype=float)317cash = float(self.alpaca.get_account().cash)318self.cash = cash319self.stocks = stocks320self.turbulence_bool = turbulence_bool321self.price = price322323amount = np.array(self.cash * (2**-12), dtype=np.float32)324scale = np.array(2**-6, dtype=np.float32)325state = np.hstack(326(327amount,328turbulence,329self.turbulence_bool,330price * scale,331self.stocks * scale,332self.stocks_cd,333tech,334)335).astype(np.float32)336print(len(self.stockUniverse))337return state338339def submitOrder(self, qty, stock, side, resp):340if qty > 0:341try:342self.alpaca.submit_order(stock, qty, side, "market", "day")343print(344"Market order of | "345+ str(qty)346+ " "347+ stock348+ " "349+ side350+ " | completed."351)352resp.append(True)353except:354print(355"Order of | "356+ str(qty)357+ " "358+ stock359+ " "360+ side361+ " | did not go through."362)363resp.append(False)364else:365print(366"Quantity is 0, order of | "367+ str(qty)368+ " "369+ stock370+ " "371+ side372+ " | not completed."373)374resp.append(True)375376@staticmethod377def sigmoid_sign(ary, thresh):378def sigmoid(x):379return 1 / (1 + np.exp(-x * np.e)) - 0.5380381return sigmoid(ary / thresh) * thresh382383384class StockEnvEmpty(gym.Env):385# Empty Env used for loading rllib agent386def __init__(self, config):387state_dim = config["state_dim"]388action_dim = 
config["action_dim"]389self.env_num = 1390self.max_step = 10000391self.env_name = "StockEnvEmpty"392self.state_dim = state_dim393self.action_dim = action_dim394self.if_discrete = False395self.target_return = 9999396self.observation_space = gym.spaces.Box(397low=-3000, high=3000, shape=(state_dim,), dtype=np.float32398)399self.action_space = gym.spaces.Box(400low=-1, high=1, shape=(action_dim,), dtype=np.float32401)402403def reset(404self,405*,406seed=None,407options=None,408):409return410411def step(self, actions):412return413414415