Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_NeuroInvest-main/NeuroInvest/tinkoff_api.py
5929 views
1
from config import tinkoffAPIToken, scanInterval, tinkoffScanInterval
2
3
from tinkoff.investments import Environment, TinkoffInvestmentsRESTClient
4
from tinkoff.investments.client.exceptions import TinkoffInvestmentsUnavailableError, TinkoffInvestmentsConnectionError, \
5
TinkoffInvestmentsUnauthorizedError
6
from tinkoff.investments.utils.historical_data import HistoricalData
7
8
from asyncio import get_event_loop
9
from datetime import datetime
10
from trade_collection import TradeCollection
11
12
class Tinkoff(TradeCollection):
13
__figi = None
14
__completeFunction = None
15
16
def __init__(self, tinkoffTrade):
17
self.__figi = tinkoffTrade.value[0]["figi"]
18
super().__init__(tinkoffTrade.value[0]["name"], self.getData)
19
20
# noinspection PyUnusedLocal
21
def getData(self, start, end, stepInSeconds, history, onUpdateData):
22
self.__completeFunction = onUpdateData
23
24
startTime = datetime.utcfromtimestamp(int(start / scanInterval) * scanInterval)
25
endTime = datetime.utcfromtimestamp(int(end / scanInterval) * scanInterval)
26
27
loop = get_event_loop()
28
29
async def onComplete():
30
onUpdateData()
31
32
async def prepareCandles(candle):
33
time = candle.time.timestamp()
34
value = (candle.h + candle.l) / 2.0
35
history[time] = value
36
print(time, candle.time, value)
37
38
async def getCandles():
39
try:
40
async with TinkoffInvestmentsRESTClient(
41
token = tinkoffAPIToken,
42
environment = Environment.SANDBOX) as client:
43
historical_data = HistoricalData(client)
44
45
async for candle in historical_data.iter_candles(
46
figi = self.__figi,
47
dt_from = startTime,
48
dt_to = endTime,
49
interval = tinkoffScanInterval):
50
await prepareCandles(candle)
51
52
except TinkoffInvestmentsUnauthorizedError:
53
print("Invalid token. Please check it in config.py!")
54
loop.stop()
55
await onComplete()
56
57
except TinkoffInvestmentsConnectionError as message:
58
print("Connect error: " + str(message))
59
loop.stop()
60
await onComplete()
61
62
except TinkoffInvestmentsUnavailableError as message:
63
print("Unavailable data: " + str(message))
64
loop.stop()
65
await onComplete()
66
67
except Exception as message:
68
print("Unexpected error: " + str(message))
69
loop.stop()
70
await onComplete()
71
72
73
loop.run_until_complete(getCandles())
74
loop.run_until_complete(onComplete())
75
return True
76
77