Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_tinkoffSDK-master/my_signal_executor.py
5925 views
1
import logging
2
from functools import singledispatchmethod
3
4
from tinkoff.invest.services import Services
5
from tinkoff.invest.strategies.base.errors import UnknownSignal
6
from tinkoff.invest.strategies.base.signal import (
7
CloseLongMarketOrder,
8
CloseShortMarketOrder,
9
OpenLongMarketOrder,
10
OpenShortMarketOrder,
11
Signal,
12
)
13
from tinkoff.invest.strategies.moving_average.strategy_settings import (
14
MovingAverageStrategySettings,
15
)
16
from tinkoff.invest.strategies.moving_average.strategy_state import (
17
MovingAverageStrategyState,
18
)
19
20
import my_signal_executor_base
21
from my_signal_executor_base import SignalExecutor
22
23
# Module-level logger named after this module; the signal executor in this
# file reports every executed signal through it.
logger = logging.getLogger(__name__)
24
25
26
class MovingAverageSignalExecutor(SignalExecutor):
    """Execute moving-average strategy signals and mirror them in the state.

    ``execute`` dispatches on the concrete signal type.  Each registered
    handler first delegates the order itself to the matching
    ``execute_*_market_order`` method (not defined in this class; presumably
    inherited from ``SignalExecutor`` -- confirm), then updates the strategy
    state object and logs the signal.
    """

    def __init__(
        self,
        services: Services,
        state: MovingAverageStrategyState,
        settings: MovingAverageStrategySettings,
    ):
        """Store the collaborators used when executing signals.

        Args:
            services: Forwarded to the base initializer and also kept on
                this instance as ``_services``.
            state: Strategy state mutated after each executed signal.
            settings: Forwarded to the base initializer.
        """
        super().__init__(services, settings)
        self._services = services
        self._state = state

    def _record_market_order(self, signal: Signal, flag_name: str, opened: bool) -> None:
        """Update the state after an order has been placed, then log it.

        Args:
            signal: The signal that was just executed.
            flag_name: Name of the state attribute to set
                (``"long_open"`` or ``"short_open"``).
            opened: ``True`` when a position was opened, ``False`` when it
                was closed.
        """
        setattr(self._state, flag_name, opened)
        # ``signal.lots`` is read only for opening signals, and only after
        # the flag has been set; a closed position is recorded as 0.
        self._state.position = signal.lots if opened else 0
        logger.info("Signal executed %s", signal)

    @singledispatchmethod
    def execute(self, signal: Signal) -> None:
        """Execute ``signal``; dispatches on the signal's type.

        Raises:
            UnknownSignal: If no handler is registered for the signal type.
        """
        raise UnknownSignal()

    @execute.register(OpenLongMarketOrder)
    def _execute_open_long_market_order(self, signal: OpenLongMarketOrder) -> None:
        """Open a long position and record ``signal.lots`` as the position."""
        self.execute_open_long_market_order(signal)
        self._record_market_order(signal, "long_open", True)

    @execute.register(CloseLongMarketOrder)
    def _execute_close_long_market_order(self, signal: CloseLongMarketOrder) -> None:
        """Close the long position and reset the recorded position to 0."""
        self.execute_close_long_market_order(signal)
        self._record_market_order(signal, "long_open", False)

    @execute.register(OpenShortMarketOrder)
    def _execute_open_short_market_order(self, signal: OpenShortMarketOrder) -> None:
        """Open a short position and record ``signal.lots`` as the position."""
        self.execute_open_short_market_order(signal)
        self._record_market_order(signal, "short_open", True)

    @execute.register(CloseShortMarketOrder)
    def _execute_close_short_market_order(self, signal: CloseShortMarketOrder) -> None:
        """Close the short position and reset the recorded position to 0."""
        self.execute_close_short_market_order(signal)
        self._record_market_order(signal, "short_open", False)
68
69