Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TradingCompetition2022-main/session/Session.py
5925 views
1
from tinkoff.invest.services import MarketDataStreamManager
2
from tinkoff.invest import (
3
MarketDataRequest,
4
SubscribeLastPriceRequest,
5
SubscriptionAction,
6
LastPriceInstrument,
7
InstrumentIdType,
8
OperationType,
9
RequestError,
10
SecurityTradingStatus
11
)
12
13
from datetime import date, datetime, timedelta
14
from decimal import Decimal
15
from account.Account import Account
16
from stock.StockStorage import StockStorage
17
from stock.StockBuilder import StockBuilder
18
from stock.Stock import Stock
19
from order.OrderStorage import OrderStorage, Order
20
import time
21
from logger.LoggerFactory import tech_log, LoggerFactory
22
from logger.BusinessLogger import BusinessLogger
23
from session.SessionCache import SessionCache
24
25
26
class Session:
27
""" Class to work with market session """
28
29
_session = None
30
31
@classmethod
32
def get_session(cls, account, stock_storage):
33
""" Get session (singleton) """
34
if cls._session is None:
35
cls._session = Session(account, stock_storage)
36
37
return cls._session
38
39
def __init__(self, account: Account, stock_storage: StockStorage):
40
self._account = account
41
self._stock_storage: StockStorage = stock_storage
42
self._market_data_stream: MarketDataStreamManager = self._account.client.create_market_data_stream()
43
self._session_date = date.today()
44
self._session_id = str(datetime.today().now())
45
self._stream_limit = 200 # ToDo read from TF platform
46
self._stream_count = 0
47
self._session_cache = SessionCache.get_session_cache()
48
49
@property
50
def session_id(self):
51
return self._session_id
52
53
@tech_log
54
def trading_loop(self):
55
""" Trading LOOP
56
Waiting for new last price and do action
57
"""
58
for market_data in self._market_data_stream:
59
self.stream_line_process(market_data)
60
61
def stream_line_process(self, market_data):
62
if not market_data.last_price:
63
# In ping from server do upd stock object
64
for stock in self._stock_storage:
65
self._process_in_loop(stock, market_data)
66
67
if market_data.last_price:
68
# In last price case Do process Stock object (with upd)
69
stock = self._stock_storage.get_stock_by_figi(market_data.last_price.figi)
70
if stock:
71
# Process with stock in trading loop
72
self._process_in_loop(stock, market_data)
73
74
if self._stock_storage.is_empty:
75
# In case when Stock storage is empty -> Done! Stop
76
self._market_data_stream.stop()
77
78
def _process_in_loop(self, stock, market_data):
79
""" Process with stock object in trading loop
80
"""
81
if self.is_stock_available_in_session(stock) is True:
82
# set last price into model
83
if market_data.last_price:
84
stock.model.set_last_price(last_price=Account.convert(market_data.last_price.price))
85
else:
86
# Stock is not available in this session.
87
# If time is comme Kill Stock
88
89
self._del_from_session(stock)
90
return
91
92
# Update orders result
93
if stock:
94
self._account.do_upd_order_result(stock)
95
96
if stock:
97
# Set order action (if necessary)
98
self._do_order_action(stock)
99
100
if self._is_ready_to_del_from_session(stock):
101
# If time is comme Kill Stock
102
self._del_from_session(stock)
103
return
104
105
if stock:
106
# ToDo Refactor this
107
if stock.is_trading_done is True:
108
# re init
109
new_stock = StockBuilder().re_init_stock(stock=stock, client=self._account.client)
110
self._stock_storage.delete(stock)
111
self._stock_storage.add(new_stock)
112
113
def _is_ready_to_del_from_session(self, stock) -> bool:
114
""" Check stock can be used or time to delete from this session """
115
# if stock.is_trading_done is True or self.is_stock_available_in_session(stock) is False:
116
if self.is_stock_available_in_session(stock) is False:
117
return True
118
else:
119
return False
120
121
def _del_from_session(self, stock: Stock):
122
# If trading is done or Stock is not available in this session,
123
# and We don't have any Non-completed order delete stock from storage
124
self._stock_storage.delete(stock)
125
# self.unsubscribe_last_price(stock.figi) # ToDo Add UnSubscribe
126
127
# Log data
128
LoggerFactory.get_business_logger_instance().add_event(BusinessLogger.STOCK_DEL_FROM_SESS, stock, stock)
129
del stock
130
131
@tech_log
132
def _do_order_action(self, stock):
133
""" Order Action
134
"""
135
# ToDo move this method into right Class (maybe account class is better place for this method)
136
match True:
137
case stock.model.is_ready_to_open_long:
138
# Check is it enough money for buy?
139
if self._account.is_ready_to_buy(stock) is True:
140
self._account.set_order(order_type=Account.LONG, stock=stock, quantity=stock.model.possible_lot)
141
142
case stock.model.is_ready_to_close_long:
143
if self._account.is_ready_to_sell(stock) is True:
144
self._account.set_order(order_type=Account.CLOSE_LONG, stock=stock, quantity=stock.lot_available)
145
146
case stock.model.is_ready_to_close_short:
147
# ToDo Implement Short orders
148
# self._account.set_order(account.CLOSE_SHORT)
149
pass
150
151
case stock.model.is_ready_to_open_short:
152
# ToDo Implement Short orders
153
# self._account.set_order(account.SHORT)
154
pass
155
156
def is_stock_available_in_session(self, stock: Stock) -> bool:
157
""" Is stock available in current session for trading?"""
158
# ToDo Add other status too (not only normal trading)
159
# ToDo Redefine this check. It is too expensive to ask platform every time
160
161
# Read from cache
162
curr_sock_status = self._session_cache.get_cached(group_key=SessionCache.STOCK_AVAILABLE, key=stock.ticker)
163
if curr_sock_status is None:
164
try:
165
curr_sock_status = self._account.client.instruments.share_by(
166
id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_TICKER,
167
class_code='TQBR',
168
id=stock.ticker).instrument.trading_status
169
# Save to cache
170
self._session_cache.cache_data(group=SessionCache.STOCK_AVAILABLE,
171
key=stock.ticker, data=curr_sock_status)
172
except RequestError as error:
173
# Log tech error
174
LoggerFactory.get_tech_logger_instance().add_except(error)
175
return True # ToDo Workaround
176
177
if SecurityTradingStatus:
178
if curr_sock_status == SecurityTradingStatus.SECURITY_TRADING_STATUS_NORMAL_TRADING:
179
return True
180
else:
181
return False
182
else:
183
return False
184
185
@staticmethod
186
def is_stock_open_in_order(stock) -> bool:
187
""" Check this stock has opened order """
188
order_storage: OrderStorage = OrderStorage.get_order_storage()
189
for order in order_storage.get_order_by_ticker(stock.ticker):
190
if order.is_order_completed is False:
191
return False
192
return True
193
194
def subscribe_last_price(self) -> MarketDataStreamManager:
195
""" Subscribe to last price (Market Data)"""
196
sub_last_price = SubscribeLastPriceRequest()
197
sub_last_price.subscription_action = SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE
198
sub_last_price.instruments = []
199
for i in self._stock_storage.get_all_figis():
200
sub_last_price.instruments.append(LastPriceInstrument(figi=i))
201
try:
202
self._market_data_stream.subscribe(MarketDataRequest(subscribe_last_price_request=sub_last_price))
203
except RequestError as error:
204
LoggerFactory.get_tech_logger_instance().add_except(error)
205
return self._market_data_stream
206
207
def unsubscribe_last_price(self, figi):
208
""" UnSubscribe from last price (Market Data)"""
209
# ToDo On Platform TF UnSubscribe is not work correct - need to check
210
sub_last_price = SubscribeLastPriceRequest()
211
sub_last_price.subscription_action = SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE
212
sub_last_price.instruments = [LastPriceInstrument(figi=figi)]
213
self._market_data_stream.unsubscribe(MarketDataRequest(subscribe_last_price_request=sub_last_price))
214
return self._market_data_stream
215
216
def get_session_result(self):
217
"""
218
Receive result of Session (result of trading)
219
:return:
220
"""
221
result = dict(count_long_opened=0, count_long_closed=0, count_orders=0,
222
fee=list(), sum_fee=Decimal(), buy=list(), sell=list())
223
224
from_ = datetime.today() - timedelta(days=1)
225
to = datetime.today()
226
227
count_long_opened = 0
228
count_long_closed = 0
229
for order in OrderStorage():
230
if order.order_type == Order.TYPE_LONG_OPEN:
231
count_long_opened += 1
232
if order.order_type == Order.TYPE_LONG_CLOSE:
233
count_long_closed += 1
234
count_orders = count_long_opened + count_long_closed
235
operations = self._account.get_operations(from_=from_, to=to)
236
result['count_long_opened'] = count_long_opened
237
result['count_long_closed'] = count_long_closed
238
result['count_orders'] = count_orders
239
240
def __get_dict_operation(name, operation):
241
return dict(name=name, payment=Account.convert(operation.payment),
242
currency=operation.currency, quantity=operation.quantity)
243
244
figi = dict()
245
for i in operations.operations:
246
instrument = None
247
if i.figi:
248
if i.figi not in figi:
249
try:
250
instrument = self._account.client.instruments.share_by(
251
id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_FIGI,
252
class_code='TQBR', id=i.figi).instrument
253
figi[i.figi] = instrument
254
except RequestError as error:
255
# Tech log
256
LoggerFactory.get_tech_logger_instance().add_except(error)
257
continue
258
else:
259
instrument = figi[i.figi]
260
261
if i.operation_type == OperationType.OPERATION_TYPE_BROKER_FEE:
262
# Collect FEE result
263
result['fee'].append(Account.convert(i.payment))
264
result['sum_fee'] += Account.convert(i.payment)
265
266
if instrument:
267
if i.operation_type == OperationType.OPERATION_TYPE_BUY:
268
# Collect all BUY operation
269
result['buy'].append(__get_dict_operation(instrument.name, i))
270
271
elif i.operation_type == OperationType.OPERATION_TYPE_SELL:
272
# Collect all SELL operation
273
result['sell'].append(__get_dict_operation(instrument.name, i))
274
return result
275
276
def _check_stream_limit(self):
277
""" Check if limit of stream grader then profile limit, wait a minute """
278
# ToDo it is workaround, It should be changed
279
if self._stream_count > (self._stream_limit - 10):
280
time.sleep(60)
281
self._stream_count = 0
282
self._stream_count += 1
283
284