Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_tinkoff-invest-volume-analysis-robot-master/strategies/profile_touch_strategy.py
5932 views
1
import datetime
2
import logging
3
import threading
4
from typing import List, Optional
5
6
import pandas as pd
7
from tinkoff.invest import TradeDirection, OrderDirection
8
9
from constants import FIVE_MINUTES_TO_SECONDS
10
from domains.order import Order
11
from utils.exchange_util import is_open_orders, is_premarket_time
12
from visualizers.finplot_graph import FinplotGraph
13
from settings import PROFILE_PERIOD, FIRST_TOUCH_VOLUME_LEVEL, SECOND_TOUCH_VOLUME_LEVEL, FIRST_GOAL, \
14
PERCENTAGE_STOP_LOSS, SIGNAL_CLUSTER_PERIOD, IS_SHOW_CHART, GOAL_STEP, COUNT_LOTS, COUNT_GOALS
15
from utils.order_util import prepare_orders
16
from utils.strategy_util import is_price_in_range_cluster, ticks_to_cluster, calculate_ratio, \
17
processed_volume_levels_to_times, apply_frame_type
18
19
pd.options.display.max_columns = None
20
pd.options.display.max_rows = None
21
pd.options.display.width = None
22
23
logger = logging.getLogger(__name__)
24
25
26
# стратегия касание объемного уровня
27
class ProfileTouchStrategy(threading.Thread):
28
def __init__(self, instrument_name):
29
super().__init__()
30
31
self.instrument_name = instrument_name
32
33
self.df = pd.DataFrame(columns=["figi", "direction", "price", "quantity", "time"])
34
self.df = apply_frame_type(self.df)
35
36
self.first_tick_time = None
37
self.fix_date = {}
38
self.clusters = None
39
self.processed_volume_levels = {}
40
41
if IS_SHOW_CHART:
42
self.visualizer = FinplotGraph(SIGNAL_CLUSTER_PERIOD)
43
self.visualizer.start()
44
45
def set_df(self, df: pd.DataFrame):
46
self.df = df
47
logger.info("загружен новый DataFrame")
48
49
def analyze(
50
self,
51
trade_df: pd.DataFrame
52
) -> Optional[List[Order]]:
53
trade_data = trade_df.iloc[0]
54
current_price = trade_data["price"]
55
time = trade_data["time"]
56
57
if is_premarket_time(time):
58
return
59
60
if PROFILE_PERIOD not in self.fix_date:
61
self.fix_date[PROFILE_PERIOD] = time.hour
62
63
if self.first_tick_time is None:
64
# сбрасываю секунды, чтобы сравнивать "целые" минутные свечи
65
self.first_tick_time = time.replace(second=0, microsecond=0)
66
67
if self.fix_date[PROFILE_PERIOD] < time.hour:
68
# построение кластерных свечей и графика раз в 1 час
69
self.fix_date[PROFILE_PERIOD] = time.hour
70
self.calculate_clusters()
71
72
self.df = pd.concat([self.df, trade_df])
73
74
if self.clusters is not None:
75
for index, cluster in self.clusters.iterrows():
76
cluster_time = cluster["time"]
77
cluster_price = cluster["max_volume_price"]
78
79
# цена может коснуться объемного уровня в заданном процентном диапазоне
80
is_price_in_range = is_price_in_range_cluster(current_price, cluster_price)
81
if is_price_in_range:
82
timedelta = time - cluster_time
83
if timedelta < datetime.timedelta(minutes=FIRST_TOUCH_VOLUME_LEVEL):
84
continue
85
86
if cluster_price not in self.processed_volume_levels:
87
# инициализация первого касания уровня
88
self.processed_volume_levels[cluster_price] = {}
89
self.processed_volume_levels[cluster_price]["count_touches"] = 0
90
self.processed_volume_levels[cluster_price]["times"] = {}
91
else:
92
# обработка второго и последующего касания уровня на основе времени последнего касания
93
if self.processed_volume_levels[cluster_price]["last_touch_time"] is not None:
94
timedelta = time - self.processed_volume_levels[cluster_price]["last_touch_time"]
95
if timedelta < datetime.timedelta(minutes=SECOND_TOUCH_VOLUME_LEVEL):
96
continue
97
98
# установка параметров при касании уровня
99
self.processed_volume_levels[cluster_price]["count_touches"] += 1
100
self.processed_volume_levels[cluster_price]["last_touch_time"] = time
101
self.processed_volume_levels[cluster_price]["times"][time] = None
102
103
count_touches = self.processed_volume_levels[cluster_price]['count_touches']
104
logger.info("объемный уровень %s сформирован %s", cluster_price, cluster_time)
105
logger.info("время %s: цена %s подошла к объемному уровню %s раз\n", time, current_price, count_touches)
106
break
107
108
if (time - self.first_tick_time).total_seconds() >= FIVE_MINUTES_TO_SECONDS:
109
# сбрасываю секунды, чтобы сравнивать завершенные свечи
110
self.first_tick_time = time.replace(second=0, microsecond=0)
111
# если торги доступны, то каждую завершенную минуту проверяю кластера на возможную ТВ
112
if is_open_orders(time) and len(self.processed_volume_levels) > 0:
113
return self.check_entry_points(current_price, time)
114
115
def calculate_clusters(self):
116
if self.df.empty:
117
return
118
self.clusters = ticks_to_cluster(self.df, period=PROFILE_PERIOD)
119
valid_entry_points, invalid_entry_points = processed_volume_levels_to_times(
120
self.processed_volume_levels)
121
if IS_SHOW_CHART:
122
self.visualizer.render(self.df,
123
valid_entry_points=valid_entry_points,
124
invalid_entry_points=invalid_entry_points,
125
clusters=self.clusters)
126
127
def check_entry_points(
128
self,
129
current_price: float,
130
time: datetime
131
) -> Optional[List[Order]]:
132
for volume_price, volume_level in self.processed_volume_levels.items():
133
for touch_time, value in volume_level["times"].items():
134
if value is not None:
135
continue
136
candles = ticks_to_cluster(self.df, period=SIGNAL_CLUSTER_PERIOD)
137
candles = calculate_ratio(candles)
138
prev_candle = candles.iloc[-3]
139
current_candle = candles.iloc[-2]
140
if current_candle.empty or prev_candle.empty:
141
logger.error("свеча не найдена")
142
continue
143
144
if current_candle["win"] is True:
145
# если свеча является сигнальной, то осуществляю сделку
146
max_volume_price = current_candle["max_volume_price"]
147
percent = (max_volume_price * PERCENTAGE_STOP_LOSS / 100)
148
self.processed_volume_levels[volume_price]["times"][touch_time] = True
149
150
if current_candle["direction"] == TradeDirection.TRADE_DIRECTION_BUY:
151
if prev_candle["open"] < current_candle["open"]:
152
logger.info("пропуск входа - предыдущая свеча открылась ниже текущей")
153
return
154
155
# todo условие дает плохое соотношение
156
# если подошли к объемному уровню снизу вверх на лонговой свече, то пропускаю вход
157
# if current_candle['close'] < volume_price:
158
# logger.info(f"пропуск входа - цена закрытия ниже объемного уровня, time={time}, price={current_price}")
159
# return
160
161
if current_price < max_volume_price:
162
logger.info(
163
"пропуск входа - цена открытия ниже макс объема в сигнальной свече, время %s, цена %s",
164
time,
165
current_price
166
)
167
return
168
169
stop = max_volume_price - percent
170
orders = self.prepare_orders(
171
current_price=current_price,
172
time=time,
173
stop=stop,
174
direction=OrderDirection.ORDER_DIRECTION_BUY
175
)
176
logger.info("подтверждена точка входа в лонг, ордера: %s", orders)
177
return orders
178
179
else:
180
if prev_candle["open"] > current_candle["open"]:
181
logger.info("пропуск входа - предыдущая свеча открылась выше текущей")
182
return
183
184
# todo условие дает плохое соотношение
185
# если подошли к объемному уровню сверху вниз на шортовой свече, то пропускаю вход
186
# if current_candle['close'] > volume_price:
187
# logger.info(f"пропуск входа - цена закрытия выше объемного уровня, time={time}, price={current_price}")
188
# return
189
190
if current_price > max_volume_price:
191
logger.info(
192
"пропуск входа - цена открытия выше макс объема в сигнальной свече, время %s, цена %s",
193
time,
194
current_price
195
)
196
return
197
198
stop = max_volume_price + percent
199
orders = self.prepare_orders(
200
current_price=current_price,
201
time=time,
202
stop=stop,
203
direction=OrderDirection.ORDER_DIRECTION_SELL
204
)
205
logger.info("подтверждена точка входа в шорт, ордера: %s", orders)
206
return orders
207
208
else:
209
# если текущая свеча не сигнальная, то ожидаю следующую для возможного входа
210
self.processed_volume_levels[volume_price]["times"][touch_time] = False
211
self.processed_volume_levels[volume_price]["last_touch_time"] = None
212
213
def prepare_orders(
214
self,
215
current_price: float,
216
time: datetime,
217
stop: float,
218
direction: OrderDirection
219
) -> List[Order]:
220
return prepare_orders(
221
instrument=self.instrument_name,
222
current_price=current_price,
223
time=time,
224
stop_loss=stop,
225
direction=direction,
226
count_lots=COUNT_LOTS,
227
count_goals=COUNT_GOALS,
228
goal_step=GOAL_STEP,
229
first_goal=FIRST_GOAL
230
)
231
232