Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_tinkoffSDK-master/my_trader.py
5925 views
1
import logging
2
import time
3
from typing import Iterator, List
4
import time
5
from datetime import timedelta, datetime, timezone
6
7
import tinkoff
8
from tinkoff.invest import (
9
CandleInstrument,
10
InvestError,
11
MarketDataRequest,
12
MarketDataResponse,
13
SubscribeCandlesRequest,
14
SubscriptionAction,
15
OrderDirection,
16
OrderType,
17
RequestError,
18
)
19
from tinkoff.invest.services import Services
20
#from tinkoff.invest.strategies.base.account_manager import AccountManager
21
from tinkoff.invest.strategies.base.errors import MarketDataNotAvailableError
22
from tinkoff.invest.strategies.base.event import DataEvent, SignalEvent
23
from tinkoff.invest.strategies.base.models import CandleEvent
24
from tinkoff.invest.strategies.base.signal import CloseSignal, OpenSignal, Signal,OpenLongMarketOrder,CloseLongMarketOrder
25
#from tinkoff.invest.strategies.base.signal_executor_base import SignalExecutor
26
from tinkoff.invest.strategies.base.trader_base import Trader
27
#from tinkoff.invest.strategies.moving_average.strategy import MovingAverageStrategy
28
from tinkoff.invest.strategies.moving_average.strategy_settings import (
29
MovingAverageStrategySettings,
30
)
31
from tinkoff.invest.strategies.moving_average.strategy_state import (
32
MovingAverageStrategyState,
33
)
34
from tinkoff.invest.strategies.moving_average.supervisor import (
35
MovingAverageStrategySupervisor,
36
)
37
from tinkoff.invest.utils import (
38
candle_interval_to_subscription_interval,
39
floor_datetime,
40
now,
41
)
42
import my_signal_executor_base
43
from my_signal_executor_base import SignalExecutor
44
from my_account_manager import AccountManager
45
import my_moving_average
46
import my_strategy
47
from my_strategy import MovingAverageStrategy
48
49
50
logger = logging.getLogger(__name__)
51
52
53
class MovingAverageStrategyTrader(Trader):
54
def __init__(
55
self,
56
strategy: MovingAverageStrategy,
57
settings: MovingAverageStrategySettings,
58
services: Services,
59
state: MovingAverageStrategyState,
60
signal_executor: SignalExecutor,
61
account_manager: AccountManager,
62
supervisor: MovingAverageStrategySupervisor,
63
):
64
super().__init__(strategy, services, settings)
65
self._settings: MovingAverageStrategySettings = settings
66
self._strategy = strategy
67
self._services = services
68
self._data: List[CandleEvent]
69
self._market_data_stream: Iterator[MarketDataResponse]
70
self._state = state
71
self._signal_executor = signal_executor
72
self._account_manager = account_manager
73
self._supervisor = supervisor
74
75
self._data = list(
76
self._load_candles(self._settings.short_period + self._settings.long_period)
77
)
78
for candle_event in self._data:
79
self._supervisor.notify(self._convert_to_data_event(candle_event))
80
self._ensure_marginal_trade_active()
81
82
self._subscribe()
83
84
self._strategy.fit(self._data)
85
86
def _ensure_marginal_trade_active(self) -> None:
87
self._account_manager.ensure_marginal_trade()
88
89
def _subscribe(self):
90
current_instrument = CandleInstrument(
91
figi=self._settings.share_id,
92
interval=candle_interval_to_subscription_interval(
93
self._settings.candle_interval
94
),
95
)
96
candle_subscribe_request = MarketDataRequest(
97
subscribe_candles_request=SubscribeCandlesRequest(
98
subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,
99
instruments=[current_instrument],
100
)
101
)
102
103
def request_iterator():
104
yield candle_subscribe_request
105
while True:
106
time.sleep(1)
107
108
self._market_data_stream = self._services.market_data_stream.market_data_stream(
109
request_iterator()
110
)
111
112
def _is_candle_fresh(self, candle: tinkoff.invest.Candle) -> bool:
113
is_fresh_border = floor_datetime(
114
now(), delta=self._settings.candle_interval_timedelta
115
)
116
logger.debug(
117
"Checking if candle is fresh: candle.time=%s > is_fresh_border=%s %s)",
118
candle.time,
119
is_fresh_border,
120
candle.time >= is_fresh_border,
121
)
122
return candle.time >= is_fresh_border
123
124
@staticmethod
125
def _convert_to_data_event(candle_event: CandleEvent) -> DataEvent:
126
return DataEvent(candle_event=candle_event, time=candle_event.time)
127
128
def _make_observations(self) -> None:
129
while True:
130
market_data_response: MarketDataResponse = next(self._market_data_stream)
131
logger.debug("got market_data_response: %s", market_data_response)
132
if market_data_response.candle is None:
133
logger.debug("market_data_response didn't have candle")
134
continue
135
candle = market_data_response.candle
136
logger.debug("candle extracted: %s", candle)
137
candle_event = self._convert_candle(candle)
138
self._strategy.observe(candle_event)
139
self._supervisor.notify(self._convert_to_data_event(candle_event))
140
if self._is_candle_fresh(candle):
141
logger.info("Data refreshed")
142
break
143
144
def _refresh_data(self) -> None:
145
logger.info("Refreshing data")
146
try:
147
self._make_observations()
148
except StopIteration as e:
149
logger.info("Fresh quotations not available")
150
raise MarketDataNotAvailableError() from e
151
152
def _filter_closing_signals(self, signals: List[Signal]) -> List[Signal]:
153
return list(filter(lambda signal: isinstance(signal, CloseSignal), signals))
154
155
def _filter_opening_signals(self, signals: List[Signal]) -> List[Signal]:
156
return list(filter(lambda signal: isinstance(signal, OpenSignal), signals))
157
158
def _execute(self, signal: Signal) -> None:
159
logger.info("Trying to execute signal %s", signal)
160
try:
161
self._signal_executor.execute(signal)
162
except RequestError:
163
was_executed = False
164
logger.info("Signal was_executed = False")
165
logger.info("RequestError.__cause__ %s", RequestError.__cause__)
166
logger.info("RequestError.__context__ %s", RequestError.__context__)
167
logger.info("RequestError.code %s", RequestError.code)
168
logger.info("RequestError.details %s", RequestError.details)
169
except:
170
was_executed = False
171
else:
172
was_executed = True
173
self._supervisor.notify(
174
SignalEvent(signal=signal, was_executed=was_executed, time=now())
175
)
176
177
#if len(self._data) != 0:
178
# order_id = "AndreiSoiko_" + str(datetime.now().microsecond) + "_" + str(datetime.now().second)
179
180
#Это отладочный код для выставления ордеров, оставлю пока здесь, потому что _signal_executor.execute может работать с ошибками из-за order_id,
181
# а может по другим причинам.
182
# if isinstance(signal,OpenLongMarketOrder):
183
# post_order_response = self._services.orders.post_order(
184
# figi= self._strategy._settings.share_id,
185
# quantity=1,
186
# #price= self._data.pop().candle.close,
187
# direction = OrderDirection(1), #ORDER_DIRECTION_BUY = 1
188
# account_id=self._settings.account_id,
189
# order_type= OrderType(2), #ORDER_TYPE_MARKET = 2
190
# order_id= order_id ,
191
# )
192
# logger.info("post_order_response %s", post_order_response)
193
# elif isinstance(signal,CloseLongMarketOrder):
194
# post_order_response = self._services.orders.post_order(
195
# figi= self._strategy._settings.share_id,
196
# #quantity=self._strategy._state.position, #Количество можно передовать из стратегии.
197
# quantity=1,
198
# #price= self._data.pop().candle.close,
199
# direction = OrderDirection(2), #ORDER_DIRECTION_SELL = 2
200
# account_id=self._settings.account_id,
201
# order_type= OrderType(2), #ORDER_TYPE_MARKET = 2
202
# order_id= order_id ,
203
# )
204
205
# logger.info("post_order_response %s", post_order_response)
206
207
208
209
210
def _get_signals(self) -> List[Signal]:
211
signals = list(self._strategy.predict())
212
return [
213
*self._filter_closing_signals(signals),
214
*self._filter_opening_signals(signals),
215
]
216
217
def trade(self) -> None:
218
"""Делает попытку следовать стратегии."""
219
220
logger.info("Balance: %s", self._account_manager.get_current_balance())
221
self._refresh_data()
222
223
signals = self._get_signals()
224
if signals:
225
logger.info("Got signals %s", signals)
226
for signal in signals:
227
self._execute(signal)
228
if self._state.position == 0:
229
logger.info("Trade try complete")
230