Path: blob/master/ invest-robot-contest_tinkoff-invest-volume-analysis-robot-master/trading_robot.py
5925 views
import asyncio1import logging2from datetime import timedelta34import pandas as pd5from tinkoff.invest import (6AsyncClient7)8from tinkoff.invest.utils import now910from constants import ONE_HOUR_TO_MINUTES11from services.order_service import OrderService12from services.user_service import UserService13from settings import INSTRUMENTS, CAN_OPEN_ORDERS, TOKEN14from strategies.profile_touch_strategy import ProfileTouchStrategy15from utils.exchange_util import is_open_exchange16from utils.instrument_util import request_iterator, get_file_path_by_instrument17from utils.logger import init_logging18from utils.parse_util import processed_data19from utils.strategy_util import merge_two_frames, create_empty_df2021pd.options.display.max_columns = None22pd.options.display.max_rows = None23pd.options.display.width = None2425init_logging()26logger = logging.getLogger(__name__)272829class TradingRobot:30def __init__(self):31self.order_service = OrderService(is_notification=True, can_open_orders=CAN_OPEN_ORDERS)32self.order_service.start()3334self.is_history_processed = True3536self.df_by_instrument = {}37self.strategy = {}38for instrument in INSTRUMENTS:39figi = instrument["figi"]40df_by_instrument = create_empty_df()41self.df_by_instrument[figi] = df_by_instrument4243file_path = get_file_path_by_instrument(instrument)44instrument_file = open(file_path, "a", newline='')45df_by_instrument.to_csv(instrument_file, mode="a", header=instrument_file.tell() == 0, index=False)4647profile_touch_strategy = ProfileTouchStrategy(instrument["name"])48profile_touch_strategy.start()49self.strategy[figi] = profile_touch_strategy5051# актуализация DataFrame из полученных ранее данных52async def sync_df(self, client):53self.is_history_processed = True54for instrument in INSTRUMENTS:55try:56figi = instrument["figi"]5758file_path = get_file_path_by_instrument(instrument)59self.df_by_instrument[figi] = pd.read_csv(file_path, sep=",")60self.df_by_instrument[figi]["time"] = pd.to_datetime(self.df_by_instrument[figi]["time"], utc=True)6162history_df = await self.get_history_trades(client, instrument)63self.df_by_instrument[figi] = merge_two_frames(self.df_by_instrument[figi], history_df)64except Exception as ex:65logger.error(ex)66self.is_history_processed = False6768# загрузка последних доступных обезличенных сделок69async def get_history_trades(self, client, instrument):70history_df = create_empty_df()71figi = instrument["figi"]72current_date = now()73time = 07475while True:76try:77interval_from = current_date - timedelta(minutes=time + ONE_HOUR_TO_MINUTES)78interval_to = current_date - timedelta(minutes=time)7980logger.info("instrument: %s, from: %s", instrument, interval_from)81logger.info("instrument: %s, to: %s", instrument, interval_to)8283response = await client.market_data.get_last_trades(84figi=figi,85from_=interval_from,86to=interval_to,87)88logger.info("instrument: %s, size: %s", instrument, len(response.trades))89if response is None or len(response.trades) == 0:90break9192for trade in response.trades:93processed_trade_df = processed_data(trade)94if processed_trade_df is not None:95history_df = pd.concat([history_df, processed_trade_df])96history_df = history_df.sort_values("time")97time += ONE_HOUR_TO_MINUTES98except Exception as ex:99logger.error(ex)100break101102return history_df103104# основной метод для обработки входящих данных105async def trades_stream(self, client):106temp_df = {}107for instrument in INSTRUMENTS:108temp_df[instrument["figi"]] = create_empty_df()109110try:111async for marketdata in client.market_data_stream.market_data_stream(112request_iterator(INSTRUMENTS)113):114if not is_open_exchange():115logger.info("торговый день завершен, сохранение статистики")116self.order_service.write_statistics()117# todo добавить выход из приложения118119logger.info(marketdata)120if marketdata is None:121continue122trade = marketdata.trade123if trade is None:124continue125126figi = trade.figi127instrument = next(item for item in INSTRUMENTS if item["figi"] == figi)128129processed_trade_df = processed_data(trade)130if processed_trade_df is not None:131# проверка позиций на закрытие по тейку/стопу132processed_trade = processed_trade_df.iloc[0]133price = processed_trade["price"]134time = processed_trade["time"]135self.order_service.processed_orders(instrument["name"], price, time)136137if self.is_history_processed is True:138# пока происходит обработка истории - новые данные складываю во временную переменную139next_df = [temp_df[figi], processed_trade_df]140temp_df[figi] = pd.concat(next_df, ignore_index=True)141else:142# есть проблема, когда исторические данные загрузились, но в real-time они не приходят143# тогда исторические данные не окажутся в файле144if len(temp_df[figi]) > 0:145# если после обработки истории успели накопить real-time данные,146# то подмерживаю их и очищаю временную переменную147next_df = [self.df_by_instrument[figi], temp_df[figi], processed_trade_df]148self.df_by_instrument[figi] = pd.concat(next_df, ignore_index=True)149temp_df[figi].drop(temp_df[figi].index, inplace=True)150151# отправляю обезличенные сделки на анализ152self.strategy[figi].set_df(self.df_by_instrument[figi])153154file_path = get_file_path_by_instrument(instrument)155self.df_by_instrument[figi].to_csv(file_path, mode="w", header=True, index=False)156else:157# отправляю обезличенную сделку на анализ158# (алгоритм анализа можно заменить на любой)159# если ТВ подтвердится, то возвращается структура сделки160orders = self.strategy[figi].analyze(processed_trade_df)161if orders is not None:162for order in orders:163self.order_service.create_order(order)164165next_df = [self.df_by_instrument[figi], processed_trade_df]166self.df_by_instrument[figi] = pd.concat(next_df, ignore_index=True)167168file_path = get_file_path_by_instrument(instrument)169processed_trade_df.to_csv(file_path, mode="a", header=False, index=False)170except Exception as ex:171logger.error(ex)172173async def main(self):174UserService().show_settings()175176async with AsyncClient(TOKEN) as client:177tasks = [asyncio.ensure_future(self.trades_stream(client)),178asyncio.ensure_future(self.sync_df(client))]179await asyncio.wait(tasks)180181182if __name__ == "__main__":183try:184robot = TradingRobot()185asyncio.run(robot.main())186except Exception as ex:187logger.error(ex)188189190