Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_invest-bot-main/invest_api/services/orders_service.py
7813 views
1
import logging
2
3
from tinkoff.invest import Client, OrderDirection, Quotation, OrderType, PostOrderResponse, OrderState
4
5
from invest_api.utils import generate_order_id
6
from invest_api.invest_error_decorators import invest_error_logging, invest_api_retry
7
8
__all__ = ("OrderService")
9
10
logger = logging.getLogger(__name__)
11
12
13
class OrderService:
14
"""
15
The class encapsulate tinkoff order service api
16
"""
17
def __init__(self, token: str, app_name: str) -> None:
18
self.__token = token
19
self.__app_name = app_name
20
21
@invest_api_retry()
22
@invest_error_logging
23
def __post_order(
24
self,
25
account_id: str,
26
figi: str,
27
count_lots: int,
28
price: Quotation,
29
direction: OrderDirection,
30
order_type: OrderType,
31
order_id: str
32
) -> PostOrderResponse:
33
with Client(self.__token, app_name=self.__app_name) as client:
34
return client.orders.post_order(
35
figi=figi,
36
quantity=count_lots,
37
price=price,
38
direction=direction,
39
account_id=account_id,
40
order_type=order_type,
41
order_id=order_id
42
)
43
44
def post_market_order(
45
self,
46
account_id: str,
47
figi: str,
48
count_lots: int,
49
is_buy: bool
50
) -> str:
51
"""
52
Post market order
53
"""
54
logger.info(
55
f"Post market order account_id: {account_id}, "
56
f"figi: {figi}, count_lots: {count_lots}, is_buy: {is_buy}"
57
)
58
59
order_id = self.__post_order(
60
account_id=account_id,
61
figi=figi,
62
count_lots=count_lots,
63
price=None,
64
direction=OrderDirection.ORDER_DIRECTION_BUY if is_buy else OrderDirection.ORDER_DIRECTION_SELL,
65
order_type=OrderType.ORDER_TYPE_MARKET,
66
order_id=generate_order_id()
67
).order_id
68
69
logger.debug(f"order_id is {order_id}")
70
71
return order_id
72
73
@invest_api_retry()
74
@invest_error_logging
75
def cancel_order(self, account_id: str, order_id: str) -> None:
76
with Client(self.__token, app_name=self.__app_name) as client:
77
client.orders.cancel_order(account_id=account_id, order_id=order_id)
78
79
@invest_api_retry()
80
@invest_error_logging
81
def get_order_state(self, account_id: str, order_id: str) -> OrderState:
82
with Client(self.__token, app_name=self.__app_name) as client:
83
return client.orders.get_order_state(account_id=account_id, order_id=order_id)
84
85
@invest_api_retry()
86
@invest_error_logging
87
def get_orders(self, account_id: str) -> list[OrderState]:
88
with Client(self.__token, app_name=self.__app_name) as client:
89
return client.orders.get_orders(account_id=account_id).orders
90
91