Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_tinkoff-invest-volume-analysis-robot-master/trading_robot.py
5925 views
1
import asyncio
2
import logging
3
from datetime import timedelta
4
5
import pandas as pd
6
from tinkoff.invest import (
7
AsyncClient
8
)
9
from tinkoff.invest.utils import now
10
11
from constants import ONE_HOUR_TO_MINUTES
12
from services.order_service import OrderService
13
from services.user_service import UserService
14
from settings import INSTRUMENTS, CAN_OPEN_ORDERS, TOKEN
15
from strategies.profile_touch_strategy import ProfileTouchStrategy
16
from utils.exchange_util import is_open_exchange
17
from utils.instrument_util import request_iterator, get_file_path_by_instrument
18
from utils.logger import init_logging
19
from utils.parse_util import processed_data
20
from utils.strategy_util import merge_two_frames, create_empty_df
21
22
pd.options.display.max_columns = None
23
pd.options.display.max_rows = None
24
pd.options.display.width = None
25
26
init_logging()
27
logger = logging.getLogger(__name__)
28
29
30
class TradingRobot:
31
def __init__(self):
32
self.order_service = OrderService(is_notification=True, can_open_orders=CAN_OPEN_ORDERS)
33
self.order_service.start()
34
35
self.is_history_processed = True
36
37
self.df_by_instrument = {}
38
self.strategy = {}
39
for instrument in INSTRUMENTS:
40
figi = instrument["figi"]
41
df_by_instrument = create_empty_df()
42
self.df_by_instrument[figi] = df_by_instrument
43
44
file_path = get_file_path_by_instrument(instrument)
45
instrument_file = open(file_path, "a", newline='')
46
df_by_instrument.to_csv(instrument_file, mode="a", header=instrument_file.tell() == 0, index=False)
47
48
profile_touch_strategy = ProfileTouchStrategy(instrument["name"])
49
profile_touch_strategy.start()
50
self.strategy[figi] = profile_touch_strategy
51
52
# актуализация DataFrame из полученных ранее данных
53
async def sync_df(self, client):
54
self.is_history_processed = True
55
for instrument in INSTRUMENTS:
56
try:
57
figi = instrument["figi"]
58
59
file_path = get_file_path_by_instrument(instrument)
60
self.df_by_instrument[figi] = pd.read_csv(file_path, sep=",")
61
self.df_by_instrument[figi]["time"] = pd.to_datetime(self.df_by_instrument[figi]["time"], utc=True)
62
63
history_df = await self.get_history_trades(client, instrument)
64
self.df_by_instrument[figi] = merge_two_frames(self.df_by_instrument[figi], history_df)
65
except Exception as ex:
66
logger.error(ex)
67
self.is_history_processed = False
68
69
# загрузка последних доступных обезличенных сделок
70
async def get_history_trades(self, client, instrument):
71
history_df = create_empty_df()
72
figi = instrument["figi"]
73
current_date = now()
74
time = 0
75
76
while True:
77
try:
78
interval_from = current_date - timedelta(minutes=time + ONE_HOUR_TO_MINUTES)
79
interval_to = current_date - timedelta(minutes=time)
80
81
logger.info("instrument: %s, from: %s", instrument, interval_from)
82
logger.info("instrument: %s, to: %s", instrument, interval_to)
83
84
response = await client.market_data.get_last_trades(
85
figi=figi,
86
from_=interval_from,
87
to=interval_to,
88
)
89
logger.info("instrument: %s, size: %s", instrument, len(response.trades))
90
if response is None or len(response.trades) == 0:
91
break
92
93
for trade in response.trades:
94
processed_trade_df = processed_data(trade)
95
if processed_trade_df is not None:
96
history_df = pd.concat([history_df, processed_trade_df])
97
history_df = history_df.sort_values("time")
98
time += ONE_HOUR_TO_MINUTES
99
except Exception as ex:
100
logger.error(ex)
101
break
102
103
return history_df
104
105
# основной метод для обработки входящих данных
106
async def trades_stream(self, client):
107
temp_df = {}
108
for instrument in INSTRUMENTS:
109
temp_df[instrument["figi"]] = create_empty_df()
110
111
try:
112
async for marketdata in client.market_data_stream.market_data_stream(
113
request_iterator(INSTRUMENTS)
114
):
115
if not is_open_exchange():
116
logger.info("торговый день завершен, сохранение статистики")
117
self.order_service.write_statistics()
118
# todo добавить выход из приложения
119
120
logger.info(marketdata)
121
if marketdata is None:
122
continue
123
trade = marketdata.trade
124
if trade is None:
125
continue
126
127
figi = trade.figi
128
instrument = next(item for item in INSTRUMENTS if item["figi"] == figi)
129
130
processed_trade_df = processed_data(trade)
131
if processed_trade_df is not None:
132
# проверка позиций на закрытие по тейку/стопу
133
processed_trade = processed_trade_df.iloc[0]
134
price = processed_trade["price"]
135
time = processed_trade["time"]
136
self.order_service.processed_orders(instrument["name"], price, time)
137
138
if self.is_history_processed is True:
139
# пока происходит обработка истории - новые данные складываю во временную переменную
140
next_df = [temp_df[figi], processed_trade_df]
141
temp_df[figi] = pd.concat(next_df, ignore_index=True)
142
else:
143
# есть проблема, когда исторические данные загрузились, но в real-time они не приходят
144
# тогда исторические данные не окажутся в файле
145
if len(temp_df[figi]) > 0:
146
# если после обработки истории успели накопить real-time данные,
147
# то подмерживаю их и очищаю временную переменную
148
next_df = [self.df_by_instrument[figi], temp_df[figi], processed_trade_df]
149
self.df_by_instrument[figi] = pd.concat(next_df, ignore_index=True)
150
temp_df[figi].drop(temp_df[figi].index, inplace=True)
151
152
# отправляю обезличенные сделки на анализ
153
self.strategy[figi].set_df(self.df_by_instrument[figi])
154
155
file_path = get_file_path_by_instrument(instrument)
156
self.df_by_instrument[figi].to_csv(file_path, mode="w", header=True, index=False)
157
else:
158
# отправляю обезличенную сделку на анализ
159
# (алгоритм анализа можно заменить на любой)
160
# если ТВ подтвердится, то возвращается структура сделки
161
orders = self.strategy[figi].analyze(processed_trade_df)
162
if orders is not None:
163
for order in orders:
164
self.order_service.create_order(order)
165
166
next_df = [self.df_by_instrument[figi], processed_trade_df]
167
self.df_by_instrument[figi] = pd.concat(next_df, ignore_index=True)
168
169
file_path = get_file_path_by_instrument(instrument)
170
processed_trade_df.to_csv(file_path, mode="a", header=False, index=False)
171
except Exception as ex:
172
logger.error(ex)
173
174
async def main(self):
175
UserService().show_settings()
176
177
async with AsyncClient(TOKEN) as client:
178
tasks = [asyncio.ensure_future(self.trades_stream(client)),
179
asyncio.ensure_future(self.sync_df(client))]
180
await asyncio.wait(tasks)
181
182
183
if __name__ == "__main__":
184
try:
185
robot = TradingRobot()
186
asyncio.run(robot.main())
187
except Exception as ex:
188
logger.error(ex)
189
190