Path: blob/master/ invest-robot-contest_tinkoff-invest-volume-analysis-robot-master/utils/strategy_util.py
5931 views
import numpy as np1import pandas as pd2from tinkoff.invest import TradeDirection34from settings import PERCENTAGE_VOLUME_LEVEL_RANGE567# находится ли цена в процентном диапазоне8def is_price_in_range_cluster(9current_price: float,10cluster_price11) -> bool:12level_range = cluster_price * PERCENTAGE_VOLUME_LEVEL_RANGE / 10013increased_level = cluster_price + level_range14reduced_level = cluster_price - level_range15return reduced_level <= current_price <= increased_level161718# объединение двух DateFrame19def merge_two_frames(20source_df: pd.DataFrame,21new_df: pd.DataFrame22) -> pd.DataFrame:23if new_df is None or len(new_df) == 0:24return source_df25if source_df is None or len(source_df) == 0:26return new_df2728first_time = pd.to_datetime(new_df.iloc[0]["time"], utc=True)29last_time = pd.to_datetime(new_df.iloc[-1]["time"], utc=True)30search_condition = (source_df["time"] >= first_time) & (source_df["time"] <= last_time)3132result_df = source_df.drop(source_df.loc[search_condition].index)33result_df = pd.concat([result_df, new_df]).rename_axis("index")34return result_df.sort_values(["time", "index"]).reset_index(drop=True)353637# расчет процентного соотношения лонгистов/шортистов в свече38def calculate_ratio(candles: pd.DataFrame) -> pd.DataFrame:39difference = candles["high"] - candles["low"]40long_ratio = (candles["close"] - candles["low"]) / difference * 10041short_ratio = (candles["high"] - candles["close"]) / difference * 10042candles["long"] = long_ratio43candles["short"] = short_ratio4445# расчет расположения макс. объема относительно открытия свечи46from_high = abs(candles["high"] - candles["max_volume_price"])47from_low = abs(candles["max_volume_price"] - candles["low"])48total = from_high + from_low49candles.loc[candles["direction"] == TradeDirection.TRADE_DIRECTION_BUY, "percent"] = from_low / total * 10050candles.loc[candles["direction"] == TradeDirection.TRADE_DIRECTION_SELL, "percent"] = from_high / total * 1005152# определение победителя:53# если соотношение лонгистов больше и макс объем как можно ниже, то приоритет для лонга54# если соотношение шортистов больше и макс объем как можно выше, то приоритет для шорта55# todo вынести значения в константы56candles.loc[candles["direction"] == TradeDirection.TRADE_DIRECTION_BUY, "win"] = (candles["long"] > 50) & (57candles["percent"] <= 40)58candles.loc[candles["direction"] == TradeDirection.TRADE_DIRECTION_SELL, "win"] = (candles["short"] > 50) & (59candles["percent"] <= 40)6061return candles626364# агрегация данных для получения свечи65def agg_ohlc(df: pd.DataFrame) -> pd.Series:66if df.empty:67return pd.Series({68"low": np.nan,69"high": np.nan,70"open": np.nan,71"close": np.nan,72"total_volume": np.nan,73"max_volume_price": np.nan74})75price = df["price"].values76quantity = df["quantity"].values77return pd.Series({78"low": min(price),79"high": max(price),80"open": price[0],81"close": price[-1],82"total_volume": sum(quantity),83"max_volume_price": df.groupby(["price"])[["quantity"]].sum().idxmax()[0]84})858687# преобразование тиковых данных в свечи с максимальным объемом88def ticks_to_cluster(89df: pd.DataFrame,90period: str = "1min"91) -> pd.DataFrame:92candles = df.set_index(["time"])93candles = candles.resample(period).apply(agg_ohlc)94candles = candles.ffill()9596candles["time"] = candles.index97# свеча доджи98candles.loc[candles["close"] == candles["open"], "direction"] = TradeDirection.TRADE_DIRECTION_UNSPECIFIED99# бычья свеча100candles.loc[candles["close"] > candles["open"], "direction"] = TradeDirection.TRADE_DIRECTION_BUY101# медвежья свеча102candles.loc[candles["open"] > candles["close"], "direction"] = TradeDirection.TRADE_DIRECTION_SELL103104candles = candles[["time", "open", "close", "high", "low", "total_volume", "direction", "max_volume_price"]]105return candles.reset_index(drop=True)106107108# получение подходящего и не подходящего времени из сформированных109# точек входов для последующего отображения на графике110def processed_volume_levels_to_times(processed_volume_levels):111valid_entry_points = []112invalid_entry_points = []113if processed_volume_levels is None:114return valid_entry_points, invalid_entry_points115116for price, volume_level in processed_volume_levels.items():117for time, is_success in volume_level["times"].items():118if is_success:119valid_entry_points += [time]120else:121invalid_entry_points += [time]122123return valid_entry_points, invalid_entry_points124125126def apply_frame_type(df: pd.DataFrame) -> pd.DataFrame:127return df.astype({128"figi": "object",129"direction": "int64",130"price": "float64",131"quantity": "int64",132# "time": "datetime64[ms]",133})134135136def create_empty_df() -> pd.DataFrame:137df = pd.DataFrame(columns=["figi", "direction", "price", "quantity", "time"])138df.time = pd.to_datetime(df.time, unit="ms")139df.price = pd.to_numeric(df.price)140df.quantity = pd.to_numeric(df.quantity)141return df142143144