Path: blob/master/ invest-robot-contest_investRobot-master/robotlib/robot.py
5929 views
import datetime1import logging2import sys3import uuid45from dataclasses import dataclass67from tinkoff.invest import (8AccessLevel,9AccountStatus,10AccountType,11Candle,12CandleInstrument,13CandleInterval,14Client,15InfoInstrument,16Instrument,17InstrumentIdType,18MarketDataResponse,19MoneyValue,20OrderBookInstrument,21OrderDirection,22OrderExecutionReportStatus,23OrderState,24PostOrderResponse,25Quotation,26TradeInstrument,27)28from tinkoff.invest.exceptions import InvestError29from tinkoff.invest.services import MarketDataStreamManager, Services3031from robotlib.strategy import TradeStrategyBase, TradeStrategyParams, RobotTradeOrder32from robotlib.stats import TradeStatisticsAnalyzer33from robotlib.money import Money343536@dataclass37class OrderExecutionInfo:38direction: OrderDirection39lots: int = 040amount: float = 0.0414243class TradingRobot: # pylint:disable=too-many-instance-attributes44APP_NAME: str = 'karpp'4546token: str47account_id: str48trade_strategy: TradeStrategyBase49trade_statistics: TradeStatisticsAnalyzer50orders_executed: dict[str, OrderExecutionInfo] # order_id -> executed lots51logger: logging.Logger52instrument_info: Instrument53sandbox_mode: bool5455def __init__(self, token: str, account_id: str, sandbox_mode: bool, trade_strategy: TradeStrategyBase,56trade_statistics: TradeStatisticsAnalyzer, instrument_info: Instrument, logger: logging.Logger):57self.token = token58self.account_id = account_id59self.trade_strategy = trade_strategy60self.trade_statistics = trade_statistics61self.orders_executed = {}62self.logger = logger63self.instrument_info = instrument_info64self.sandbox_mode = sandbox_mode6566def trade(self) -> TradeStatisticsAnalyzer:67self.logger.info('Starting trading')6869self.trade_strategy.load_candles(70list(self._load_historic_data(datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1))))7172with Client(self.token, app_name=self.APP_NAME) as client:73trading_status = client.market_data.get_trading_status(figi=self.instrument_info.figi)74if not trading_status.market_order_available_flag:75self.logger.warning('Market trading is not available now.')7677market_data_stream: MarketDataStreamManager = client.create_market_data_stream()78if self.trade_strategy.candle_subscription_interval:79market_data_stream.candles.subscribe([80CandleInstrument(81figi=self.instrument_info.figi,82interval=self.trade_strategy.candle_subscription_interval)83])84if self.trade_strategy.order_book_subscription_depth:85market_data_stream.order_book.subscribe([86OrderBookInstrument(87figi=self.instrument_info.figi,88depth=self.trade_strategy.order_book_subscription_depth)89])90if self.trade_strategy.trades_subscription:91market_data_stream.trades.subscribe([92TradeInstrument(figi=self.instrument_info.figi)93])94market_data_stream.info.subscribe([95InfoInstrument(figi=self.instrument_info.figi)96])97self.logger.debug(f'Subscribed to MarketDataStream, '98f'interval: {self.trade_strategy.candle_subscription_interval}')99try:100for market_data in market_data_stream:101self.logger.debug(f'Received market_data {market_data}')102if market_data.candle:103self._on_update(client, market_data)104if market_data.trading_status and market_data.trading_status.market_order_available_flag:105self.logger.info(f'Trading is limited. Current status: {market_data.trading_status}')106break107except InvestError as error:108self.logger.info(f'Caught exception {error}, stopping trading')109market_data_stream.stop()110return self.trade_statistics111112def backtest(self, initial_params: TradeStrategyParams, test_duration: datetime.timedelta,113train_duration: datetime.timedelta = None) -> TradeStatisticsAnalyzer:114115trade_statistics = TradeStatisticsAnalyzer(116positions=initial_params.instrument_balance,117money=initial_params.currency_balance,118instrument_info=self.instrument_info,119logger=self.logger120)121122now = datetime.datetime.now(datetime.timezone.utc)123if train_duration:124train = self._load_historic_data(now - test_duration - train_duration, now - test_duration)125self.trade_strategy.load_candles(list(train))126test = self._load_historic_data(now - test_duration)127128params = initial_params129for candle in test:130price = self.convert_from_quotation(candle.close)131robot_decision = self.trade_strategy.decide_by_candle(candle, params)132133trade_order = robot_decision.robot_trade_order134if trade_order:135assert trade_order.quantity > 0136if trade_order.direction == OrderDirection.ORDER_DIRECTION_SELL:137assert trade_order.quantity >= params.instrument_balance, \138f'Cannot execute order {trade_order}. Params are {params}' # TODO: better logging139params.instrument_balance -= trade_order.quantity140params.currency_balance += trade_order.quantity * price * self.instrument_info.lot141else:142assert trade_order.quantity * self.instrument_info.lot * price <= params.currency_balance, \143f'Cannot execute order {trade_order}. Params are {params}' # TODO: better logging144params.instrument_balance += trade_order.quantity145params.currency_balance -= trade_order.quantity * price * self.instrument_info.lot146147trade_statistics.add_backtest_trade(148quantity=trade_order.quantity, price=candle.close, direction=trade_order.direction)149150return trade_statistics151152@staticmethod153def convert_from_quotation(amount: Quotation | MoneyValue) -> float | None:154if amount is None:155return None156return amount.units + amount.nano / (10 ** 9)157158def _on_update(self, client: Services, market_data: MarketDataResponse):159self._check_trade_orders(client)160params = TradeStrategyParams(instrument_balance=self.trade_statistics.get_positions(),161currency_balance=self.trade_statistics.get_money(),162pending_orders=self.trade_statistics.get_pending_orders())163164self.logger.debug(f'Received market_data {market_data}. Running strategy with params {params}')165strategy_decision = self.trade_strategy.decide(market_data, params)166self.logger.debug(f'Strategy decision: {strategy_decision}')167168if len(strategy_decision.cancel_orders) > 0:169self._cancel_orders(client=client, orders=strategy_decision.cancel_orders)170171trade_order = strategy_decision.robot_trade_order172if trade_order and self._validate_strategy_order(order=trade_order, candle=market_data.candle):173self._post_trade_order(client=client, trade_order=trade_order)174175def _validate_strategy_order(self, order: RobotTradeOrder, candle: Candle):176if order.direction == OrderDirection.ORDER_DIRECTION_BUY:177price = order.price or Money(candle.close)178total_cost = price * self.instrument_info.lot * order.quantity179balance = self.trade_statistics.get_money()180if total_cost.to_float() > self.trade_statistics.get_money():181self.logger.warning(f'Strategy decision cannot be executed. '182f'Requested buy cost: {total_cost}, balance: {balance}')183return False184else:185instrument_balance = self.trade_statistics.get_positions()186if order.quantity > instrument_balance:187self.logger.warning(f'Strategy decision cannot be executed. '188f'Requested sell quantity: {order.quantity}, balance: {instrument_balance}')189return False190return True191192def _load_historic_data(self, from_time: datetime.datetime, to_time: datetime.datetime = None):193try:194with Client(self.token, app_name=self.APP_NAME) as client:195yield from client.get_all_candles(196from_=from_time,197to=to_time,198interval=CandleInterval.CANDLE_INTERVAL_1_MIN,199figi=self.instrument_info.figi,200)201except InvestError as error:202self.logger.error(f'Failed to load historical data. Error: {error}')203204def _cancel_orders(self, client: Services, orders: list[OrderState]):205for order in orders:206try:207client.orders.cancel_order(account_id=self.account_id, order_id=order.order_id)208self.trade_statistics.cancel_order(order_id=order.order_id)209except InvestError as error:210self.logger.error(f'Failed to cancel order {order.order_id}. Error: {error}')211212def _post_trade_order(self, client: Services, trade_order: RobotTradeOrder) -> PostOrderResponse | None:213try:214if self.sandbox_mode:215order = client.sandbox.post_sandbox_order(216figi=self.instrument_info.figi,217quantity=trade_order.quantity,218price=trade_order.price.to_quotation() if trade_order.price is not None else None,219direction=trade_order.direction,220account_id=self.account_id,221order_type=trade_order.order_type,222order_id=str(uuid.uuid4())223)224else:225order = client.orders.post_order(226figi=self.instrument_info.figi,227quantity=trade_order.quantity,228price=trade_order.price.to_quotation() if trade_order.price is not None else None,229direction=trade_order.direction,230account_id=self.account_id,231order_type=trade_order.order_type,232order_id=str(uuid.uuid4())233)234except InvestError as error:235self.logger.error(f'Posting trade order failed :(. Order: {trade_order}; Exception: {error}')236return237self.logger.info(f'Placed trade order {order}')238self.orders_executed[order.order_id] = OrderExecutionInfo(direction=trade_order.direction)239self.trade_statistics.add_trade(order)240return order241242def _check_trade_orders(self, client: Services):243self.logger.debug(f'Updating trade orders info. Current trade orders num: {len(self.orders_executed)}')244orders_executed = list(self.orders_executed.items())245for order_id, execution_info in orders_executed:246if self.sandbox_mode:247order_state = client.sandbox.get_sandbox_order_state(248account_id=self.account_id, order_id=order_id249)250else:251order_state = client.orders.get_order_state(252account_id=self.account_id, order_id=order_id253)254255self.trade_statistics.add_trade(trade=order_state)256match order_state.execution_report_status:257case OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_FILL:258self.logger.info(f'Trade order {order_id} has been FULLY FILLED')259self.orders_executed.pop(order_id)260case OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_REJECTED:261self.logger.warning(f'Trade order {order_id} has been REJECTED')262self.orders_executed.pop(order_id)263case OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_CANCELLED:264self.logger.warning(f'Trade order {order_id} has been CANCELLED')265self.orders_executed.pop(order_id)266case OrderExecutionReportStatus.EXECUTION_REPORT_STATUS_PARTIALLYFILL:267self.logger.info(f'Trade order {order_id} has been PARTIALLY FILLED')268self.orders_executed[order_id] = OrderExecutionInfo(lots=order_state.lots_executed,269amount=order_state.total_order_amount,270direction=order_state.direction)271case _:272self.logger.debug(f'No updates on order {order_id}')273274self.logger.debug(f'Successfully updated trade orders. New trade orders num: {len(self.orders_executed)}')275276277class TradingRobotFactory:278APP_NAME = 'karpp'279instrument_info: Instrument280token: str281account_id: str282logger: logging.Logger283sandbox_mode: bool284285def __init__(self, token: str, account_id: str, figi: str = None, # pylint:disable=too-many-arguments286ticker: str = None, class_code: str = None, logger_level: int | str = 'INFO'):287self.instrument_info = self._get_instrument_info(token, figi, ticker, class_code).instrument288self.token = token289self.account_id = account_id290self.logger = self.setup_logger(logger_level)291self.sandbox_mode = self._validate_account(token, account_id, self.logger)292293def setup_logger(self, logger_level: int | str):294logger = logging.getLogger(f'robot.{self.instrument_info.ticker}')295logger.setLevel(logger_level)296formatter = logging.Formatter(fmt=('%(asctime)s %(levelname)s: %(message)s')) # todo: fixit297handler = logging.StreamHandler(stream=sys.stderr)298handler.setFormatter(formatter)299logger.addHandler(handler)300return logger301302def create_robot(self, trade_strategy: TradeStrategyBase, sandbox_mode: bool = True) -> TradingRobot:303money, positions = self._get_current_postitions()304trade_strategy.load_instrument_info(self.instrument_info)305stats = TradeStatisticsAnalyzer(306positions=positions,307money=money.to_float(), # todo: change to Money308instrument_info=self.instrument_info,309logger=self.logger.getChild(trade_strategy.strategy_id).getChild('stats')310)311return TradingRobot(token=self.token, account_id=self.account_id, sandbox_mode=sandbox_mode,312trade_strategy=trade_strategy, trade_statistics=stats, instrument_info=self.instrument_info,313logger=self.logger.getChild(trade_strategy.strategy_id))314315def _get_current_postitions(self) -> tuple[Money, int]:316# amount of money and instrument balance317with Client(self.token, app_name=self.APP_NAME) as client:318positions = client.operations.get_positions(account_id=self.account_id)319320instruments = [sec for sec in positions.securities if sec.figi == self.instrument_info.figi]321if len(instruments) > 0:322instrument = instruments[0].balance323else:324instrument = 0325326moneys = [m for m in positions.money if m.currency == self.instrument_info.currency]327if len(moneys) > 0:328money = Money(moneys[0].units, moneys[0].nano)329else:330money = Money(0, 0)331332return money, instrument333334@staticmethod335def _validate_account(token: str, account_id: str, logger: logging.Logger) -> bool:336try:337with Client(token, app_name=TradingRobotFactory.APP_NAME) as client:338accounts = [acc for acc in client.users.get_accounts().accounts if acc.id == account_id]339sandbox_mode = False340if len(accounts) == 0:341sandbox_mode = True342accounts = [acc for acc in client.sandbox.get_sandbox_accounts().accounts if acc.id == account_id]343if len(accounts) == 0:344logger.error(f'Account {account_id} not found.')345raise ValueError('Account not found')346347account = accounts[0]348if account.type not in [AccountType.ACCOUNT_TYPE_TINKOFF, AccountType.ACCOUNT_TYPE_INVEST_BOX]:349logger.error(f'Account type {account.type} is not supported')350raise ValueError('Unsupported account type')351if account.status != AccountStatus.ACCOUNT_STATUS_OPEN:352logger.error(f'Account status {account.status} is not supported')353raise ValueError('Unsupported account status')354if account.access_level != AccessLevel.ACCOUNT_ACCESS_LEVEL_FULL_ACCESS:355logger.error(f'No access to account. Current level is {account.access_level}')356raise ValueError('Insufficient access level')357358return sandbox_mode359360except InvestError as error:361logger.error(f'Failed to validate account. Exception: {error}')362raise error363364@staticmethod365def _get_instrument_info(token: str, figi: str = None, ticker: str = None, class_code: str = None):366with Client(token, app_name=TradingRobotFactory.APP_NAME) as client:367if figi is None:368if ticker is None or class_code is None:369raise ValueError('figi or both ticker and class_code must be not None')370return client.instruments.get_instrument_by(id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_TICKER,371class_code=class_code, id=ticker)372return client.instruments.get_instrument_by(id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_FIGI, id=figi)373374375