Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
AI4Finance-Foundation
GitHub Repository: AI4Finance-Foundation/FinRL
Path: blob/master/finrl/meta/data_processors/processor_sinopac.py
732 views
1
from __future__ import annotations
2
3
import numpy as np
4
import pandas as pd
5
import pandas_market_calendars as tc
6
import shioaji as sj
7
import talib
8
from shioaji import Exchange
9
from shioaji import TickSTKv1
10
11
from finrl.meta.preprocessor.shioajidownloader import SinopacDownloader
12
13
14
class SinopacProcessor:
    """Download, clean and featurise Taiwan-market data from the Shioaji API.

    The frames handled by this class are in "long" format: one row per
    ``(timestamp, tic)`` pair with capitalised ``Open``/``High``/``Low``/
    ``Close``/``Volume`` columns.

    Attributes:
        api: Logged-in Shioaji client used for every remote call.
        ticker_list, start_date, end_date: Download parameters. They are
            ``None`` until set by the caller or passed to ``download_data``.
        data: Buffer of raw ticks appended by ``on_tick``.
        start, end: First/last timestamp seen by ``clean_data``; required by
            ``download_and_clean_data`` / ``add_vix``.
    """

    # TA-Lib's abstract API is fed lower-case OHLCV column names, while the
    # frames handled by this class use capitalised ones.
    _TO_TALIB_COLUMNS = {
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Volume": "volume",
    }

    def __init__(self, API_KEY=None, API_SECRET=None, api=None):
        """Store an existing Shioaji client or log a new one in.

        Args:
            API_KEY: Shioaji API key, used only when ``api`` is None.
            API_SECRET: Shioaji secret key, used only when ``api`` is None.
            api: An already logged-in client; when given, no login happens.

        Raises:
            ValueError: If creating the client or logging in fails.
        """
        # State read by other methods; initialised here so that they fail
        # with a clear message instead of an AttributeError.
        self.ticker_list = None
        self.start_date = None
        self.end_date = None
        self.data = pd.DataFrame()

        if api is not None:
            self.api = api
            return

        try:
            self.api = sj.Shioaji()
            self.api.login(
                api_key=API_KEY,
                secret_key=API_SECRET,
                contracts_cb=lambda security_type: print(
                    f"{repr(security_type)} fetch done."
                ),
            )
        except Exception as err:
            # Exception (not BaseException) so Ctrl-C / SystemExit still
            # propagate; the original cause is kept for debugging.
            raise ValueError("Wrong Account Info!") from err

    @staticmethod
    def _normalize_ticker_list(ticker_list):
        """Return ``ticker_list`` as a list of non-empty, stripped strings.

        Accepts either a comma-separated string or any iterable of codes
        (list, NumPy array, pandas Series, ...).
        """
        if isinstance(ticker_list, str):
            parts = ticker_list.split(",")
        else:
            parts = list(ticker_list)
        cleaned = (str(part).strip() for part in parts)
        return [code for code in cleaned if code]

    def download_data(self, ticker_list=None, start_date=None, end_date=None):
        """Fetch historical data through ``SinopacDownloader``.

        Each argument is optional; when omitted, the value already stored on
        the instance is used. Values that are passed in are stored on the
        instance so later calls can reuse them.

        Args:
            ticker_list: Comma-separated string or iterable of ticker codes.
            start_date: Passed through to ``SinopacDownloader``.
            end_date: Passed through to ``SinopacDownloader``.

        Returns:
            Whatever ``SinopacDownloader.fetch_data`` returns.

        Raises:
            ValueError: If a required value is neither passed nor stored.
        """
        if ticker_list is not None:
            self.ticker_list = ticker_list
        if start_date is not None:
            self.start_date = start_date
        if end_date is not None:
            self.end_date = end_date

        required = ("ticker_list", "start_date", "end_date")
        missing = [name for name in required if getattr(self, name, None) is None]
        if missing:
            raise ValueError(
                f"download_data requires {', '.join(missing)} to be set"
            )

        self.ticker_list = self._normalize_ticker_list(self.ticker_list)
        downloader = SinopacDownloader(
            api=self.api,
            start_date=self.start_date,
            end_date=self.end_date,
            ticker_list=self.ticker_list,
        )
        # Use the downloader to fetch the data.
        return downloader.fetch_data(api=self.api)

    @staticmethod
    def clean_individual_ticker(args):
        """Align one ticker's rows onto a common time grid.

        Args:
            args: Tuple ``(tic, df, times)`` where ``df`` is the long-format
                frame and ``times`` is the target index.

        Returns:
            A frame with one row per entry of ``times`` and the columns
            ``timestamp``, OHLCV + ``Amount``, ``tic`` and ``date``. Gaps are
            forward-filled; rows before the ticker's first observation stay
            NaN because there is nothing to fill from.
        """
        tic, df, times = args
        value_columns = ["Open", "High", "Low", "Close", "Volume", "Amount"]
        tic_df = df[df["tic"] == tic].set_index("timestamp")

        # Left-join onto the full grid so every time point is present.
        aligned = pd.DataFrame(index=times).join(tic_df[value_columns], how="left")
        aligned = aligned.ffill()

        aligned["tic"] = tic
        aligned["date"] = aligned.index.strftime("%Y-%m-%d")

        # Name the index explicitly rather than relying on the default
        # "index" label that reset_index gives an unnamed index.
        return aligned.rename_axis("timestamp").reset_index()

    def clean_data(self, df):
        """Resample every ticker onto a shared one-minute grid.

        Also records the first and last timestamp on ``self.start`` and
        ``self.end``.

        Args:
            df: Long-format frame with ``timestamp``, ``tic``, OHLCV and
                ``Amount`` columns.

        Returns:
            The concatenated per-ticker frames with a fresh RangeIndex.

        Raises:
            ValueError: If ``df`` has no rows.
        """
        if df.empty:
            raise ValueError("Cannot clean an empty DataFrame")

        print("Data cleaning started")
        tic_list = df["tic"].unique()
        self.start = df["timestamp"].min()
        self.end = df["timestamp"].max()

        # Full minute-level time series between the first and last timestamp.
        times = pd.date_range(start=self.start, end=self.end, freq="min")

        # Process each ticker, then merge the results.
        results = [
            self.clean_individual_ticker((tic, df, times)) for tic in tic_list
        ]
        new_df = pd.concat(results)
        print(new_df.columns)
        print("Data cleaning finished!")
        return new_df.reset_index(drop=True)

    def add_technical_indicator(self, df, tech_indicator_list=None):
        """Append TA-Lib indicator columns, computed separately per ticker.

        The input frame is not modified. Single-output indicators become a
        column named after the lower-cased indicator; multi-output indicators
        become ``<indicator>_<output>`` columns. Indicators that raise are
        reported with ``print`` and skipped.

        Args:
            df: Long-format frame with capitalised OHLCV columns. When a
                ``tic`` column is present, indicators are computed within
                each ticker so one ticker's history never feeds another's.
            tech_indicator_list: TA-Lib function names to compute. Defaults
                to every function reported by ``talib.get_functions()``.

        Returns:
            A new frame in the original row order with the extra columns.
        """
        print("Started adding Indicators")
        print(df.columns)
        if tech_indicator_list is None:
            # All indicators available in TA-Lib.
            tech_indicator_list = talib.get_functions()

        # rename() returns a new frame, so the caller's columns are untouched.
        work = df.rename(columns=self._TO_TALIB_COLUMNS)

        # Row positions of each ticker, so the original order can be restored.
        if "tic" in work.columns:
            codes, _ = pd.factorize(work["tic"])
            positions = [np.flatnonzero(codes == code) for code in np.unique(codes)]
        else:
            positions = []
        if not positions:
            positions = [np.arange(len(work))]

        reported = set()
        enriched = []
        for rows in positions:
            group = work.iloc[rows]
            new_columns = {}
            for indicator in tech_indicator_list:
                # MAVP is skipped as in the original code — presumably
                # because it needs an extra per-row "periods" input.
                if indicator == "MAVP":
                    continue
                try:
                    result = getattr(talib.abstract, indicator)(group)
                    prefix = indicator.lower()
                    if isinstance(result, pd.Series):
                        new_columns[prefix] = result
                    else:
                        # Multi-output indicator: keep every output column.
                        for col in result.columns:
                            new_columns[f"{prefix}_{col}"] = result[col]
                except Exception as e:
                    # Best effort: report each distinct failure once.
                    message = f"Error calculating {indicator}: {str(e)}"
                    if message not in reported:
                        reported.add(message)
                        print(message)

            # A freshly computed column replaces an existing one of that name.
            base = group.drop(columns=[c for c in new_columns if c in group.columns])
            pieces = [base]
            pieces.extend(series.rename(name) for name, series in new_columns.items())
            enriched.append(pd.concat(pieces, axis=1))

        result_df = pd.concat(enriched)
        # The concatenation is grouped by ticker; undo that permutation.
        original_order = np.argsort(np.concatenate(positions), kind="stable")
        result_df = result_df.iloc[original_order]

        print(result_df.head())
        print(result_df.tail())
        print("Finished adding Indicators")
        result_df = result_df.rename(
            columns={low: cap for cap, low in self._TO_TALIB_COLUMNS.items()}
        )
        print(result_df.columns)
        return result_df

    def download_and_clean_data(self):
        """Download Taiwan VIX k-bars for the range seen by ``clean_data``.

        Reads ``self.start`` and ``self.end``, so ``clean_data`` must have
        been called first.

        Returns:
            A frame built from the k-bar payload with ``ts`` parsed as
            datetimes.
        """
        # VIX_index start at 2023-04-12
        vix_kbars = self.api.kbars(
            contract=self.api.Contracts.Indexs.TAIFEX["TAIFEXTAIWANVIX"],
            start=self.start.strftime("%Y-%m-%d"),
            end=self.end.strftime("%Y-%m-%d"),
        )
        vix_df = pd.DataFrame({**vix_kbars})
        vix_df["ts"] = pd.to_datetime(vix_df["ts"])
        return vix_df

    def add_vix(self, data):
        """Merge the VIX close onto ``data`` as a ``VIXY`` column.

        The merge is an inner join on ``timestamp``, so rows whose timestamp
        has no VIX bar are dropped.

        Args:
            data: Long-format frame with ``timestamp`` and ``tic`` columns.

        Returns:
            A new frame sorted by ``timestamp`` then ``tic``.

        Raises:
            KeyError: If ``data`` has no ``timestamp`` column.
        """
        if "timestamp" not in data.columns:
            # Fail before the remote call; the merge could not succeed.
            print("No timestamp column found")
            raise KeyError("timestamp")

        cleaned_vix = self.download_and_clean_data()
        vix = cleaned_vix[["ts", "Close"]].rename(
            columns={"ts": "timestamp", "Close": "VIXY"}
        )
        print("Started adding VIX data")
        print(vix.head())
        print(data.columns)
        merged = data.merge(vix, on="timestamp")
        merged = merged.sort_values(["timestamp", "tic"]).reset_index(drop=True)
        print("Finished adding VIX data")
        return merged

    def calculate_turbulence(self, data, time_period=252):
        """Compute a turbulence index from per-ticker returns.

        For each timestamp after the first ``time_period`` ones, the value is
        the squared Mahalanobis-style distance of the current return vector
        from the mean of the previous ``time_period`` rows, using their
        covariance. The first two positive values are reported as 0 to avoid
        outliers while the calculation warms up.

        Args:
            data: Long-format frame with ``timestamp``, ``tic`` and ``Close``
                columns (one row per pair).
            time_period: Look-back window length, in rows.

        Returns:
            A frame with ``timestamp`` and ``turbulence`` columns, one row per
            distinct timestamp. If there are not more than ``time_period``
            timestamps, every value is 0.
        """
        # Use returns, not prices, to calculate turbulence.
        returns = data.pivot(index="timestamp", columns="tic", values="Close")
        returns = returns.pct_change()
        n_steps = len(returns)

        # Cap the zero padding so short inputs do not overrun the index.
        turbulence_index = [0] * min(time_period, n_steps)
        count = 0
        for i in range(time_period, n_steps):
            # The pivot index is sorted and unique, so positional slicing
            # selects the current row and the rolling window directly.
            current = returns.iloc[[i]]
            hist = returns.iloc[i - time_period : i]

            # Drop tickers with more missing values than the "oldest" ticker.
            filtered = hist.iloc[hist.isna().sum().min() :].dropna(axis=1)

            cov = filtered.cov()
            deviation = current[filtered.columns] - filtered.mean(axis=0)
            temp = deviation.values.dot(np.linalg.pinv(cov)).dot(deviation.values.T)

            turbulence_temp = 0
            if temp > 0:
                count += 1
                # Ignore the first two hits: they tend to be large outliers
                # because the calculation has only just begun.
                if count > 2:
                    turbulence_temp = temp[0][0]
            turbulence_index.append(turbulence_temp)

        return pd.DataFrame(
            {"timestamp": returns.index, "turbulence": turbulence_index}
        )

    def add_turbulence(self, data, time_period=252):
        """Merge the turbulence index onto ``data``.

        Args:
            data: Long-format frame with ``timestamp``, ``tic`` and ``Close``.
            time_period: Look-back window passed to ``calculate_turbulence``.

        Returns:
            A new frame with a ``turbulence`` column, sorted by ``timestamp``
            then ``tic``.
        """
        turbulence_index = self.calculate_turbulence(data, time_period=time_period)
        merged = data.merge(turbulence_index, on="timestamp")
        return merged.sort_values(["timestamp", "tic"]).reset_index(drop=True)

    def df_to_array(self, df, tech_indicator_list, if_vix):
        """Convert a long-format frame into per-ticker column arrays.

        Every ticker must contribute the same number of rows, since the
        per-ticker blocks are stacked side by side.

        Args:
            df: Long-format frame with ``tic``, ``Close``, the indicator
                columns, and ``VIXY`` or ``turbulence``.
            tech_indicator_list: Names of the indicator columns to export.
            if_vix: Use ``VIXY`` when true, otherwise ``turbulence``.

        Returns:
            ``(price_array, tech_array, turbulence_array)``. The first two
            hold one block of columns per ticker; the third is taken from the
            first ticker only.

        Raises:
            ValueError: If ``df`` contains no tickers.
        """
        unique_ticker = df.tic.unique()
        if len(unique_ticker) == 0:
            raise ValueError("df contains no tickers")

        risk_column = "VIXY" if if_vix else "turbulence"
        tech_columns = list(tech_indicator_list)
        price_blocks = []
        tech_blocks = []
        turbulence_array = None
        for tic in unique_ticker:
            tic_df = df[df.tic == tic]
            if turbulence_array is None:
                turbulence_array = tic_df[risk_column].values
            price_blocks.append(tic_df[["Close"]].values)
            tech_blocks.append(tic_df[tech_columns].values)

        return np.hstack(price_blocks), np.hstack(tech_blocks), turbulence_array

    def get_trading_days(self, start, end):
        """Return XTAI calendar days between ``start`` and ``end``.

        Returns:
            A list of ``YYYY-MM-DD`` strings.
        """
        xtai = tc.get_calendar("XTAI")
        sessions = xtai.date_range_htf("1D", pd.Timestamp(start), pd.Timestamp(end))
        return [str(day)[:10] for day in sessions]

    def on_tick(self, exchange: "Exchange", tick: "TickSTKv1"):
        """Append one streamed tick to ``self.data``.

        Args:
            exchange: Exchange the tick came from (unused).
            tick: Tick payload; its ``datetime``, ``code``, ``open``,
                ``high``, ``low``, ``close`` and ``volume`` fields are read.
        """
        row = pd.DataFrame(
            [
                {
                    "timestamp": tick.datetime,
                    "tic": tick.code,
                    "Open": float(tick.open),
                    "High": float(tick.high),
                    "Low": float(tick.low),
                    "Close": float(tick.close),
                    "Volume": tick.volume,
                }
            ]
        )
        existing = getattr(self, "data", None)
        if existing is None or existing.empty:
            self.data = row
        else:
            # DataFrame.append was removed in pandas 2.0; concat replaces it.
            self.data = pd.concat([existing, row], ignore_index=True)

    @staticmethod
    def _resample_to_kbars(group):
        """Aggregate one ticker's ticks into one-minute OHLCV bars.

        Bars are built from the ``Close`` column as the traded price and the
        summed ``Volume``. Minutes without ticks yield NaN prices and zero
        volume.
        NOTE(review): assumes tick ``Close`` is the trade price — confirm
        against the Shioaji tick schema.
        """
        indexed = group.set_index("timestamp")
        kbars = indexed["Close"].resample("1min").ohlc()
        kbars.columns = ["Open", "High", "Low", "Close"]
        kbars["Volume"] = indexed["Volume"].resample("1min").sum()
        return kbars

    def fetch_latest_data(
        self, ticker_list, time_interval, tech_indicator_list, limit=100
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Build the latest price/indicator/VIX observation from live ticks.

        Subscribes to tick quotes for every ticker, aggregates the ticks
        buffered in ``self.data`` into one-minute bars, adds indicators and
        returns the last row of each array. ``self.data`` keeps the raw ticks.

        NOTE(review): ticks only reach ``self.data`` if ``on_tick`` has been
        registered as the client's tick callback — nothing here does that.
        NOTE(review): tickers whose ticks span different minutes produce
        different bar counts, which ``df_to_array`` cannot stack.

        Args:
            ticker_list: Stock codes to subscribe to and aggregate.
            time_interval: Currently unused.
            tech_indicator_list: TA-Lib function names to compute.
            limit: Currently unused.

        Returns:
            ``(latest_price, latest_tech, latest_turb)`` where ``latest_turb``
            holds the VIX closes returned for ``self.end_date``.

        Raises:
            ValueError: If no ticks have been buffered yet.
        """
        for tic in ticker_list:
            contract = self.api.Contracts.Stocks[tic]
            self.api.quote.subscribe(
                contract,
                quote_type=sj.constant.QuoteType.Tick,
                version=sj.constant.QuoteVersion.v1,
            )

        ticks = getattr(self, "data", None)
        if ticks is None or ticks.empty:
            raise ValueError("No tick data has been received yet")

        kbars_data = []
        for tic in ticker_list:
            kbars = self._resample_to_kbars(ticks[ticks.tic == tic])
            kbars["tic"] = tic
            kbars_data.append(kbars)

        bars = pd.concat(kbars_data).reset_index()
        bars = bars.sort_values(["timestamp", "tic"]).reset_index(drop=True)

        base_columns = set(bars.columns)
        df = self.add_technical_indicator(bars, tech_indicator_list)
        # Indicator columns are named from the lower-cased function name (and
        # per-output suffixes), so select whatever columns were added.
        tech_columns = [col for col in df.columns if col not in base_columns]
        # Placeholder so df_to_array can run; the real VIX is fetched below.
        df["VIXY"] = 0

        price_array, tech_array, _ = self.df_to_array(df, tech_columns, if_vix=True)
        latest_price = price_array[-1]
        latest_tech = tech_array[-1]
        turb_df = self.api.kbars(
            contract=self.api.Contracts.Indexs.TAIFEX["TAIFEXTAIWANVIX"],
            start=self.end_date,
            end=self.end_date,
        )
        latest_turb = pd.DataFrame({**turb_df})["Close"].values
        return latest_price, latest_tech, latest_turb
325
326