Path: blob/master/ invest-robot-contest_tinkoff-trading-bot-develop/app/strategies/interval/IntervalStrategy.py
5938 views
import asyncio1import logging2from datetime import timedelta3from typing import List, Optional4from uuid import uuid456import numpy as np7from tinkoff.invest import CandleInterval, HistoricCandle, AioRequestError8from tinkoff.invest.grpc.orders_pb2 import (9ORDER_DIRECTION_SELL,10ORDER_DIRECTION_BUY,11ORDER_TYPE_MARKET,12)13from tinkoff.invest.utils import now1415from app.client import client16from app.settings import settings17from app.stats.handler import StatsHandler18from app.strategies.interval.models import IntervalStrategyConfig, Corridor19from app.strategies.base import BaseStrategy20from app.strategies.models import StrategyName21from app.utils.portfolio import get_position, get_order22from app.utils.quotation import quotation_to_float2324logger = logging.getLogger(__name__)252627class IntervalStrategy(BaseStrategy):28"""29Interval strategy.3031Main strategy logic is to buy at the lowest price and sell at the highest price of the32calculated interval.3334Interval is calculated by taking interval_size percents of the last prices35for the last days_back_to_consider days. By default, it's set to 80 percents which means36that the interval is from 10th to 90th percentile.37"""3839def __init__(self, figi: str, **kwargs):40self.account_id = settings.account_id41self.corridor: Optional[Corridor] = None42self.figi = figi43self.config: IntervalStrategyConfig = IntervalStrategyConfig(**kwargs)44self.stats_handler = StatsHandler(StrategyName.INTERVAL, client)4546async def get_historical_data(self) -> List[HistoricCandle]:47"""48Gets historical data for the instrument. Returns list of candles.49Requests all the 1-min candles from days_back_to_consider days back to now.5051:return: list of HistoricCandle52"""53candles = []54logger.debug(55f"Start getting historical data for {self.config.days_back_to_consider} "56f"days back from now. figi={self.figi}"57)58async for candle in client.get_all_candles(59figi=self.figi,60from_=now() - timedelta(days=self.config.days_back_to_consider),61to=now(),62interval=CandleInterval.CANDLE_INTERVAL_1_MIN,63):64candles.append(candle)65logger.debug(f"Found {len(candles)} candles. figi={self.figi}")66return candles6768async def update_corridor(self) -> None:69"""70Gets historical data and calculates new corridor. Stores it in the class.71"""72candles = await self.get_historical_data()73if len(candles) == 0:74return75values = []76for candle in candles:77values.append(quotation_to_float(candle.close))78lower_percentile = (1 - self.config.interval_size) / 2 * 10079corridor = list(np.percentile(values, [lower_percentile, 100 - lower_percentile]))80logger.info(81f"Corridor: {corridor}. days_back_to_consider={self.config.days_back_to_consider} "82f"figi={self.figi}"83)84self.corridor = Corridor(bottom=corridor[0], top=corridor[1])8586async def get_position_quantity(self) -> int:87"""88Get quantity of the instrument in the position.89:return: int - quantity90"""91positions = (await client.get_portfolio(account_id=self.account_id)).positions92position = get_position(positions, self.figi)93if position is None:94return 095return int(quotation_to_float(position.quantity))9697async def handle_corridor_crossing_top(self, last_price: float) -> None:98"""99This method is called when last price is higher than top corridor border.100Check how many shares we already have and sell them.101102:param last_price: last price of the instrument103"""104position_quantity = await self.get_position_quantity()105if position_quantity > 0:106logger.info(107f"Selling {position_quantity} shares. Last price={last_price} figi={self.figi}"108)109try:110posted_order = await client.post_order(111order_id=str(uuid4()),112figi=self.figi,113direction=ORDER_DIRECTION_SELL,114quantity=position_quantity,115order_type=ORDER_TYPE_MARKET,116account_id=self.account_id,117)118except Exception as e:119logger.error(f"Failed to post sell order. figi={self.figi}. {e}")120return121asyncio.create_task(122self.stats_handler.handle_new_order(123order_id=posted_order.order_id, account_id=self.account_id124)125)126127async def handle_corridor_crossing_bottom(self, last_price: float) -> None:128"""129This method is called when last price is lower than bottom corridor border.130Check how many shares we already have and buy more until the quantity_limit is reached.131132:param last_price: last price of the instrument133"""134position_quantity = await self.get_position_quantity()135if position_quantity < self.config.quantity_limit:136quantity_to_buy = self.config.quantity_limit - position_quantity137logger.info(138f"Buying {quantity_to_buy} shares. Last price={last_price} figi={self.figi}"139)140141try:142posted_order = await client.post_order(143order_id=str(uuid4()),144figi=self.figi,145direction=ORDER_DIRECTION_BUY,146quantity=quantity_to_buy,147order_type=ORDER_TYPE_MARKET,148account_id=self.account_id,149)150except Exception as e:151logger.error(f"Failed to post buy order. figi={self.figi}. {e}")152return153asyncio.create_task(154self.stats_handler.handle_new_order(155order_id=posted_order.order_id, account_id=self.account_id156)157)158159async def get_last_price(self) -> float:160"""161Get last price of the instrument.162:return: float - last price163"""164last_prices_response = await client.get_last_prices(figi=[self.figi])165last_prices = last_prices_response.last_prices166return quotation_to_float(last_prices.pop().price)167168async def validate_stop_loss(self, last_price: float) -> None:169"""170Check if stop loss is reached. If yes, then sells all the shares.171:param last_price: Last price of the instrument.172"""173positions = (await client.get_portfolio(account_id=self.account_id)).positions174position = get_position(positions, self.figi)175if position is None:176return177position_price = quotation_to_float(position.average_position_price)178if position_price < last_price:179logger.info(f"Stop loss triggered. Last price={last_price} figi={self.figi}")180try:181posted_order = await client.post_order(182order_id=str(uuid4()),183figi=self.figi,184direction=ORDER_DIRECTION_SELL,185quantity=int(quotation_to_float(position.quantity)),186order_type=ORDER_TYPE_MARKET,187account_id=self.account_id,188)189except Exception as e:190logger.error(f"Failed to post sell order. figi={self.figi}. {e}")191return192asyncio.create_task(193self.stats_handler.handle_new_order(194order_id=posted_order.order_id, account_id=self.account_id195)196)197return198199async def ensure_market_open(self):200"""201Ensure that the market is open. Holds the loop until the instrument is available.202:return: when instrument is available for trading203"""204trading_status = await client.get_trading_status(figi=self.figi)205while not (206trading_status.market_order_available_flag and trading_status.api_trade_available_flag207):208logger.debug(f"Waiting for the market to open. figi={self.figi}")209await asyncio.sleep(60)210trading_status = await client.get_trading_status(figi=self.figi)211212async def main_cycle(self):213while True:214try:215await self.ensure_market_open()216await self.update_corridor()217218orders = await client.get_orders(account_id=self.account_id)219if get_order(orders=orders.orders, figi=self.figi):220logger.info(f"There are orders in progress. Waiting. figi={self.figi}")221continue222223last_price = await self.get_last_price()224logger.debug(f"Last price: {last_price}, figi={self.figi}")225226await self.validate_stop_loss(last_price)227228if last_price >= self.corridor.top:229logger.debug(230f"Last price {last_price} is higher than top corridor border "231f"{self.corridor.top}. figi={self.figi}"232)233await self.handle_corridor_crossing_top(last_price=last_price)234elif last_price <= self.corridor.bottom:235logger.debug(236f"Last price {last_price} is lower than bottom corridor border "237f"{self.corridor.bottom}. figi={self.figi}"238)239await self.handle_corridor_crossing_bottom(last_price=last_price)240except AioRequestError as are:241logger.error(f"Client error {are}")242243await asyncio.sleep(self.config.check_interval)244245async def start(self):246if self.account_id is None:247try:248self.account_id = (await client.get_accounts()).accounts.pop().id249except AioRequestError as are:250logger.error(f"Error taking account id. Stopping strategy. {are}")251return252await self.main_cycle()253254255