Path: blob/master/ invest-robot-contest_NeuroInvest-main/NeuroInvest/tinkoff_api.py
5929 views
from config import tinkoffAPIToken, scanInterval, tinkoffScanInterval

from tinkoff.investments import Environment, TinkoffInvestmentsRESTClient
from tinkoff.investments.client.exceptions import (
    TinkoffInvestmentsUnavailableError,
    TinkoffInvestmentsConnectionError,
    TinkoffInvestmentsUnauthorizedError,
)
from tinkoff.investments.utils.historical_data import HistoricalData

from asyncio import get_event_loop
from datetime import datetime
from trade_collection import TradeCollection


class Tinkoff(TradeCollection):
    """Trade collection whose candles are fetched from the Tinkoff Investments REST API.

    The instance registers its own ``getData`` method with ``TradeCollection``
    as the data-loading function.
    """

    # FIGI of the instrument this collection loads candles for.
    __figi = None
    # Last completion callback handed to getData (stored only, never read here).
    __completeFunction = None

    def __init__(self, tinkoffTrade):
        """Create a collection for one instrument.

        :param tinkoffTrade: object whose ``value[0]`` is a mapping with the
            keys ``"figi"`` and ``"name"`` (presumably an enum member defined
            elsewhere in the project -- TODO confirm against the caller).
        """
        self.__figi = tinkoffTrade.value[0]["figi"]
        super().__init__(tinkoffTrade.value[0]["name"], self.getData)

    # noinspection PyUnusedLocal
    def getData(self, start, end, stepInSeconds, history, onUpdateData):
        """Download candles for ``[start, end]`` into ``history`` and signal completion.

        API failures are reported with ``print`` and swallowed; whether or not
        the download succeeded, ``onUpdateData`` is invoked exactly once, from
        inside the event loop, after the download attempt has finished.

        :param start: range start as a POSIX timestamp; truncated to a
            multiple of ``scanInterval`` before use.
        :param end: range end as a POSIX timestamp; truncated the same way.
        :param stepInSeconds: unused; the candle interval comes from
            ``tinkoffScanInterval``.
        :param history: mutable mapping that receives one entry per candle,
            keyed by the candle's timestamp, holding ``(candle.h + candle.l) / 2``.
        :param onUpdateData: zero-argument callable invoked when loading is done.
        :return: always ``True``.
        """
        self.__completeFunction = onUpdateData

        # Snap both bounds onto the scanInterval grid (int() truncates).
        startTime = datetime.utcfromtimestamp(int(start / scanInterval) * scanInterval)
        endTime = datetime.utcfromtimestamp(int(end / scanInterval) * scanInterval)

        loop = get_event_loop()

        async def onComplete():
            # Kept as a coroutine so the callback still runs inside the loop.
            onUpdateData()

        def storeCandle(candle):
            # Record the midpoint of candle.h and candle.l under the candle's timestamp.
            timestamp = candle.time.timestamp()
            value = (candle.h + candle.l) / 2.0
            history[timestamp] = value
            print(timestamp, candle.time, value)

        async def getCandles():
            # The handlers only report the failure. They must not stop the loop
            # that is currently driving this coroutine, nor fire the completion
            # callback: getData does that once after this coroutine returns.
            try:
                async with TinkoffInvestmentsRESTClient(
                        token=tinkoffAPIToken,
                        environment=Environment.SANDBOX) as client:
                    historical_data = HistoricalData(client)

                    async for candle in historical_data.iter_candles(
                            figi=self.__figi,
                            dt_from=startTime,
                            dt_to=endTime,
                            interval=tinkoffScanInterval):
                        storeCandle(candle)

            except TinkoffInvestmentsUnauthorizedError:
                print("Invalid token. Please check it in config.py!")

            except TinkoffInvestmentsConnectionError as message:
                print("Connect error: " + str(message))

            except TinkoffInvestmentsUnavailableError as message:
                print("Unavailable data: " + str(message))

            except Exception as message:
                print("Unexpected error: " + str(message))

        loop.run_until_complete(getCandles())
        loop.run_until_complete(onComplete())
        return True