Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
AI4Finance-Foundation
GitHub Repository: AI4Finance-Foundation/FinRL
Path: blob/master/finrl/meta/data_processors/processor_ccxt.py
732 views
1
from __future__ import annotations
2
3
import calendar
4
from datetime import datetime
5
6
import ccxt
7
import numpy as np
8
import pandas as pd
9
from stockstats import StockDataFrame as Sdf
10
11
12
class CCXTEngineer:
13
def __init__(self):
14
self.binance = ccxt.binance()
15
16
def data_fetch(self, start, end, pair_list=["BTC/USDT"], period="1m"):
17
def min_ohlcv(dt, pair, limit):
18
since = calendar.timegm(dt.utctimetuple()) * 1000
19
ohlcv = self.binance.fetch_ohlcv(
20
symbol=pair, timeframe="1m", since=since, limit=limit
21
)
22
return ohlcv
23
24
def ohlcv(dt, pair, period="1d"):
25
ohlcv = []
26
limit = 1000
27
if period == "1m":
28
limit = 720
29
elif period == "1d":
30
limit = 1
31
elif period == "1h":
32
limit = 24
33
elif period == "5m":
34
limit = 288
35
for i in dt:
36
start_dt = i
37
since = calendar.timegm(start_dt.utctimetuple()) * 1000
38
if period == "1m":
39
ohlcv.extend(min_ohlcv(start_dt, pair, limit))
40
else:
41
ohlcv.extend(
42
self.binance.fetch_ohlcv(
43
symbol=pair, timeframe=period, since=since, limit=limit
44
)
45
)
46
df = pd.DataFrame(
47
ohlcv, columns=["time", "open", "high", "low", "close", "volume"]
48
)
49
df["time"] = [
50
datetime.fromtimestamp(float(time) / 1000) for time in df["time"]
51
]
52
df["open"] = df["open"].astype(np.float64)
53
df["high"] = df["high"].astype(np.float64)
54
df["low"] = df["low"].astype(np.float64)
55
df["close"] = df["close"].astype(np.float64)
56
df["volume"] = df["volume"].astype(np.float64)
57
return df
58
59
crypto_column = pd.MultiIndex.from_product(
60
[pair_list, ["open", "high", "low", "close", "volume"]]
61
)
62
first_time = True
63
for pair in pair_list:
64
start_dt = datetime.strptime(start, "%Y%m%d %H:%M:%S")
65
end_dt = datetime.strptime(end, "%Y%m%d %H:%M:%S")
66
start_timestamp = calendar.timegm(start_dt.utctimetuple())
67
end_timestamp = calendar.timegm(end_dt.utctimetuple())
68
if period == "1m":
69
date_list = [
70
datetime.utcfromtimestamp(float(time))
71
for time in range(start_timestamp, end_timestamp, 60 * 720)
72
]
73
else:
74
date_list = [
75
datetime.utcfromtimestamp(float(time))
76
for time in range(start_timestamp, end_timestamp, 60 * 1440)
77
]
78
df = ohlcv(date_list, pair, period)
79
if first_time:
80
dataset = pd.DataFrame(columns=crypto_column, index=df["time"].values)
81
first_time = False
82
temp_col = pd.MultiIndex.from_product(
83
[[pair], ["open", "high", "low", "close", "volume"]]
84
)
85
dataset[temp_col] = df[["open", "high", "low", "close", "volume"]].values
86
print("Actual end time: " + str(df["time"].values[-1]))
87
return dataset
88
89
def add_technical_indicators(
90
self,
91
df,
92
pair_list,
93
tech_indicator_list=[
94
"macd",
95
"boll_ub",
96
"boll_lb",
97
"rsi_30",
98
"dx_30",
99
"close_30_sma",
100
"close_60_sma",
101
],
102
):
103
df = df.dropna()
104
df = df.copy()
105
column_list = [
106
pair_list,
107
["open", "high", "low", "close", "volume"] + (tech_indicator_list),
108
]
109
column = pd.MultiIndex.from_product(column_list)
110
index_list = df.index
111
dataset = pd.DataFrame(columns=column, index=index_list)
112
for pair in pair_list:
113
pair_column = pd.MultiIndex.from_product(
114
[[pair], ["open", "high", "low", "close", "volume"]]
115
)
116
dataset[pair_column] = df[pair]
117
temp_df = df[pair].reset_index().sort_values(by=["index"])
118
temp_df = temp_df.rename(columns={"index": "date"})
119
crypto_df = Sdf.retype(temp_df.copy())
120
for indicator in tech_indicator_list:
121
temp_indicator = crypto_df[indicator].values.tolist()
122
dataset[(pair, indicator)] = temp_indicator
123
print("Succesfully add technical indicators")
124
return dataset
125
126
def df_to_ary(
127
self,
128
df,
129
pair_list,
130
tech_indicator_list=[
131
"macd",
132
"boll_ub",
133
"boll_lb",
134
"rsi_30",
135
"dx_30",
136
"close_30_sma",
137
"close_60_sma",
138
],
139
):
140
df = df.dropna()
141
date_ary = df.index.values
142
price_array = df[pd.MultiIndex.from_product([pair_list, ["close"]])].values
143
tech_array = df[
144
pd.MultiIndex.from_product([pair_list, tech_indicator_list])
145
].values
146
return price_array, tech_array, date_ary
147
148