Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_tinkoff-trading-bot-develop/app/strategies/interval/IntervalStrategy.py
5938 views
1
import asyncio
2
import logging
3
from datetime import timedelta
4
from typing import List, Optional
5
from uuid import uuid4
6
7
import numpy as np
8
from tinkoff.invest import CandleInterval, HistoricCandle, AioRequestError
9
from tinkoff.invest.grpc.orders_pb2 import (
10
ORDER_DIRECTION_SELL,
11
ORDER_DIRECTION_BUY,
12
ORDER_TYPE_MARKET,
13
)
14
from tinkoff.invest.utils import now
15
16
from app.client import client
17
from app.settings import settings
18
from app.stats.handler import StatsHandler
19
from app.strategies.interval.models import IntervalStrategyConfig, Corridor
20
from app.strategies.base import BaseStrategy
21
from app.strategies.models import StrategyName
22
from app.utils.portfolio import get_position, get_order
23
from app.utils.quotation import quotation_to_float
24
25
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
26
27
28
class IntervalStrategy(BaseStrategy):
    """
    Interval strategy.

    Main strategy logic is to buy at the lowest price and sell at the highest price of the
    calculated interval (the "corridor").

    The corridor is calculated from the close prices of 1-min candles for the last
    days_back_to_consider days. It is the central interval_size share of those prices:
    e.g. interval_size=0.8 gives a corridor from the 10th to the 90th percentile.
    """

    def __init__(self, figi: str, **kwargs):
        """
        :param figi: FIGI of the instrument the strategy trades
        :param kwargs: strategy parameters, passed as-is to IntervalStrategyConfig
        """
        self.account_id = settings.account_id
        self.corridor: Optional[Corridor] = None
        self.figi = figi
        self.config: IntervalStrategyConfig = IntervalStrategyConfig(**kwargs)
        self.stats_handler = StatsHandler(StrategyName.INTERVAL, client)
        # Strong references to fire-and-forget stats tasks. The event loop keeps only
        # weak references to tasks, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks: set = set()

    async def get_historical_data(self) -> List[HistoricCandle]:
        """
        Gets historical data for the instrument. Returns list of candles.
        Requests all the 1-min candles from days_back_to_consider days back to now.

        :return: list of HistoricCandle
        """
        logger.debug(
            f"Start getting historical data for {self.config.days_back_to_consider} "
            f"days back from now. figi={self.figi}"
        )
        # Take "now" once so both ends of the window are derived from the same instant.
        to = now()
        from_ = to - timedelta(days=self.config.days_back_to_consider)
        candles = [
            candle
            async for candle in client.get_all_candles(
                figi=self.figi,
                from_=from_,
                to=to,
                interval=CandleInterval.CANDLE_INTERVAL_1_MIN,
            )
        ]
        logger.debug(f"Found {len(candles)} candles. figi={self.figi}")
        return candles

    async def update_corridor(self) -> None:
        """
        Gets historical data and calculates new corridor. Stores it in the class.

        If no candles are returned, the previously stored corridor (possibly None)
        is left untouched.
        """
        candles = await self.get_historical_data()
        if not candles:
            logger.warning(f"No candles received. Corridor is not updated. figi={self.figi}")
            return
        closes = [quotation_to_float(candle.close) for candle in candles]
        # Cut the same share of prices from both tails so that interval_size of them
        # stays inside the corridor.
        lower_percentile = (1 - self.config.interval_size) / 2 * 100
        bottom, top = (
            float(value)
            for value in np.percentile(closes, [lower_percentile, 100 - lower_percentile])
        )
        logger.info(
            f"Corridor: {[bottom, top]}. "
            f"days_back_to_consider={self.config.days_back_to_consider} "
            f"figi={self.figi}"
        )
        self.corridor = Corridor(bottom=bottom, top=top)

    async def get_position_quantity(self) -> int:
        """
        Get quantity of the instrument in the position.

        :return: int - quantity, 0 if there is no position for the instrument
        """
        positions = (await client.get_portfolio(account_id=self.account_id)).positions
        position = get_position(positions, self.figi)
        if position is None:
            return 0
        return int(quotation_to_float(position.quantity))

    async def _post_market_order(self, direction, quantity: int, side: str) -> None:
        """
        Post a market order and schedule statistics handling for it.

        Posting is best-effort: any failure is logged and swallowed so that a single
        rejected order does not stop the strategy.

        :param direction: order direction (ORDER_DIRECTION_BUY or ORDER_DIRECTION_SELL)
        :param quantity: quantity to trade, passed as-is to the API
        :param side: "buy" or "sell", used only in the failure log message
        """
        try:
            posted_order = await client.post_order(
                order_id=str(uuid4()),
                figi=self.figi,
                direction=direction,
                quantity=quantity,
                order_type=ORDER_TYPE_MARKET,
                account_id=self.account_id,
            )
        except Exception as e:
            logger.error(f"Failed to post {side} order. figi={self.figi}. {e}")
            return
        task = asyncio.create_task(
            self.stats_handler.handle_new_order(
                order_id=posted_order.order_id, account_id=self.account_id
            )
        )
        # Keep the task alive until it completes, then drop the reference.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def handle_corridor_crossing_top(self, last_price: float) -> None:
        """
        This method is called when last price is higher than top corridor border.
        Check how many shares we already have and sell them.

        :param last_price: last price of the instrument
        """
        position_quantity = await self.get_position_quantity()
        if position_quantity <= 0:
            return
        logger.info(
            f"Selling {position_quantity} shares. Last price={last_price} figi={self.figi}"
        )
        await self._post_market_order(ORDER_DIRECTION_SELL, position_quantity, "sell")

    async def handle_corridor_crossing_bottom(self, last_price: float) -> None:
        """
        This method is called when last price is lower than bottom corridor border.
        Check how many shares we already have and buy more until the quantity_limit is reached.

        :param last_price: last price of the instrument
        """
        position_quantity = await self.get_position_quantity()
        if position_quantity >= self.config.quantity_limit:
            return
        quantity_to_buy = self.config.quantity_limit - position_quantity
        logger.info(
            f"Buying {quantity_to_buy} shares. Last price={last_price} figi={self.figi}"
        )
        await self._post_market_order(ORDER_DIRECTION_BUY, quantity_to_buy, "buy")

    async def get_last_price(self) -> float:
        """
        Get last price of the instrument.

        :return: float - last price
        :raises IndexError: if the API returned no prices for the instrument
        """
        last_prices_response = await client.get_last_prices(figi=[self.figi])
        # Only one figi is requested; read the entry without mutating the response.
        return quotation_to_float(last_prices_response.last_prices[-1].price)

    async def validate_stop_loss(self, last_price: float) -> None:
        """
        Check if stop loss is reached. If yes, then sells all the shares.

        Stop loss is reached when the last price drops below the average position
        price reduced by the stop_loss_percent share.

        :param last_price: Last price of the instrument.
        """
        positions = (await client.get_portfolio(account_id=self.account_id)).positions
        position = get_position(positions, self.figi)
        if position is None:
            return
        quantity = int(quotation_to_float(position.quantity))
        if quantity <= 0:
            # Nothing to sell; posting a zero-quantity order would only fail.
            return
        position_price = quotation_to_float(position.average_position_price)
        # NOTE(review): assumes the config may expose stop_loss_percent as a fraction
        # (e.g. 0.1 for 10%) - confirm against IntervalStrategyConfig. Without it the
        # stop loss triggers on any drop below the average position price.
        stop_loss_percent = getattr(self.config, "stop_loss_percent", None) or 0.0
        stop_price = position_price * (1 - stop_loss_percent)
        # A stop loss limits losses: it must fire when the price falls below the
        # entry price, not when the position is in profit.
        if last_price < stop_price:
            logger.info(f"Stop loss triggered. Last price={last_price} figi={self.figi}")
            await self._post_market_order(ORDER_DIRECTION_SELL, quantity, "sell")

    async def ensure_market_open(self):
        """
        Ensure that the market is open. Holds the loop until the instrument is available.

        Polls the trading status once a minute until both market orders and API
        trading are available.

        :return: when instrument is available for trading
        """
        trading_status = await client.get_trading_status(figi=self.figi)
        while not (
            trading_status.market_order_available_flag and trading_status.api_trade_available_flag
        ):
            logger.debug(f"Waiting for the market to open. figi={self.figi}")
            await asyncio.sleep(60)
            trading_status = await client.get_trading_status(figi=self.figi)

    async def _run_iteration(self) -> None:
        """
        Run one pass of the strategy: refresh the corridor, check the stop loss and
        trade if the last price is outside of the corridor.
        """
        await self.ensure_market_open()
        await self.update_corridor()

        orders = await client.get_orders(account_id=self.account_id)
        if get_order(orders=orders.orders, figi=self.figi):
            logger.info(f"There are orders in progress. Waiting. figi={self.figi}")
            return

        last_price = await self.get_last_price()
        logger.debug(f"Last price: {last_price}, figi={self.figi}")

        await self.validate_stop_loss(last_price)

        corridor = self.corridor
        if corridor is None:
            # update_corridor found no candles yet, so there are no borders to compare with.
            logger.warning(f"Corridor is not calculated yet. Skipping. figi={self.figi}")
            return

        if last_price >= corridor.top:
            logger.debug(
                f"Last price {last_price} is higher than top corridor border "
                f"{corridor.top}. figi={self.figi}"
            )
            await self.handle_corridor_crossing_top(last_price=last_price)
        elif last_price <= corridor.bottom:
            logger.debug(
                f"Last price {last_price} is lower than bottom corridor border "
                f"{corridor.bottom}. figi={self.figi}"
            )
            await self.handle_corridor_crossing_bottom(last_price=last_price)

    async def main_cycle(self):
        """
        Run strategy iterations forever, sleeping check_interval seconds between them.

        API request errors are logged and the cycle goes on with the next iteration.
        """
        while True:
            try:
                await self._run_iteration()
            except AioRequestError as are:
                logger.error(f"Client error {are}")

            # Always pause between iterations, including the "orders in progress"
            # case, so the API is not polled in a tight loop.
            await asyncio.sleep(self.config.check_interval)

    async def start(self):
        """
        Entry point of the strategy. Resolves the account id if it is not configured
        and runs the main cycle.

        Returns without starting the cycle when the account id can't be resolved.
        """
        if self.account_id is None:
            try:
                accounts = (await client.get_accounts()).accounts
            except AioRequestError as are:
                logger.error(f"Error taking account id. Stopping strategy. {are}")
                return
            if not accounts:
                logger.error(f"No accounts available. Stopping strategy. figi={self.figi}")
                return
            self.account_id = accounts[-1].id
        await self.main_cycle()
254
255