# Path: blob/master/invest-robot-contest_TradingCompetition2022-main/session/Session.py
# 5925 views
from datetime import date, datetime, timedelta
from decimal import Decimal
import time

from tinkoff.invest.services import MarketDataStreamManager
from tinkoff.invest import (
    MarketDataRequest,
    SubscribeLastPriceRequest,
    SubscriptionAction,
    LastPriceInstrument,
    InstrumentIdType,
    OperationType,
    RequestError,
    SecurityTradingStatus,
)

from account.Account import Account
from stock.StockStorage import StockStorage
from stock.StockBuilder import StockBuilder
from stock.Stock import Stock
from order.OrderStorage import OrderStorage, Order
from logger.LoggerFactory import tech_log, LoggerFactory
from logger.BusinessLogger import BusinessLogger
from session.SessionCache import SessionCache


class Session:
    """Work with a market session.

    Owns the market-data stream of the account's client, feeds incoming
    messages to the stocks kept in the stock storage and collects the
    trading result of the session.
    """

    _session = None

    @classmethod
    def get_session(cls, account, stock_storage):
        """Return the shared session, creating it on first use (singleton).

        The arguments are only used by the call that creates the session;
        later calls return the already created instance unchanged.
        """
        if cls._session is None:
            cls._session = cls(account, stock_storage)

        return cls._session

    def __init__(self, account: Account, stock_storage: StockStorage):
        self._account = account
        self._stock_storage: StockStorage = stock_storage
        self._market_data_stream: MarketDataStreamManager = (
            self._account.client.create_market_data_stream()
        )
        self._session_date = date.today()
        self._session_id = str(datetime.now())
        self._stream_limit = 200  # ToDo read from TF platform
        self._stream_count = 0
        self._session_cache = SessionCache.get_session_cache()

    @property
    def session_id(self):
        """Identifier of the session (creation timestamp as a string)."""
        return self._session_id

    @tech_log
    def trading_loop(self):
        """Trading loop.

        Iterate over the market-data stream and process every message as it
        arrives. The loop ends when the stream ends.
        """
        for market_data in self._market_data_stream:
            self.stream_line_process(market_data)

    def stream_line_process(self, market_data):
        """Process one message of the market-data stream.

        A message carrying a last price is routed to the stock with the
        matching FIGI. Any other message is used to update every stock in
        the storage. When the storage becomes empty the stream is stopped.
        """
        last_price = market_data.last_price
        if last_price:
            # Last price case: process only the stock this price belongs to.
            stock = self._stock_storage.get_stock_by_figi(last_price.figi)
            if stock:
                self._process_in_loop(stock, market_data)
        else:
            # No last price in the message: update every stock. Iterate over
            # a snapshot, because processing may delete stocks from the
            # storage or replace them with re-initialised ones.
            for stock in list(self._stock_storage):
                self._process_in_loop(stock, market_data)

        if self._stock_storage.is_empty:
            # Nothing left to trade -> done, stop the stream.
            self._market_data_stream.stop()

    def _process_in_loop(self, stock, market_data):
        """Process one stock object inside the trading loop."""
        if not self.is_stock_available_in_session(stock):
            # Stock is not available in this session: remove it.
            self._del_from_session(stock)
            return

        if market_data.last_price:
            # Set the last price into the model.
            stock.model.set_last_price(
                last_price=Account.convert(market_data.last_price.price)
            )

        # Update orders result.
        self._account.do_upd_order_result(stock)

        # Set order action (if necessary).
        self._do_order_action(stock)

        if self._is_ready_to_del_from_session(stock):
            # The time has come to remove the stock from the session.
            self._del_from_session(stock)
            return

        # ToDo Refactor this
        if stock.is_trading_done is True:
            # Trading is done: replace the stock with a re-initialised one.
            new_stock = StockBuilder().re_init_stock(
                stock=stock, client=self._account.client
            )
            self._stock_storage.delete(stock)
            self._stock_storage.add(new_stock)

    def _is_ready_to_del_from_session(self, stock) -> bool:
        """Return True when the stock must be removed from this session.

        Currently this is the case exactly when the stock is not available
        for trading in the session.
        """
        return not self.is_stock_available_in_session(stock)

    def _del_from_session(self, stock: Stock):
        """Remove the stock from the storage and log the removal."""
        self._stock_storage.delete(stock)
        # ToDo Add UnSubscribe: self.unsubscribe_last_price(stock.figi)

        # Log data
        LoggerFactory.get_business_logger_instance().add_event(
            BusinessLogger.STOCK_DEL_FROM_SESS, stock, stock
        )

    @tech_log
    def _do_order_action(self, stock):
        """Place an order for the stock if its model asks for one.

        Only the first matching model signal is handled. Short orders are
        not implemented yet.
        """
        # ToDo move this method into right Class
        #  (maybe account class is better place for this method)
        model = stock.model
        if model.is_ready_to_open_long:
            # Is there enough money to buy?
            if self._account.is_ready_to_buy(stock) is True:
                self._account.set_order(
                    order_type=Account.LONG,
                    stock=stock,
                    quantity=stock.model.possible_lot,
                )
        elif model.is_ready_to_close_long:
            if self._account.is_ready_to_sell(stock) is True:
                self._account.set_order(
                    order_type=Account.CLOSE_LONG,
                    stock=stock,
                    quantity=stock.lot_available,
                )
        elif model.is_ready_to_close_short:
            # ToDo Implement Short orders (CLOSE_SHORT)
            pass
        elif model.is_ready_to_open_short:
            # ToDo Implement Short orders (SHORT)
            pass

    def is_stock_available_in_session(self, stock: Stock) -> bool:
        """Return True when the stock can be traded in the current session.

        The trading status is read from the session cache; on a cache miss
        it is requested from the platform and cached. Only the "normal
        trading" status counts as available. If the platform request fails
        the error is logged and the stock is treated as available.
        """
        # ToDo Add other status too (not only normal trading)
        # ToDo Redefine this check. It is too expensive to ask platform every time

        # Read from cache
        status = self._session_cache.get_cached(
            group_key=SessionCache.STOCK_AVAILABLE, key=stock.ticker
        )
        if status is None:
            try:
                status = self._account.client.instruments.share_by(
                    id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_TICKER,
                    class_code='TQBR',
                    id=stock.ticker,
                ).instrument.trading_status
            except RequestError as error:
                # Log tech error
                LoggerFactory.get_tech_logger_instance().add_except(error)
                return True  # ToDo Workaround
            else:
                # Save to cache
                self._session_cache.cache_data(
                    group=SessionCache.STOCK_AVAILABLE,
                    key=stock.ticker,
                    data=status,
                )

        return status == SecurityTradingStatus.SECURITY_TRADING_STATUS_NORMAL_TRADING

    @staticmethod
    def is_stock_open_in_order(stock) -> bool:
        """Check the orders registered for the stock's ticker.

        Returns False as soon as one order of the ticker is not completed,
        True otherwise (including when there are no orders at all).

        NOTE(review): given the method name one would expect True when an
        open order exists; the result looks inverted. Behaviour is kept
        as is because callers are not visible here - confirm before changing.
        """
        order_storage: OrderStorage = OrderStorage.get_order_storage()
        return not any(
            order.is_order_completed is False
            for order in order_storage.get_order_by_ticker(stock.ticker)
        )

    def subscribe_last_price(self) -> MarketDataStreamManager:
        """Subscribe to the last price of every FIGI in the stock storage.

        A RequestError raised while subscribing is logged, not propagated.

        :return: the market-data stream of the session
        """
        request = SubscribeLastPriceRequest(
            subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,
            instruments=[
                LastPriceInstrument(figi=figi)
                for figi in self._stock_storage.get_all_figis()
            ],
        )
        try:
            self._market_data_stream.subscribe(
                MarketDataRequest(subscribe_last_price_request=request)
            )
        except RequestError as error:
            LoggerFactory.get_tech_logger_instance().add_except(error)
        return self._market_data_stream

    def unsubscribe_last_price(self, figi):
        """Unsubscribe from the last price of one instrument.

        :param figi: FIGI of the instrument to unsubscribe from
        :return: the market-data stream of the session
        """
        # The request must carry the UNSUBSCRIBE action; sending SUBSCRIBE
        # here would subscribe to the instrument once more.
        request = SubscribeLastPriceRequest(
            subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_UNSUBSCRIBE,
            instruments=[LastPriceInstrument(figi=figi)],
        )
        self._market_data_stream.unsubscribe(
            MarketDataRequest(subscribe_last_price_request=request)
        )
        return self._market_data_stream

    def get_session_result(self):
        """Collect the result of the session (result of trading).

        Orders are counted from the shared order storage; fees, buys and
        sells are taken from the account operations of the last 24 hours.

        :return: dict with the keys ``count_long_opened``,
            ``count_long_closed``, ``count_orders``, ``fee`` (list),
            ``sum_fee``, ``buy`` (list of dicts) and ``sell`` (list of dicts)
        """
        result = dict(count_long_opened=0, count_long_closed=0, count_orders=0,
                      fee=list(), sum_fee=Decimal(), buy=list(), sell=list())

        to = datetime.today()
        from_ = to - timedelta(days=1)

        # Use the shared storage accessor, as is_stock_open_in_order does;
        # a newly constructed OrderStorage would presumably not hold the
        # orders of this session.
        count_long_opened = 0
        count_long_closed = 0
        for order in OrderStorage.get_order_storage():
            if order.order_type == Order.TYPE_LONG_OPEN:
                count_long_opened += 1
            if order.order_type == Order.TYPE_LONG_CLOSE:
                count_long_closed += 1

        operations = self._account.get_operations(from_=from_, to=to)
        result['count_long_opened'] = count_long_opened
        result['count_long_closed'] = count_long_closed
        result['count_orders'] = count_long_opened + count_long_closed

        def _operation_as_dict(name, operation):
            return dict(name=name, payment=Account.convert(operation.payment),
                        currency=operation.currency, quantity=operation.quantity)

        # Instruments already requested from the platform, keyed by FIGI.
        instruments_by_figi = dict()
        for operation in operations.operations:
            instrument = None
            if operation.figi:
                instrument = instruments_by_figi.get(operation.figi)
                if instrument is None:
                    try:
                        instrument = self._account.client.instruments.share_by(
                            id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_FIGI,
                            class_code='TQBR', id=operation.figi).instrument
                    except RequestError as error:
                        # Tech log. The operation is still processed below:
                        # fees do not need the instrument.
                        LoggerFactory.get_tech_logger_instance().add_except(error)
                    else:
                        instruments_by_figi[operation.figi] = instrument

            if operation.operation_type == OperationType.OPERATION_TYPE_BROKER_FEE:
                # Collect FEE result
                payment = Account.convert(operation.payment)
                result['fee'].append(payment)
                result['sum_fee'] += payment

            if instrument:
                if operation.operation_type == OperationType.OPERATION_TYPE_BUY:
                    # Collect all BUY operations
                    result['buy'].append(
                        _operation_as_dict(instrument.name, operation))
                elif operation.operation_type == OperationType.OPERATION_TYPE_SELL:
                    # Collect all SELL operations
                    result['sell'].append(
                        _operation_as_dict(instrument.name, operation))
        return result

    def _check_stream_limit(self):
        """Count one stream request; pause when close to the stream limit.

        When the counter is within 10 of the limit, sleep for a minute and
        reset the counter before counting the current request.
        """
        # ToDo it is workaround, It should be changed
        if self._stream_count > (self._stream_limit - 10):
            time.sleep(60)
            self._stream_count = 0
        self._stream_count += 1