# Path: blob/master/finrl/meta/data_processors/processor_ccxt.py
# 732 views
from __future__ import annotations

import calendar
from datetime import datetime, timezone

import ccxt
import numpy as np
import pandas as pd
from stockstats import StockDataFrame as Sdf

# Per-pair price fields, in the order they appear in every frame built here.
_PRICE_COLUMNS = ["open", "high", "low", "close", "volume"]

# Defaults are kept as immutable tuples and copied into fresh lists per call,
# so no list object is ever shared between calls.
_DEFAULT_PAIR_LIST = ("BTC/USDT",)
_DEFAULT_TECH_INDICATORS = (
    "macd",
    "boll_ub",
    "boll_lb",
    "rsi_30",
    "dx_30",
    "close_30_sma",
    "close_60_sma",
)

# Candles requested per fetch window, keyed by timeframe. Windows are 720
# minutes long for "1m" and one day long for every other timeframe, so each
# value is the number of candles that fits in one window.
_CANDLES_PER_WINDOW = {"1m": 720, "1d": 1, "1h": 24, "5m": 288}
# Limit used for any timeframe not listed above.
_FALLBACK_CANDLES_PER_WINDOW = 1000

# Format expected for the ``start`` / ``end`` arguments of ``data_fetch``.
_TIME_FORMAT = "%Y%m%d %H:%M:%S"


class CCXTEngineer:
    """Download OHLCV candles through ccxt's Binance client and shape them.

    Frames produced by this class use two-level columns of the form
    ``(pair, field)``, where ``field`` is a price column or an indicator name.
    """

    def __init__(self):
        self.binance = ccxt.binance()

    def _fetch_pair_frame(self, window_starts, pair, period):
        """Fetch candles for one pair, one request per window start.

        Args:
            window_starts: iterable of integer epoch seconds (UTC), each the
                ``since`` of one request.
            pair: trading pair symbol passed to ``fetch_ohlcv``.
            period: timeframe string passed to ``fetch_ohlcv``.

        Returns:
            DataFrame with a ``time`` column followed by the price columns
            cast to ``float64``.
        """
        limit = _CANDLES_PER_WINDOW.get(period, _FALLBACK_CANDLES_PER_WINDOW)
        candles = []
        for window_start in window_starts:
            candles.extend(
                self.binance.fetch_ohlcv(
                    symbol=pair,
                    timeframe=period,
                    since=window_start * 1000,  # the request takes milliseconds
                    limit=limit,
                )
            )
        frame = pd.DataFrame(candles, columns=["time"] + _PRICE_COLUMNS)
        # Convert in UTC (as naive datetimes) so the index agrees with how
        # ``start`` / ``end`` are interpreted; a local-time conversion would
        # shift every row by the host's UTC offset.
        frame["time"] = [
            datetime.fromtimestamp(float(ts) / 1000, tz=timezone.utc).replace(
                tzinfo=None
            )
            for ts in frame["time"]
        ]
        return frame.astype({name: np.float64 for name in _PRICE_COLUMNS})

    def data_fetch(self, start, end, pair_list=None, period="1m"):
        """Fetch OHLCV data for several pairs into one wide frame.

        Args:
            start: range start as a ``"%Y%m%d %H:%M:%S"`` string, read as UTC.
            end: range end in the same format (exclusive for window starts).
            pair_list: trading pair symbols; defaults to ``["BTC/USDT"]``.
            period: timeframe string forwarded to ``fetch_ohlcv``.

        Returns:
            DataFrame with ``(pair, field)`` columns, indexed by the candle
            times of the first pair. Later pairs are written positionally, so
            they must yield the same number of rows as the first pair.

        Raises:
            ValueError: if ``pair_list`` is empty.
        """
        pairs = list(_DEFAULT_PAIR_LIST if pair_list is None else pair_list)
        if not pairs:
            raise ValueError("pair_list must contain at least one trading pair")

        # The requested range does not depend on the pair: compute it once.
        start_timestamp = calendar.timegm(
            datetime.strptime(start, _TIME_FORMAT).utctimetuple()
        )
        end_timestamp = calendar.timegm(
            datetime.strptime(end, _TIME_FORMAT).utctimetuple()
        )
        window_seconds = 60 * 720 if period == "1m" else 60 * 1440
        window_starts = range(start_timestamp, end_timestamp, window_seconds)

        crypto_column = pd.MultiIndex.from_product([pairs, _PRICE_COLUMNS])
        dataset = None
        for pair in pairs:
            df = self._fetch_pair_frame(window_starts, pair, period)
            if dataset is None:
                # The first pair fixes the row index for the whole result.
                dataset = pd.DataFrame(
                    columns=crypto_column, index=df["time"].values
                )
            temp_col = pd.MultiIndex.from_product([[pair], _PRICE_COLUMNS])
            dataset[temp_col] = df[_PRICE_COLUMNS].values
            if not df.empty:  # nothing to report when no candle came back
                print("Actual end time: " + str(df["time"].values[-1]))
        return dataset

    def add_technical_indicators(self, df, pair_list, tech_indicator_list=None):
        """Append stockstats indicators for every pair to a wide frame.

        Args:
            df: frame with ``(pair, field)`` columns, e.g. from ``data_fetch``.
                Rows containing NaN are dropped before processing.
            pair_list: pairs present in ``df`` to process.
            tech_indicator_list: stockstats column names to compute; defaults
                to macd, boll_ub, boll_lb, rsi_30, dx_30, close_30_sma and
                close_60_sma.

        Returns:
            New DataFrame holding, per pair, the price columns followed by the
            requested indicator columns.
        """
        indicators = list(
            _DEFAULT_TECH_INDICATORS
            if tech_indicator_list is None
            else tech_indicator_list
        )
        df = df.dropna().copy()
        column = pd.MultiIndex.from_product(
            [pair_list, _PRICE_COLUMNS + indicators]
        )
        dataset = pd.DataFrame(columns=column, index=df.index)
        for pair in pair_list:
            pair_column = pd.MultiIndex.from_product([[pair], _PRICE_COLUMNS])
            dataset[pair_column] = df[pair]
            # Move the index into a "date" column before handing the frame to
            # stockstats.
            temp_df = df[pair].reset_index().sort_values(by=["index"])
            temp_df = temp_df.rename(columns={"index": "date"})
            crypto_df = Sdf.retype(temp_df.copy())
            for indicator in indicators:
                dataset[(pair, indicator)] = crypto_df[indicator].values.tolist()
        print("Succesfully add technical indicators")
        return dataset

    def df_to_ary(self, df, pair_list, tech_indicator_list=None):
        """Split a wide indicator frame into plain arrays.

        Args:
            df: frame with ``(pair, field)`` columns that includes ``close``
                and every requested indicator for each pair.
            pair_list: pairs to extract.
            tech_indicator_list: indicator columns to extract; same default
                as ``add_technical_indicators``.

        Returns:
            Tuple ``(price_array, tech_array, date_ary)``: close prices per
            pair, indicator values per (pair, indicator), and the index values
            of the rows left after dropping NaN.
        """
        indicators = list(
            _DEFAULT_TECH_INDICATORS
            if tech_indicator_list is None
            else tech_indicator_list
        )
        df = df.dropna()
        date_ary = df.index.values
        price_array = df[pd.MultiIndex.from_product([pair_list, ["close"]])].values
        tech_array = df[pd.MultiIndex.from_product([pair_list, indicators])].values
        return price_array, tech_array, date_ary