Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_invest-bot-main/trading/trader.py
5931 views
1
import datetime
2
import logging
3
from decimal import Decimal
4
5
from tinkoff.invest import Candle
6
from tinkoff.invest.utils import quotation_to_decimal
7
8
from blog.blogger import Blogger
9
from invest_api.services.client_service import ClientService
10
from invest_api.services.instruments_service import InstrumentService
11
from invest_api.services.market_data_service import MarketDataService
12
from invest_api.services.operations_service import OperationService
13
from invest_api.services.orders_service import OrderService
14
from invest_api.services.market_data_stream_service import MarketDataStreamService
15
from invest_api.utils import candle_to_historiccandle
16
from trade_system.signal import SignalType
17
from trade_system.strategies.base_strategy import IStrategy
18
from trading.trade_results import TradeResults
19
from configuration.settings import TradingSettings
20
21
__all__ = ("Trader")
22
23
logger = logging.getLogger(__name__)
24
25
26
class Trader:
27
"""
28
The class encapsulate main trade logic.
29
"""
30
31
def __init__(
32
self,
33
client_service: ClientService,
34
instrument_service: InstrumentService,
35
operation_service: OperationService,
36
order_service: OrderService,
37
stream_service: MarketDataStreamService,
38
market_data_service: MarketDataService,
39
blogger: Blogger
40
) -> None:
41
self.__today_trade_results: TradeResults = None
42
self.__client_service = client_service
43
self.__instrument_service = instrument_service
44
self.__operation_service = operation_service
45
self.__order_service = order_service
46
self.__stream_service = stream_service
47
self.__market_data_service = market_data_service
48
self.__blogger = blogger
49
50
async def trade_day(
51
self,
52
account_id: str,
53
trading_settings: TradingSettings,
54
strategies: list[IStrategy],
55
trade_day_end_time: datetime,
56
min_rub: int
57
) -> None:
58
logger.info("Start preparations for trading today")
59
today_trade_strategies = self.__get_today_strategies(strategies)
60
if not today_trade_strategies:
61
logger.info("No shares to trade today.")
62
return None
63
64
self.__clear_all_positions(account_id, today_trade_strategies)
65
66
rub_before_trade_day = self.__operation_service.available_rub_on_account(account_id)
67
logger.info(f"Amount of RUB on account {rub_before_trade_day} and minimum for trading: {min_rub}")
68
if rub_before_trade_day < min_rub:
69
return None
70
71
logger.info("Start trading today")
72
self.__blogger.start_trading_message(today_trade_strategies, rub_before_trade_day)
73
74
try:
75
await self.__trading(
76
account_id,
77
trading_settings,
78
today_trade_strategies,
79
trade_day_end_time
80
)
81
logger.debug("Test Results:")
82
logger.debug(f"Current: {self.__today_trade_results.get_current_open_orders()}")
83
logger.debug(f"Old: {self.__today_trade_results.get_closed_orders()}")
84
except Exception as ex:
85
logger.error(f"Trading error: {repr(ex)}")
86
87
logger.info("Finishing trading today")
88
self.__blogger.finish_trading_message()
89
90
try:
91
if self.__today_trade_results:
92
for key_figi, value_order_id in self.__clear_all_positions(account_id, today_trade_strategies).items():
93
trade_order = self.__today_trade_results.close_position(key_figi, value_order_id)
94
self.__blogger.close_position_message(trade_order)
95
else:
96
self.__clear_all_positions(account_id, today_trade_strategies)
97
except Exception as ex:
98
logger.error(f"Finishing trading error: {repr(ex)}")
99
100
logger.info("Show trade results today")
101
try:
102
self.__summary_today_trade_results(account_id, rub_before_trade_day)
103
except Exception as ex:
104
logger.error(f"Summary trading day error: {repr(ex)}")
105
106
async def __trading(
107
self,
108
account_id: str,
109
trading_settings: TradingSettings,
110
strategies: dict[str, IStrategy],
111
trade_day_end_time: datetime
112
) -> None:
113
logger.info(f"Subscribe and read Candles for {strategies.keys()}")
114
115
# End trading before close trade session
116
trade_before_time: datetime = \
117
trade_day_end_time - datetime.timedelta(seconds=trading_settings.stop_trade_before_close)
118
119
signals_before_time: datetime = \
120
trade_day_end_time - datetime.timedelta(minutes=trading_settings.stop_signals_before_close)
121
logger.debug(f"Stop time: signals - {signals_before_time}, trading - {trade_before_time}")
122
123
current_candles: dict[str, Candle] = dict()
124
self.__today_trade_results = TradeResults()
125
126
async for candle in self.__stream_service.start_async_candles_stream(
127
list(strategies.keys()),
128
trade_before_time
129
):
130
current_figi_candle = current_candles.setdefault(candle.figi, candle)
131
if candle.time < current_figi_candle.time:
132
# it can be based on API documentation
133
logger.debug("Skip candle from past.")
134
continue
135
136
# check price from candle for take or stop price levels
137
current_trade_order = self.__today_trade_results.get_current_trade_order(candle.figi)
138
if current_trade_order:
139
high, low = quotation_to_decimal(candle.high), quotation_to_decimal(candle.low)
140
141
# Logic is:
142
# if stop or take price level is between high and low, then stop or take will be executed
143
if low <= current_trade_order.signal.stop_loss_level <= high:
144
logger.info(f"STOP LOSS: {current_trade_order}")
145
close_order_id = \
146
self.__close_position_by_figi(account_id, [candle.figi], strategies).get(candle.figi, None)
147
if close_order_id:
148
trade_order = self.__today_trade_results.close_position(candle.figi, close_order_id)
149
self.__blogger.close_position_message(trade_order)
150
151
elif low <= current_trade_order.signal.take_profit_level <= high:
152
logger.info(f"TAKE PROFIT: {current_trade_order}")
153
close_order_id = \
154
self.__close_position_by_figi(account_id, [candle.figi], strategies).get(candle.figi, None)
155
if close_order_id:
156
trade_order = self.__today_trade_results.close_position(candle.figi, close_order_id)
157
self.__blogger.close_position_message(trade_order)
158
159
if candle.time > current_figi_candle.time and \
160
datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) <= signals_before_time:
161
signal_new = strategies[candle.figi].analyze_candles(
162
[candle_to_historiccandle(current_figi_candle)]
163
)
164
165
if signal_new:
166
logger.info(f"New signal: {signal_new}")
167
168
if self.__today_trade_results.get_current_trade_order(candle.figi):
169
logger.info(f"New signal has been skipped. Previous signal is still alive.")
170
elif not self.__market_data_service.is_stock_ready_for_trading(candle.figi):
171
logger.info(f"New signal has been skipped. Stock isn't ready for trading")
172
else:
173
available_lots = self.__open_position_lots_count(
174
account_id,
175
strategies[candle.figi].settings.max_lots_per_order,
176
quotation_to_decimal(candle.close),
177
strategies[candle.figi].settings.lot_size
178
)
179
180
logger.debug(f"Available lots: {available_lots}")
181
if available_lots:
182
open_order_id = self.__order_service.post_market_order(
183
account_id=account_id,
184
figi=candle.figi,
185
count_lots=available_lots,
186
is_buy=(signal_new.signal_type == SignalType.LONG)
187
)
188
open_position = self.__today_trade_results.open_position(
189
candle.figi,
190
open_order_id,
191
signal_new
192
)
193
self.__blogger.open_position_message(open_position)
194
logger.info(f"Open position: {open_position}")
195
else:
196
logger.info(f"New signal has been skipped. No available money")
197
198
current_candles[candle.figi] = candle
199
200
logger.info("Today trading has been completed")
201
202
def __summary_today_trade_results(
203
self,
204
account_id: str,
205
rub_before_trade_day: Decimal
206
) -> None:
207
logger.info("Today trading summary:")
208
self.__blogger.summary_message()
209
210
current_rub_on_depo = self.__operation_service.available_rub_on_account(account_id)
211
logger.info(f"RUBs on account before:{rub_before_trade_day}, after:{current_rub_on_depo}")
212
213
today_profit = current_rub_on_depo - rub_before_trade_day
214
today_percent_profit = (today_profit / rub_before_trade_day) * 100
215
logger.info(f"Today Profit:{today_profit} rub ({today_percent_profit} %)")
216
self.__blogger.trading_depo_summary_message(rub_before_trade_day, current_rub_on_depo)
217
218
if self.__today_trade_results:
219
logger.info(f"Today Open Signals:")
220
for figi_key, trade_order_value in self.__today_trade_results.get_current_open_orders().items():
221
logger.info(f"Stock: {figi_key}")
222
223
open_order_state = self.__order_service.get_order_state(account_id, trade_order_value.open_order_id)
224
logger.info(f"Signal {trade_order_value.signal}")
225
logger.info(f"Open: {open_order_state}")
226
self.__blogger.summary_open_signal_message(trade_order_value, open_order_state)
227
228
logger.info(f"All open positions should be closed manually.")
229
230
logger.info(f"Today Closed Signals:")
231
for figi_key, trade_orders_value in self.__today_trade_results.get_closed_orders().items():
232
logger.info(f"Stock: {figi_key}")
233
for trade_order in trade_orders_value:
234
open_order_state = self.__order_service.get_order_state(account_id, trade_order.open_order_id)
235
close_order_state = self.__order_service.get_order_state(account_id, trade_order.close_order_id)
236
logger.info(f"Signal {trade_order.signal}")
237
logger.info(f"Open: {open_order_state}")
238
logger.info(f"Close: {close_order_state}")
239
self.__blogger.summary_closed_signal_message(trade_order, open_order_state, close_order_state)
240
else:
241
logger.info(f"Something went wrong: today trade results is empty")
242
logger.info(f"All open positions should be closed manually.")
243
self.__blogger.fail_message()
244
245
self.__blogger.final_message()
246
247
def __open_position_lots_count(
248
self,
249
account_id: str,
250
max_lots_per_order: int,
251
price: Decimal,
252
share_lot_size: int
253
) -> int:
254
"""
255
Calculate counts of lots for order
256
"""
257
current_rub_on_depo = self.__operation_service.available_rub_on_account(account_id)
258
259
available_lots = int(current_rub_on_depo / (share_lot_size * price))
260
261
return available_lots if max_lots_per_order > available_lots else max_lots_per_order
262
263
def __clear_all_positions(
264
self,
265
account_id: str,
266
strategies: dict[str, IStrategy]
267
) -> dict[str, str]:
268
logger.info("Clear all orders and close all open positions")
269
270
logger.debug("Cancel all order.")
271
self.__client_service.cancel_all_orders(account_id)
272
273
logger.debug("Close all positions.")
274
return self.__close_position_by_figi(account_id, strategies.keys(), strategies)
275
276
def __close_position_by_figi(
277
self,
278
account_id: str,
279
figies: list[str],
280
strategies: dict[str, IStrategy]
281
) -> dict[str, str]:
282
result: dict[str, str] = dict()
283
current_positions = self.__operation_service.positions_securities(account_id)
284
285
if current_positions:
286
logger.info(f"Current positions: {current_positions}")
287
for position in current_positions:
288
if position.figi in figies:
289
# Check a stock
290
if self.__market_data_service.is_stock_ready_for_trading(position.figi):
291
result[position.figi] = self.__order_service.post_market_order(
292
account_id=account_id,
293
figi=position.figi,
294
count_lots=abs(int(position.balance / strategies[position.figi].settings.lot_size)),
295
is_buy=(position.balance < 0)
296
)
297
298
return result
299
300
def __get_today_strategies(self, strategies: list[IStrategy]) -> dict[str, IStrategy]:
301
"""
302
Check and Select stocks for trading today.
303
"""
304
logger.info("Check shares and strategy settings")
305
today_trade_strategy: dict[str, IStrategy] = dict()
306
307
for strategy in strategies:
308
share_settings = self.__instrument_service.share_by_figi(strategy.settings.figi)
309
logger.debug(f"Check share settings for figi {strategy.settings.figi}: {share_settings}")
310
311
if (not share_settings.otc_flag) \
312
and share_settings.buy_available_flag \
313
and share_settings.sell_available_flag \
314
and share_settings.api_trade_available_flag:
315
logger.debug(f"Share is ready for trading")
316
317
# refresh information by latest info
318
strategy.update_lot_count(share_settings.lot)
319
strategy.update_short_status(share_settings.short_enabled_flag)
320
321
today_trade_strategy[strategy.settings.figi] = strategy
322
323
return today_trade_strategy
324
325