Path: blob/master/ invest-robot-contest_invest-bot-main/invest_api/services/orders_service.py
7813 views
import logging12from tinkoff.invest import Client, OrderDirection, Quotation, OrderType, PostOrderResponse, OrderState34from invest_api.utils import generate_order_id5from invest_api.invest_error_decorators import invest_error_logging, invest_api_retry67__all__ = ("OrderService")89logger = logging.getLogger(__name__)101112class OrderService:13"""14The class encapsulate tinkoff order service api15"""16def __init__(self, token: str, app_name: str) -> None:17self.__token = token18self.__app_name = app_name1920@invest_api_retry()21@invest_error_logging22def __post_order(23self,24account_id: str,25figi: str,26count_lots: int,27price: Quotation,28direction: OrderDirection,29order_type: OrderType,30order_id: str31) -> PostOrderResponse:32with Client(self.__token, app_name=self.__app_name) as client:33return client.orders.post_order(34figi=figi,35quantity=count_lots,36price=price,37direction=direction,38account_id=account_id,39order_type=order_type,40order_id=order_id41)4243def post_market_order(44self,45account_id: str,46figi: str,47count_lots: int,48is_buy: bool49) -> str:50"""51Post market order52"""53logger.info(54f"Post market order account_id: {account_id}, "55f"figi: {figi}, count_lots: {count_lots}, is_buy: {is_buy}"56)5758order_id = self.__post_order(59account_id=account_id,60figi=figi,61count_lots=count_lots,62price=None,63direction=OrderDirection.ORDER_DIRECTION_BUY if is_buy else OrderDirection.ORDER_DIRECTION_SELL,64order_type=OrderType.ORDER_TYPE_MARKET,65order_id=generate_order_id()66).order_id6768logger.debug(f"order_id is {order_id}")6970return order_id7172@invest_api_retry()73@invest_error_logging74def cancel_order(self, account_id: str, order_id: str) -> None:75with Client(self.__token, app_name=self.__app_name) as client:76client.orders.cancel_order(account_id=account_id, order_id=order_id)7778@invest_api_retry()79@invest_error_logging80def get_order_state(self, account_id: str, order_id: str) -> OrderState:81with Client(self.__token, app_name=self.__app_name) as client:82return client.orders.get_order_state(account_id=account_id, order_id=order_id)8384@invest_api_retry()85@invest_error_logging86def get_orders(self, account_id: str) -> list[OrderState]:87with Client(self.__token, app_name=self.__app_name) as client:88return client.orders.get_orders(account_id=account_id).orders899091