# Path: blob/master/finrl/meta/data_processors/processor_sinopac.py
# 732 views
from __future__ import annotations12import numpy as np3import pandas as pd4import pandas_market_calendars as tc5import shioaji as sj6import talib7from shioaji import Exchange8from shioaji import TickSTKv1910from finrl.meta.preprocessor.shioajidownloader import SinopacDownloader111213class SinopacProcessor:14def __init__(self, API_KEY=None, API_SECRET=None, api=None):15if api is None:16try:17self.api = sj.Shioaji()18self.api.login(19api_key=API_KEY,20secret_key=API_SECRET,21contracts_cb=lambda security_type: print(22f"{repr(security_type)} fetch done."23),24)25except BaseException:26raise ValueError("Wrong Account Info!")27else:28self.api = api2930def download_data(self):31ticker_list = ticker_list.astype(str).split(",")32downloader = SinopacDownloader(33api=self.api,34start_date=self.start_date,35end_date=self.end_date,36ticker_list=self.ticker_list,37)38# 使用 downloader 獲取數據39data = downloader.fetch_data(api=self.api)40return data4142@staticmethod43def clean_individual_ticker(args):44tic, df, times = args45tic_df = df[df["tic"] == tic].set_index("timestamp")4647# Create a new DataFrame to ensure all time points are included48tmp_df = pd.DataFrame(index=times)49tmp_df = tmp_df.join(50tic_df[["Open", "High", "Low", "Close", "Volume", "Amount"]], how="left"51)5253# Fill NaN values using forward fill54tmp_df.ffill(inplace=True)5556# Append ticker code and date57tmp_df["tic"] = tic58tmp_df["date"] = tmp_df.index.strftime("%Y-%m-%d")5960tmp_df.reset_index(inplace=True)61tmp_df.rename(columns={"index": "timestamp"}, inplace=True)6263return tmp_df6465def clean_data(self, df):6667print("Data cleaning started")68tic_list = df["tic"].unique()69n_tickers = len(tic_list)70self.start = df["timestamp"].min()71self.end = df["timestamp"].max()7273# 生成全时间序列74times = pd.date_range(75start=self.start, end=self.end, freq="min"76) # 'T' 代表分钟级别的频率7778# 处理每个股票的数据79results = []80for tic in tic_list:81cleaned_data = self.clean_individual_ticker((tic, df, 
times))82results.append(cleaned_data)8384# 合并结果85new_df = pd.concat(results)86print(new_df.columns)87print("Data cleaning finished!")88return new_df.reset_index(drop=True)8990def add_technical_indicator(self, df):91print("Started adding Indicators")92print(df.columns)93tech_indicator_list = talib.get_functions() # 获取所有 TA-Lib 可用指标9495# 调整列名以匹配 TA-Lib 的需求96df.rename(97columns={98"Open": "open",99"High": "high",100"Low": "low",101"Close": "close",102"Volume": "volume",103},104inplace=True,105)106107# 循环添加每个指标108for indicator in tech_indicator_list:109try:110if indicator == "MAVP":111pass112else:113# 获取指标函数114indicator_function = getattr(talib.abstract, indicator)115# 计算指标116result = indicator_function(df)117118# 如果结果是 Series,转换为 DataFrame 并重命名列119if isinstance(result, pd.Series):120df[indicator.lower()] = result121else: # 如果结果是 DataFrame,合并所有列122result.columns = [123f"{indicator.lower()}_{col}" for col in result.columns124]125df = pd.concat([df, result], axis=1)126except Exception as e:127print(f"Error calculating {indicator}: {str(e)}")128print(df.head())129print(df.tail())130print("Finished adding Indicators")131df.rename(132columns={133"open": "Open",134"high": "High",135"low": "Low",136"close": "Close",137"volume": "Volume",138},139inplace=True,140)141print(df.columns)142return df143144# Allows to multithread the add_vix function for quicker execution145def download_and_clean_data(self):146# VIX_index start at 2023-04-12147vix_kbars = self.api.kbars(148contract=self.api.Contracts.Indexs.TAIFEX["TAIFEXTAIWANVIX"],149start=self.start.strftime("%Y-%m-%d"),150end=self.end.strftime("%Y-%m-%d"),151)152vix_df = pd.DataFrame({**vix_kbars})153vix_df.ts = pd.to_datetime(vix_df.ts)154return vix_df155156def add_vix(self, data):157cleaned_vix = self.download_and_clean_data()158vix = cleaned_vix[["ts", "Close"]]159vix = vix.rename(columns={"ts": "timestamp", "Close": "VIXY"})160print("Started adding VIX data")161print(vix.head())162print(data.columns)163if "timestamp" not in 
data.columns:164print("No timestamp column found")165data = data.copy()166data = data.merge(vix, on="timestamp")167data = data.sort_values(["timestamp", "tic"]).reset_index(drop=True)168print("Finished adding VIX data")169return data170171def calculate_turbulence(self, data, time_period=252):172# can add other market assets173df = data.copy()174df_price_pivot = df.pivot(index="timestamp", columns="tic", values="Close")175# use returns to calculate turbulence176df_price_pivot = df_price_pivot.pct_change()177178unique_date = df.timestamp.unique()179# start after a fixed timestamp period180start = time_period181turbulence_index = [0] * start182# turbulence_index = [0]183count = 0184for i in range(start, len(unique_date)):185current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]186# use one year rolling window to calcualte covariance187hist_price = df_price_pivot[188(df_price_pivot.index < unique_date[i])189& (df_price_pivot.index >= unique_date[i - time_period])190]191# Drop tickers which has number missing values more than the "oldest" ticker192filtered_hist_price = hist_price.iloc[193hist_price.isna().sum().min() :194].dropna(axis=1)195196cov_temp = filtered_hist_price.cov()197current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(198filtered_hist_price, axis=0199)200temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(201current_temp.values.T202)203if temp > 0:204count += 1205if count > 2:206turbulence_temp = temp[0][0]207else:208# avoid large outlier because of the calculation just begins209turbulence_temp = 0210else:211turbulence_temp = 0212turbulence_index.append(turbulence_temp)213214turbulence_index = pd.DataFrame(215{"timestamp": df_price_pivot.index, "turbulence": turbulence_index}216)217218# print("turbulence_index\n", turbulence_index)219220return turbulence_index221222def add_turbulence(self, data, time_period=252):223"""224add turbulence index from a precalcualted dataframe225:param data: (df) pandas 
dataframe226:return: (df) pandas dataframe227"""228df = data.copy()229turbulence_index = self.calculate_turbulence(df, time_period=time_period)230df = df.merge(turbulence_index, on="timestamp")231df = df.sort_values(["timestamp", "tic"]).reset_index(drop=True)232return df233234def df_to_array(self, df, tech_indicator_list, if_vix):235df = df.copy()236unique_ticker = df.tic.unique()237if_first_time = True238for tic in unique_ticker:239if if_first_time:240price_array = df[df.tic == tic][["Close"]].values241tech_array = df[df.tic == tic][tech_indicator_list].values242if if_vix:243turbulence_array = df[df.tic == tic]["VIXY"].values244else:245turbulence_array = df[df.tic == tic]["turbulence"].values246if_first_time = False247else:248price_array = np.hstack(249[price_array, df[df.tic == tic][["Close"]].values]250)251tech_array = np.hstack(252[tech_array, df[df.tic == tic][tech_indicator_list].values]253)254# print("Successfully transformed into array")255return price_array, tech_array, turbulence_array256257def get_trading_days(self, start, end):258xtai = tc.get_calendar("XTAI")259# df = xtai.sessions_in_range(260# pd.Timestamp(start).tz_localize(None), pd.Timestamp(end).tz_localize(None)261# )262df = xtai.date_range_htf("1D", pd.Timestamp(start), pd.Timestamp(end))263trading_days = []264for day in df:265trading_days.append(str(day)[:10])266return trading_days267268def on_tick(self, exchange: Exchange, tick: TickSTKv1):269tick_data = {270"timestamp": tick.datetime,271"tic": tick.code,272"Open": float(tick.open),273"High": float(tick.high),274"Low": float(tick.low),275"Close": float(tick.close),276"Volume": tick.volume,277}278self.data = self.data.append(tick_data, ignore_index=True)279280def fetch_latest_data(281self, ticker_list, time_interval, tech_indicator_list, limit=100282) -> pd.DataFrame:283data_df = pd.DataFrame()284for tic in ticker_list:285contract = 
self.api.Contracts.Stocks[tic]286self.api.quote.subscribe(287contract,288quote_type=sj.constant.QuoteType.Tick,289version=sj.constant.QuoteVersion.v1,290)291292def resample_to_kbars(group):293group.set_index("timestamp", inplace=True)294ohlc_dict = {"price": "ohlc", "volume": "sum"}295kbars = group.resample("1T").apply(ohlc_dict)296kbars.columns = ["Open", "High", "Low", "Close", "Volume"]297return kbars298299kbars_data = []300for tic in ticker_list:301tic_data = self.data[self.data.tic == tic]302kbars = resample_to_kbars(tic_data)303kbars["tic"] = tic304kbars_data.append(kbars)305306self.data = pd.concat(kbars_data).reset_index()307self.data = self.data.sort_values(["timestamp", "tic"]).reset_index(drop=True)308309df = self.add_technical_indicator(self.data, tech_indicator_list)310df["VIXY"] = 0311312price_array, tech_array, turbulence_array = self.df_to_array(313df, tech_indicator_list, if_vix=True314)315latest_price = price_array[-1]316latest_tech = tech_array[-1]317turb_df = self.api.kbars(318contract=self.api.Contracts.Indexs.TAIFEX["TAIFEXTAIWANVIX"],319start=self.end_date,320end=self.end_date,321)322latest_turb = pd.DataFrame({**turb_df})["Close"].values323return latest_price, latest_tech, latest_turb324325326