Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_investRobot-master/robotlib/robot.py
5929 views
1
import datetime
2
import logging
3
import sys
4
import uuid
5
6
from dataclasses import dataclass
7
8
from tinkoff.invest import (
9
AccessLevel,
10
AccountStatus,
11
AccountType,
12
Candle,
13
CandleInstrument,
14
CandleInterval,
15
Client,
16
InfoInstrument,
17
Instrument,
18
InstrumentIdType,
19
MarketDataResponse,
20
MoneyValue,
21
OrderBookInstrument,
22
OrderDirection,
23
OrderExecutionReportStatus,
24
OrderState,
25
PostOrderResponse,
26
Quotation,
27
TradeInstrument,
28
)
29
from tinkoff.invest.exceptions import InvestError
30
from tinkoff.invest.services import MarketDataStreamManager, Services
31
32
from robotlib.strategy import TradeStrategyBase, TradeStrategyParams, RobotTradeOrder
33
from robotlib.stats import TradeStatisticsAnalyzer
34
from robotlib.money import Money
35
36
37
@dataclass
38
class OrderExecutionInfo:
39
direction: OrderDirection
40
lots: int = 0
41
amount: float = 0.0
42
43
44
class TradingRobot: # pylint:disable=too-many-instance-attributes
45
APP_NAME: str = 'karpp'
46
47
token: str
48
account_id: str
49
trade_strategy: TradeStrategyBase
50
trade_statistics: TradeStatisticsAnalyzer
51
orders_executed: dict[str, OrderExecutionInfo] # order_id -> executed lots
52
logger: logging.Logger
53
instrument_info: Instrument
54
sandbox_mode: bool
55
56
def __init__(self, token: str, account_id: str, sandbox_mode: bool, trade_strategy: TradeStrategyBase,
57
trade_statistics: TradeStatisticsAnalyzer, instrument_info: Instrument, logger: logging.Logger):
58
self.token = token
59
self.account_id = account_id
60
self.trade_strategy = trade_strategy
61
self.trade_statistics = trade_statistics
62
self.orders_executed = {}
63
self.logger = logger
64
self.instrument_info = instrument_info
65
self.sandbox_mode = sandbox_mode
66
67
def trade(self) -> TradeStatisticsAnalyzer:
68
self.logger.info('Starting trading')
69
70
self.trade_strategy.load_candles(
71
list(self._load_historic_data(datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1))))
72
73
with Client(self.token, app_name=self.APP_NAME) as client:
74
trading_status = client.market_data.get_trading_status(figi=self.instrument_info.figi)
75
if not trading_status.market_order_available_flag:
76
self.logger.warning('Market trading is not available now.')
77
78
market_data_stream: MarketDataStreamManager = client.create_market_data_stream()
79
if self.trade_strategy.candle_subscription_interval:
80
market_data_stream.candles.subscribe([
81
CandleInstrument(
82
figi=self.instrument_info.figi,
83
interval=self.trade_strategy.candle_subscription_interval)
84
])
85
if self.trade_strategy.order_book_subscription_depth:
86
market_data_stream.order_book.subscribe([
87
OrderBookInstrument(
88
figi=self.instrument_info.figi,
89
depth=self.trade_strategy.order_book_subscription_depth)
90
])
91
if self.trade_strategy.trades_subscription:
92
market_data_stream.trades.subscribe([
93
TradeInstrument(figi=self.instrument_info.figi)
94
])
95
market_data_stream.info.subscribe([
96
InfoInstrument(figi=self.instrument_info.figi)
97
])
98
self.logger.debug(f'Subscribed to MarketDataStream, '
99
f'interval: {self.trade_strategy.candle_subscription_interval}')
100
try:
101
for market_data in market_data_stream:
102
self.logger.debug(f'Received market_data {market_data}')
103
if market_data.candle:
104
self._on_update(client, market_data)
105
if market_data.trading_status and market_data.trading_status.market_order_available_flag:
106
self.logger.info(f'Trading is limited. Current status: {market_data.trading_status}')
107
break
108
except InvestError as error:
109
self.logger.info(f'Caught exception {error}, stopping trading')
110
market_data_stream.stop()
111
return self.trade_statistics
112
113
def backtest(self, initial_params: TradeStrategyParams, test_duration: datetime.timedelta,
114
train_duration: datetime.timedelta = None) -> TradeStatisticsAnalyzer:
115
116
trade_statistics = TradeStatisticsAnalyzer(
117
positions=initial_params.instrument_balance,
118
money=initial_params.currency_balance,
119
instrument_info=self.instrument_info,
120
logger=self.logger
121
)
122
123
now = datetime.datetime.now(datetime.timezone.utc)
124
if train_duration:
125
train = self._load_historic_data(now - test_duration - train_duration, now - test_duration)
126
self.trade_strategy.load_candles(list(train))
127
test = self._load_historic_data(now - test_duration)
128
129
params = initial_params
130
for candle in test:
131
price = self.convert_from_quotation(candle.close)
132
robot_decision = self.trade_strategy.decide_by_candle(candle, params)
133
134
trade_order = robot_decision.robot_trade_order
135
if trade_order:
136
assert trade_order.quantity > 0
137
if trade_order.direction == OrderDirection.ORDER_DIRECTION_SELL:
138
assert trade_order.quantity >= params.instrument_balance, \
139
f'Cannot execute order {trade_order}. Params are {params}' # TODO: better logging
140
params.instrument_balance -= trade_order.quantity
141
params.currency_balance += trade_order.quantity * price * self.instrument_info.lot
142
else:
143
assert trade_order.quantity * self.instrument_info.lot * price <= params.currency_balance, \
144
f'Cannot execute order {trade_order}. Params are {params}' # TODO: better logging
145
params.instrument_balance += trade_order.quantity
146
params.currency_balance -= trade_order.quantity * price * self.instrument_info.lot
147
148
trade_statistics.add_backtest_trade(
149
quantity=trade_order.quantity, price=candle.close, direction=trade_order.direction)
150
151
return trade_statistics
152
153
@staticmethod
154
def convert_from_quotation(amount: Quotation | MoneyValue) -> float | None:
155
if amount is None:
156
return None
157
return amount.units + amount.nano / (10 ** 9)
158
159
def _on_update(self, client: Services, market_data: MarketDataResponse):
160
self._check_trade_orders(client)
161
params = TradeStrategyParams(instrument_balance=self.trade_statistics.get_positions(),
162
currency_balance=self.trade_statistics.get_money(),
163
pending_orders=self.trade_statistics.get_pending_orders())
164
165
self.logger.debug(f'Received market_data {market_data}. Running strategy with params {params}')
166
strategy_decision = self.trade_strategy.decide(market_data, params)
167
self.logger.debug(f'Strategy decision: {strategy_decision}')
168
169
if len(strategy_decision.cancel_orders) > 0:
170
self._cancel_orders(client=client, orders=strategy_decision.cancel_orders)
171
172
trade_order = strategy_decision.robot_trade_order
173
if trade_order and self._validate_strategy_order(order=trade_order, candle=market_data.candle):
174
self._post_trade_order(client=client, trade_order=trade_order)
175
176
def _validate_strategy_order(self, order: RobotTradeOrder, candle: Candle):
177
if order.direction == OrderDirection.ORDER_DIRECTION_BUY:
178
price = order.price or Money(candle.close)
179
total_cost = price * self.instrument_info.lot * order.quantity
180
balance = self.trade_statistics.get_money()
181
if total_cost.to_float() > self.trade_statistics.get_money():
182
self.logger.warning(f'Strategy decision cannot be executed. '
183
f'Requested buy cost: {total_cost}, balance: {balance}')
184
return False
185
else:
186
instrument_balance = self.trade_statistics.get_positions()
187
if order.quantity > instrument_balance:
188
self.logger.warning(f'Strategy decision cannot be executed. '
189
f'Requested sell quantity: {order.quantity}, balance: {instrument_balance}')
190
return False
191
return True
192
193
def _load_historic_data(self, from_time: datetime.datetime, to_time: datetime.datetime = None):
194
try:
195
with Client(self.token, app_name=self.APP_NAME) as client:
196
yield from client.get_all_candles(
197
from_=from_time,
198
to=to_time,
199
interval=CandleInterval.CANDLE_INTERVAL_1_MIN,
200
figi=self.instrument_info.figi,
201
)
202
except InvestError as error:
203
self.logger.error(f'Failed to load historical data. Error: {error}')
204
205
def _cancel_orders(self, client: Services, orders: list[OrderState]):
206
for order in orders:
207
try:
208
client.orders.cancel_order(account_id=self.account_id, order_id=order.order_id)
209
self.trade_statistics.cancel_order(order_id=order.order_id)
210
except InvestError as error:
211
self.logger.error(f'Failed to cancel order {order.order_id}. Error: {error}')
212
213
def _post_trade_order(self, client: Services, trade_order: RobotTradeOrder) -> PostOrderResponse | None:
214
try:
215
if self.sandbox_mode:
216
order = client.sandbox.post_sandbox_order(
217
figi=self.instrument_info.figi,
218
quantity=trade_order.quantity,
219
price=trade_order.price.to_quotation() if trade_order.price is not None else None,
220
direction=trade_order.direction,
221
account_id=self.account_id,
222
order_type=trade_order.order_type,
223
order_id=str(uuid.uuid4())
224
)
225
else:
226
order = client.orders.post_order(
227
figi=self.instrument_info.figi,
228
quantity=trade_order.quantity,
229
price=trade_order.price.to_quotation() if trade_order.price is not None else None,
230
direction=trade_order.direction,
231
account_id=self.account_id,
232
order_type=trade_order.order_type,
233
order_id=str(uuid.uuid4())
234
)
235
except InvestError as error:
236
self.logger.error(f'Posting trade order failed :(. Order: {trade_order}; Exception: {error}')
237
return
238
self.logger.info(f'Placed trade order {order}')
239
self.orders_executed[order.order_id] = OrderExecutionInfo(direction=trade_order.direction)
240
self.trade_statistics.add_trade(order)
241
return order
242
243
def _check_trade_orders(self, client: Services):
244
self.logger.debug(f'Updating trade orders info. Current trade orders num: {len(self.orders_executed)}')
245
orders_executed = list(self.orders_executed.items())
246
for order_id, execution_info in orders_executed:
247
if self.sandbox_mode:
248
order_state = client.sandbox.get_sandbox_order_state(
249
account_id=self.account_id, order_id=order_id
250
)
251
else:
252
order_state = client.orders.get_order_state(
253
account_id=self.account_id, order_id=order_id
254
)
255
256
self.trade_statistics.add_trade(trade=order_state)
257
match order_state.execution_report_status:
258
case OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_FILL:
259
self.logger.info(f'Trade order {order_id} has been FULLY FILLED')
260
self.orders_executed.pop(order_id)
261
case OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_REJECTED:
262
self.logger.warning(f'Trade order {order_id} has been REJECTED')
263
self.orders_executed.pop(order_id)
264
case OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_CANCELLED:
265
self.logger.warning(f'Trade order {order_id} has been CANCELLED')
266
self.orders_executed.pop(order_id)
267
case OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_PARTIALLYFILL:
268
self.logger.info(f'Trade order {order_id} has been PARTIALLY FILLED')
269
self.orders_executed[order_id] = OrderExecutionInfo(lots=order_state.lots_executed,
270
amount=order_state.total_order_amount,
271
direction=order_state.direction)
272
case _:
273
self.logger.debug(f'No updates on order {order_id}')
274
275
self.logger.debug(f'Successfully updated trade orders. New trade orders num: {len(self.orders_executed)}')
276
277
278
class TradingRobotFactory:
279
APP_NAME = 'karpp'
280
instrument_info: Instrument
281
token: str
282
account_id: str
283
logger: logging.Logger
284
sandbox_mode: bool
285
286
def __init__(self, token: str, account_id: str, figi: str = None, # pylint:disable=too-many-arguments
287
ticker: str = None, class_code: str = None, logger_level: int | str = 'INFO'):
288
self.instrument_info = self._get_instrument_info(token, figi, ticker, class_code).instrument
289
self.token = token
290
self.account_id = account_id
291
self.logger = self.setup_logger(logger_level)
292
self.sandbox_mode = self._validate_account(token, account_id, self.logger)
293
294
def setup_logger(self, logger_level: int | str):
295
logger = logging.getLogger(f'robot.{self.instrument_info.ticker}')
296
logger.setLevel(logger_level)
297
formatter = logging.Formatter(fmt=('%(asctime)s %(levelname)s: %(message)s')) # todo: fixit
298
handler = logging.StreamHandler(stream=sys.stderr)
299
handler.setFormatter(formatter)
300
logger.addHandler(handler)
301
return logger
302
303
def create_robot(self, trade_strategy: TradeStrategyBase, sandbox_mode: bool = True) -> TradingRobot:
304
money, positions = self._get_current_postitions()
305
trade_strategy.load_instrument_info(self.instrument_info)
306
stats = TradeStatisticsAnalyzer(
307
positions=positions,
308
money=money.to_float(), # todo: change to Money
309
instrument_info=self.instrument_info,
310
logger=self.logger.getChild(trade_strategy.strategy_id).getChild('stats')
311
)
312
return TradingRobot(token=self.token, account_id=self.account_id, sandbox_mode=sandbox_mode,
313
trade_strategy=trade_strategy, trade_statistics=stats, instrument_info=self.instrument_info,
314
logger=self.logger.getChild(trade_strategy.strategy_id))
315
316
def _get_current_postitions(self) -> tuple[Money, int]:
317
# amount of money and instrument balance
318
with Client(self.token, app_name=self.APP_NAME) as client:
319
positions = client.operations.get_positions(account_id=self.account_id)
320
321
instruments = [sec for sec in positions.securities if sec.figi == self.instrument_info.figi]
322
if len(instruments) > 0:
323
instrument = instruments[0].balance
324
else:
325
instrument = 0
326
327
moneys = [m for m in positions.money if m.currency == self.instrument_info.currency]
328
if len(moneys) > 0:
329
money = Money(moneys[0].units, moneys[0].nano)
330
else:
331
money = Money(0, 0)
332
333
return money, instrument
334
335
@staticmethod
336
def _validate_account(token: str, account_id: str, logger: logging.Logger) -> bool:
337
try:
338
with Client(token, app_name=TradingRobotFactory.APP_NAME) as client:
339
accounts = [acc for acc in client.users.get_accounts().accounts if acc.id == account_id]
340
sandbox_mode = False
341
if len(accounts) == 0:
342
sandbox_mode = True
343
accounts = [acc for acc in client.sandbox.get_sandbox_accounts().accounts if acc.id == account_id]
344
if len(accounts) == 0:
345
logger.error(f'Account {account_id} not found.')
346
raise ValueError('Account not found')
347
348
account = accounts[0]
349
if account.type not in [AccountType.ACCOUNT_TYPE_TINKOFF, AccountType.ACCOUNT_TYPE_INVEST_BOX]:
350
logger.error(f'Account type {account.type} is not supported')
351
raise ValueError('Unsupported account type')
352
if account.status != AccountStatus.ACCOUNT_STATUS_OPEN:
353
logger.error(f'Account status {account.status} is not supported')
354
raise ValueError('Unsupported account status')
355
if account.access_level != AccessLevel.ACCOUNT_ACCESS_LEVEL_FULL_ACCESS:
356
logger.error(f'No access to account. Current level is {account.access_level}')
357
raise ValueError('Insufficient access level')
358
359
return sandbox_mode
360
361
except InvestError as error:
362
logger.error(f'Failed to validate account. Exception: {error}')
363
raise error
364
365
@staticmethod
366
def _get_instrument_info(token: str, figi: str = None, ticker: str = None, class_code: str = None):
367
with Client(token, app_name=TradingRobotFactory.APP_NAME) as client:
368
if figi is None:
369
if ticker is None or class_code is None:
370
raise ValueError('figi or both ticker and class_code must be not None')
371
return client.instruments.get_instrument_by(id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_TICKER,
372
class_code=class_code, id=ticker)
373
return client.instruments.get_instrument_by(id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_FIGI, id=figi)
374
375