Path: blob/master/ invest-robot-contest_tinkoffSDK-master/my_trader.py
5925 views
import logging1import time2from typing import Iterator, List3import time4from datetime import timedelta, datetime, timezone56import tinkoff7from tinkoff.invest import (8CandleInstrument,9InvestError,10MarketDataRequest,11MarketDataResponse,12SubscribeCandlesRequest,13SubscriptionAction,14OrderDirection,15OrderType,16RequestError,17)18from tinkoff.invest.services import Services19#from tinkoff.invest.strategies.base.account_manager import AccountManager20from tinkoff.invest.strategies.base.errors import MarketDataNotAvailableError21from tinkoff.invest.strategies.base.event import DataEvent, SignalEvent22from tinkoff.invest.strategies.base.models import CandleEvent23from tinkoff.invest.strategies.base.signal import CloseSignal, OpenSignal, Signal,OpenLongMarketOrder,CloseLongMarketOrder24#from tinkoff.invest.strategies.base.signal_executor_base import SignalExecutor25from tinkoff.invest.strategies.base.trader_base import Trader26#from tinkoff.invest.strategies.moving_average.strategy import MovingAverageStrategy27from tinkoff.invest.strategies.moving_average.strategy_settings import (28MovingAverageStrategySettings,29)30from tinkoff.invest.strategies.moving_average.strategy_state import (31MovingAverageStrategyState,32)33from tinkoff.invest.strategies.moving_average.supervisor import (34MovingAverageStrategySupervisor,35)36from tinkoff.invest.utils import (37candle_interval_to_subscription_interval,38floor_datetime,39now,40)41import my_signal_executor_base42from my_signal_executor_base import SignalExecutor43from my_account_manager import AccountManager44import my_moving_average45import my_strategy46from my_strategy import MovingAverageStrategy474849logger = logging.getLogger(__name__)505152class MovingAverageStrategyTrader(Trader):53def __init__(54self,55strategy: MovingAverageStrategy,56settings: MovingAverageStrategySettings,57services: Services,58state: MovingAverageStrategyState,59signal_executor: SignalExecutor,60account_manager: AccountManager,61supervisor: MovingAverageStrategySupervisor,62):63super().__init__(strategy, services, settings)64self._settings: MovingAverageStrategySettings = settings65self._strategy = strategy66self._services = services67self._data: List[CandleEvent]68self._market_data_stream: Iterator[MarketDataResponse]69self._state = state70self._signal_executor = signal_executor71self._account_manager = account_manager72self._supervisor = supervisor7374self._data = list(75self._load_candles(self._settings.short_period + self._settings.long_period)76)77for candle_event in self._data:78self._supervisor.notify(self._convert_to_data_event(candle_event))79self._ensure_marginal_trade_active()8081self._subscribe()8283self._strategy.fit(self._data)8485def _ensure_marginal_trade_active(self) -> None:86self._account_manager.ensure_marginal_trade()8788def _subscribe(self):89current_instrument = CandleInstrument(90figi=self._settings.share_id,91interval=candle_interval_to_subscription_interval(92self._settings.candle_interval93),94)95candle_subscribe_request = MarketDataRequest(96subscribe_candles_request=SubscribeCandlesRequest(97subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,98instruments=[current_instrument],99)100)101102def request_iterator():103yield candle_subscribe_request104while True:105time.sleep(1)106107self._market_data_stream = self._services.market_data_stream.market_data_stream(108request_iterator()109)110111def _is_candle_fresh(self, candle: tinkoff.invest.Candle) -> bool:112is_fresh_border = floor_datetime(113now(), delta=self._settings.candle_interval_timedelta114)115logger.debug(116"Checking if candle is fresh: candle.time=%s > is_fresh_border=%s %s)",117candle.time,118is_fresh_border,119candle.time >= is_fresh_border,120)121return candle.time >= is_fresh_border122123@staticmethod124def _convert_to_data_event(candle_event: CandleEvent) -> DataEvent:125return DataEvent(candle_event=candle_event, time=candle_event.time)126127def _make_observations(self) -> None:128while True:129market_data_response: MarketDataResponse = next(self._market_data_stream)130logger.debug("got market_data_response: %s", market_data_response)131if market_data_response.candle is None:132logger.debug("market_data_response didn't have candle")133continue134candle = market_data_response.candle135logger.debug("candle extracted: %s", candle)136candle_event = self._convert_candle(candle)137self._strategy.observe(candle_event)138self._supervisor.notify(self._convert_to_data_event(candle_event))139if self._is_candle_fresh(candle):140logger.info("Data refreshed")141break142143def _refresh_data(self) -> None:144logger.info("Refreshing data")145try:146self._make_observations()147except StopIteration as e:148logger.info("Fresh quotations not available")149raise MarketDataNotAvailableError() from e150151def _filter_closing_signals(self, signals: List[Signal]) -> List[Signal]:152return list(filter(lambda signal: isinstance(signal, CloseSignal), signals))153154def _filter_opening_signals(self, signals: List[Signal]) -> List[Signal]:155return list(filter(lambda signal: isinstance(signal, OpenSignal), signals))156157def _execute(self, signal: Signal) -> None:158logger.info("Trying to execute signal %s", signal)159try:160self._signal_executor.execute(signal)161except RequestError:162was_executed = False163logger.info("Signal was_executed = False")164logger.info("RequestError.__cause__ %s", RequestError.__cause__)165logger.info("RequestError.__context__ %s", RequestError.__context__)166logger.info("RequestError.code %s", RequestError.code)167logger.info("RequestError.details %s", RequestError.details)168except:169was_executed = False170else:171was_executed = True172self._supervisor.notify(173SignalEvent(signal=signal, was_executed=was_executed, time=now())174)175176#if len(self._data) != 0:177# order_id = "AndreiSoiko_" + str(datetime.now().microsecond) + "_" + str(datetime.now().second)178179#Это отладочный код для выставления ордеров, оставлю пока здесь, потому что _signal_executor.execute может работать с ошибками из-за order_id,180# а может по другим причинам.181# if isinstance(signal,OpenLongMarketOrder):182# post_order_response = self._services.orders.post_order(183# figi= self._strategy._settings.share_id,184# quantity=1,185# #price= self._data.pop().candle.close,186# direction = OrderDirection(1), #ORDER_DIRECTION_BUY = 1187# account_id=self._settings.account_id,188# order_type= OrderType(2), #ORDER_TYPE_MARKET = 2189# order_id= order_id ,190# )191# logger.info("post_order_response %s", post_order_response)192# elif isinstance(signal,CloseLongMarketOrder):193# post_order_response = self._services.orders.post_order(194# figi= self._strategy._settings.share_id,195# #quantity=self._strategy._state.position, #Количество можно передовать из стратегии.196# quantity=1,197# #price= self._data.pop().candle.close,198# direction = OrderDirection(2), #ORDER_DIRECTION_SELL = 2199# account_id=self._settings.account_id,200# order_type= OrderType(2), #ORDER_TYPE_MARKET = 2201# order_id= order_id ,202# )203204# logger.info("post_order_response %s", post_order_response)205206207208209def _get_signals(self) -> List[Signal]:210signals = list(self._strategy.predict())211return [212*self._filter_closing_signals(signals),213*self._filter_opening_signals(signals),214]215216def trade(self) -> None:217"""Делает попытку следовать стратегии."""218219logger.info("Balance: %s", self._account_manager.get_current_balance())220self._refresh_data()221222signals = self._get_signals()223if signals:224logger.info("Got signals %s", signals)225for signal in signals:226self._execute(signal)227if self._state.position == 0:228logger.info("Trade try complete")229230