Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
AI4Finance-Foundation
GitHub Repository: AI4Finance-Foundation/FinRL
Path: blob/master/finrl/meta/env_stock_trading/env_stocktrading_np.py
732 views
1
from __future__ import annotations
2
3
import gymnasium as gym
4
import numpy as np
5
from numpy import random as rd
6
7
8
class StockTradingEnv(gym.Env):
    """Multi-stock trading environment backed by pre-computed NumPy arrays.

    The environment follows the Gymnasium ``Env`` API (``reset`` returns
    ``(state, info)``; ``step`` returns
    ``(state, reward, terminated, truncated, info)``).

    ``config`` must provide the following keys:

    * ``"price_array"``: array indexed as ``[day, stock]``.
    * ``"tech_array"``: array indexed as ``[day, indicator]``.
    * ``"turbulence_array"``: array indexed as ``[day]``.
    * ``"if_train"``: when true, ``reset`` randomises the starting holdings
      and cash.

    The observation is a flat ``float32`` vector laid out as::

        [cash, turbulence, turbulence_flag,
         price * stock_dim, holdings * stock_dim, cool_down * stock_dim,
         technical indicators]

    Actions are one value per stock in ``[-1, 1]``; they are multiplied by
    ``max_stock`` and truncated to an integer number of shares (negative
    means sell, positive means buy).
    """

    def __init__(
        self,
        config,
        initial_account=1e6,
        gamma=0.99,
        turbulence_thresh=99,
        min_stock_rate=0.1,
        max_stock=1e2,
        initial_capital=1e6,
        buy_cost_pct=1e-3,
        sell_cost_pct=1e-3,
        reward_scaling=2**-11,
        initial_stocks=None,
    ):
        """Store the market data and trading parameters.

        Args:
            config: Mapping with the keys described in the class docstring.
            initial_account: Unused; kept only so existing callers that pass
                it keep working. ``initial_capital`` sets the starting cash.
            gamma: Discount factor used for the cumulative reward returned on
                the final step.
            turbulence_thresh: Turbulence values strictly above this force
                the whole portfolio to be liquidated on that day.
            min_stock_rate: Fraction of ``max_stock``; scaled actions whose
                magnitude does not exceed ``max_stock * min_stock_rate`` are
                ignored.
            max_stock: Number of shares that an action of magnitude 1 maps to.
            initial_capital: Starting cash.
            buy_cost_pct: Proportional fee charged on purchases.
            sell_cost_pct: Proportional fee charged on sales.
            reward_scaling: Multiplier applied to the per-step change in
                total asset value.
            initial_stocks: Optional array-like of starting holdings, one
                entry per stock. Defaults to all zeros.

        Raises:
            ValueError: If ``initial_stocks`` does not have one entry per
                stock.
        """
        price_ary = config["price_array"]
        tech_ary = config["tech_array"]
        turbulence_ary = config["turbulence_array"]
        if_train = config["if_train"]

        self.price_ary = price_ary.astype(np.float32)
        # Technical indicators are scaled down before entering the state.
        self.tech_ary = tech_ary.astype(np.float32) * 2**-7

        # 1.0 on days whose raw turbulence exceeds the threshold, else 0.0.
        self.turbulence_bool = (turbulence_ary > turbulence_thresh).astype(np.float32)
        # Squashed and scaled turbulence used as an observation feature.
        self.turbulence_ary = (
            self.sigmoid_sign(turbulence_ary, turbulence_thresh) * 2**-5
        ).astype(np.float32)

        stock_dim = self.price_ary.shape[1]
        self.gamma = gamma
        self.max_stock = max_stock
        self.min_stock_rate = min_stock_rate
        self.buy_cost_pct = buy_cost_pct
        self.sell_cost_pct = sell_cost_pct
        self.reward_scaling = reward_scaling
        self.initial_capital = initial_capital

        if initial_stocks is None:
            self.initial_stocks = np.zeros(stock_dim, dtype=np.float32)
        else:
            # Accept any array-like (e.g. a list); reset() needs .shape/.astype.
            self.initial_stocks = np.asarray(initial_stocks, dtype=np.float32)
            if self.initial_stocks.shape != (stock_dim,):
                raise ValueError(
                    f"initial_stocks must have shape ({stock_dim},), "
                    f"got {self.initial_stocks.shape}"
                )

        # Episode state; populated by reset().
        self.day = None
        self.amount = None
        self.stocks = None
        self.stocks_cool_down = None
        self.total_asset = None
        self.gamma_reward = None
        self.initial_total_asset = None

        # Environment information.
        self.env_name = "StockEnv"
        # amount + (turbulence, turbulence_bool)
        #   + (price, stock, stock_cool_down) * stock_dim + tech_dim
        self.state_dim = 1 + 2 + 3 * stock_dim + self.tech_ary.shape[1]
        # Legacy attribute, never populated; the live counters are kept in
        # self.stocks_cool_down. Retained for backward compatibility.
        self.stocks_cd = None
        self.action_dim = stock_dim
        self.max_step = self.price_ary.shape[0] - 1
        self.if_train = if_train
        self.if_discrete = False
        self.target_return = 10.0
        self.episode_return = 0.0

        self.observation_space = gym.spaces.Box(
            low=-3000, high=3000, shape=(self.state_dim,), dtype=np.float32
        )
        self.action_space = gym.spaces.Box(
            low=-1, high=1, shape=(self.action_dim,), dtype=np.float32
        )

    def reset(
        self,
        *,
        seed=None,
        options=None,
    ):
        """Start a new episode at day 0.

        In training mode a random integer in ``[0, 64)`` is added to each
        initial holding, and the cash is ``initial_capital`` scaled by a
        random factor in ``[0.95, 1.05)`` minus the value of those holdings.
        Otherwise the configured initial holdings and capital are used as-is.

        Args:
            seed: Optional seed for the environment's random generator.
            options: Accepted for Gymnasium API compatibility; unused.

        Returns:
            ``(state, info)`` where ``info`` is an empty dict.
        """
        # Seeds self.np_random as required by the Gymnasium API, so the
        # randomised start below is reproducible for a given seed.
        super().reset(seed=seed)

        self.day = 0
        price = self.price_ary[self.day]

        if self.if_train:
            extra_shares = self.np_random.integers(
                0, 64, size=self.initial_stocks.shape
            )
            self.stocks = (self.initial_stocks + extra_shares).astype(np.float32)
            self.amount = (
                self.initial_capital * self.np_random.uniform(0.95, 1.05)
                - (self.stocks * price).sum()
            )
        else:
            self.stocks = self.initial_stocks.astype(np.float32)
            self.amount = self.initial_capital
        self.stocks_cool_down = np.zeros_like(self.stocks)

        self.total_asset = self.amount + (self.stocks * price).sum()
        self.initial_total_asset = self.total_asset
        self.gamma_reward = 0.0
        return self.get_state(price), {}  # state

    def step(self, actions):
        """Advance one day and execute the requested trades.

        On a normal day, sells are processed before buys and any scaled
        action whose magnitude does not exceed
        ``int(max_stock * min_stock_rate)`` is ignored. Stocks whose price is
        not positive are skipped. On a day flagged as turbulent the whole
        portfolio is sold instead.

        Args:
            actions: Array-like with one value per stock.

        Returns:
            ``(state, reward, done, truncated, info)``. ``reward`` is the
            scaled change in total asset value, except on the final step,
            where it is the discounted sum of all step rewards.
            ``truncated`` is always ``False`` and ``info`` is empty.
        """
        actions = (np.asarray(actions) * self.max_stock).astype(int)

        self.day += 1
        price = self.price_ary[self.day]
        self.stocks_cool_down += 1

        if self.turbulence_bool[self.day] == 0:
            min_action = int(self.max_stock * self.min_stock_rate)  # stock_cd
            for index in np.where(actions < -min_action)[0]:  # sell_index:
                if price[index] > 0:  # Sell only if current asset is > 0
                    sell_num_shares = min(self.stocks[index], -actions[index])
                    self.stocks[index] -= sell_num_shares
                    self.amount += (
                        price[index] * sell_num_shares * (1 - self.sell_cost_pct)
                    )
                    self.stocks_cool_down[index] = 0
            for index in np.where(actions > min_action)[0]:  # buy_index:
                # Buy only if the price is > 0 (no missing data on this date).
                if price[index] > 0:
                    # Affordability must include the fee, otherwise the
                    # purchase can push cash below zero. Clamp at 0 so that
                    # negative cash never turns a buy into a sale.
                    unit_cost = price[index] * (1 + self.buy_cost_pct)
                    affordable = self.amount // unit_cost
                    buy_num_shares = max(0, min(affordable, actions[index]))
                    self.stocks[index] += buy_num_shares
                    self.amount -= unit_cost * buy_num_shares
                    self.stocks_cool_down[index] = 0

        else:  # sell all when turbulence
            self.amount += (self.stocks * price).sum() * (1 - self.sell_cost_pct)
            self.stocks[:] = 0
            self.stocks_cool_down[:] = 0

        state = self.get_state(price)
        total_asset = self.amount + (self.stocks * price).sum()
        reward = (total_asset - self.total_asset) * self.reward_scaling
        self.total_asset = total_asset

        self.gamma_reward = self.gamma_reward * self.gamma + reward
        done = self.day == self.max_step
        if done:
            # The terminal reward is the discounted cumulative reward.
            reward = self.gamma_reward
            self.episode_return = total_asset / self.initial_total_asset

        return state, reward, done, False, dict()

    def get_state(self, price):
        """Build the flat ``float32`` observation for the current day.

        Args:
            price: Per-stock prices for the current day.

        Returns:
            1-D array: scaled cash, turbulence feature, turbulence flag,
            scaled prices, scaled holdings, cool-down counters and the
            technical indicators for ``self.day``.
        """
        amount = np.array(self.amount * (2**-12), dtype=np.float32)
        scale = np.array(2**-6, dtype=np.float32)
        return np.hstack(
            (
                amount,
                self.turbulence_ary[self.day],
                self.turbulence_bool[self.day],
                price * scale,
                self.stocks * scale,
                self.stocks_cool_down,
                self.tech_ary[self.day],
            )
        ).astype(np.float32)

    @staticmethod
    def sigmoid_sign(ary, thresh):
        """Squash ``ary`` with a zero-centred sigmoid scaled by ``thresh``.

        Computes ``(1 / (1 + exp(-e * ary / thresh)) - 0.5) * thresh``, so
        the result is bounded in magnitude by ``thresh / 2``.
        """

        def sigmoid(x):
            return 1 / (1 + np.exp(-x * np.e)) - 0.5

        return sigmoid(ary / thresh) * thresh
175
176