# Priprema DataSeta / utility.py (shared via CoCalc)
import numpy as np
import sys
import inspect
import os
import pandas
import matplotlib.pyplot as plt
from sklearn.svm import SVR
from sklearn import model_selection, tree, neighbors, ensemble, utils
from sklearn.metrics import mean_squared_error
from sklearn.kernel_ridge import KernelRidge

def error_mean_square_distance(x, y):
    """Return the mean squared error between ``x`` and ``y``.

    Thin wrapper that forwards both arguments, in the same order, to
    ``sklearn.metrics.mean_squared_error``.
    """
    mse = mean_squared_error(x, y)
    return mse

def test_regression(x_train, y_train, x_test, y_test, model_fit=None):
    """Fit bagged regressors over a parameter grid and keep the best one.

    A ``BaggingRegressor`` is trained once for every combination of
    ``max_samples``, ``max_features``, ``bootstrap`` and
    ``bootstrap_features``; the model with the lowest mean squared error on
    the test set wins and its summary is printed.

    :param x_train: training features, passed to ``fit``.
    :param y_train: training targets, passed to ``fit``.
    :param x_test: features the fitted models predict on.
    :param y_test: targets used to compute the error and the printed score.
    :param model_fit: base estimator selector: ``"SVR"``, ``"KNR"``,
        ``"DTR"`` or ``"KRR"``. Any other value passes ``None`` as the base
        estimator, leaving the choice to ``BaggingRegressor``.
    :return: predictions of the best model on ``x_test`` (``None`` only if
        the bagging grid were empty).
    """
    rng = utils.check_random_state(0)
    msg_info = """type: %s;
mse: %f;
Parameters: %s with a score of %0.2f;"""
    c = [0.1]
    gamma = np.linspace(0.0, 5.0, num=3)
    grid = model_selection.ParameterGrid({"max_samples": [0.5, 1.0],
                                          "max_features": [0.5, 1.0],
                                          "bootstrap": [True, False],
                                          "bootstrap_features": [True, False]})
    if model_fit == "SVR":
        grid_svr = [{'kernel': ['linear'], 'C': c},
                    {'kernel': ['rbf'], 'C': c, 'gamma': gamma},
                    {'kernel': ['sigmoid'], 'C': c, 'gamma': gamma}]
        base_estimator = model_selection.GridSearchCV(SVR(), cv=5, param_grid=grid_svr)
    elif model_fit == "KNR":
        grid_knr = [{'weights': ['uniform', 'distance'], 'algorithm': ['auto'], 'p': [2]}]
        base_estimator = model_selection.GridSearchCV(neighbors.KNeighborsRegressor(),
                                                      param_grid=grid_knr)
    elif model_fit == "DTR":
        base_estimator = tree.DecisionTreeRegressor()
    elif model_fit == "KRR":
        # ``c`` is a plain list, so the conversion alpha = 1 / (2 * C) has to
        # be applied element by element (``2. * c`` raises TypeError).
        alpha = [1 / (2. * c_value) for c_value in c]
        grid_krr = [{'kernel': ['linear'], 'alpha': alpha},
                    {'kernel': ['rbf'], 'alpha': alpha, 'gamma': gamma},
                    {'kernel': ['sigmoid'], 'alpha': alpha, "gamma": gamma}]
        base_estimator = model_selection.GridSearchCV(KernelRidge(), cv=5, param_grid=grid_krr)
    else:
        base_estimator = None

    mod_pred = None
    y_pred = None
    error_pred = None
    for params in grid:
        # NOTE(review): newer scikit-learn releases appear to have renamed
        # ``base_estimator`` to ``estimator`` -- confirm against the
        # installed version before upgrading.
        mod_pred_temp = ensemble.BaggingRegressor(base_estimator=base_estimator,
                                                  random_state=rng, **params)
        mod_pred_temp.fit(x_train, y_train)
        y_pred_temp = mod_pred_temp.predict(x_test)
        error_pred_temp = error_mean_square_distance(y_test, y_pred_temp)
        # The first candidate always wins; later ones only on a strictly
        # lower error.
        if error_pred is None or error_pred_temp < error_pred:
            error_pred = error_pred_temp
            y_pred = y_pred_temp
            mod_pred = mod_pred_temp
    if mod_pred is not None:
        print(msg_info % (model_fit, error_pred, mod_pred.get_params(),
                          mod_pred.score(x_test, y_test)))
    return y_pred

def get_file_list_from_root_dir(root_dir, extension=".xlsx"):
    """List the entries directly inside ``root_dir`` that end with ``extension``.

    :param root_dir: directory to scan (not recursive).
    :param extension: suffix an entry name must end with.
    :return: list of ``root_dir``-joined paths, in ``os.listdir`` order.
    """
    return [os.path.join(root_dir, entry)
            for entry in os.listdir(root_dir)
            if entry.endswith(extension)]

def collect_data_from_file_list(file_list, sheet_name):
    """Read one sheet from each Excel file into a dict keyed by file stem.

    :param file_list: iterable of paths to Excel files.
    :param sheet_name: sheet to read from every file.
    :return: ``{file name without extension: {row index: {column: value}}}``.
        Files that cannot be read or converted are reported on stdout and
        left out of the result.
    """
    data = {}
    for path in file_list:
        base_name_without_ext = os.path.splitext(os.path.basename(path))[0]
        try:
            # NOTE(review): the start of this statement was lost in the
            # original file; it is reconstructed from how ``data`` is used
            # elsewhere (data[stock][t_stamp][mark]). Confirm whether extra
            # read_excel arguments (e.g. ``index_col``) were passed.
            # ``sheet_name`` is the current pandas spelling of the former
            # ``sheetname`` keyword.
            data[base_name_without_ext] = pandas.read_excel(
                path, sheet_name=sheet_name).to_dict(orient='index')
        except Exception as e:
            # Best effort: report the failing file and continue with the rest.
            print(e, base_name_without_ext)
    return data

def extract_vectors_of_specific_data(data, st_name_t_stamps, mark):
    """Collect the ``mark`` values for each ``(stock name, timestamps)`` pair.

    :param data: mapping indexed as ``data[stock_name][t_stamp][mark]``.
    :param st_name_t_stamps: sized iterable of ``(stock_name, t_stamps)`` pairs.
    :param mark: key of the value to read at every timestamp.
    :return: a single flat vector when exactly one pair is given, otherwise
        a list with one vector per pair (empty list for no pairs).
    """
    per_stock = [extract_vectors_of_specific_data_1d(data[stock_name], t_stamps, mark)
                 for stock_name, t_stamps in st_name_t_stamps]
    if len(st_name_t_stamps) == 1:
        # A lone pair is returned unwrapped, not as a one-element list.
        return per_stock[0]
    return per_stock

def extract_vectors_of_specific_data_1d(data, st_name_t_stamps, mark):
    """Return ``data[t_stamp][mark]`` for every timestamp, in order.

    :param data: mapping from timestamp to a mapping of marks.
    :param st_name_t_stamps: iterable of timestamps to look up (despite the
        name, only timestamps are read here).
    :param mark: key of the value to read at every timestamp.
    :return: list of the selected values.
    """
    return [data[t_stamp][mark] for t_stamp in st_name_t_stamps]

def plot_data(x, y, title="", xlabel="", ylabel=""):
    """Show a 20x5 line plot of ``y`` against ``x``.

    :param x: values for the horizontal axis.
    :param y: values for the vertical axis.
    :param title: figure title.
    :param xlabel: label of the horizontal axis.
    :param ylabel: label of the vertical axis.
    """
    plt.figure(figsize=(20, 5))
    plt.plot(x, y)
    # Labels are applied to the current axes, so they can follow the plot call.
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.show()

def period_error(period, length):
    """Abort the program when ``period`` is unusable for data of ``length``.

    A period is accepted when ``0 < period <= length``. Zero is rejected as
    well, because the indicator functions in this module divide by the
    period.

    :param period: look-back window requested by the caller.
    :param length: number of data points available.
    :return: ``None`` when the period is valid.
    :raises SystemExit: with exit code -1, after printing a diagnostic that
        names the calling function, when the period is invalid.
    """
    if length < period or period <= 0:
        frame = inspect.currentframe()
        # ``currentframe`` may return None on interpreters without frame support.
        if frame is not None and frame.f_back is not None:
            caller_name = frame.f_back.f_code.co_name
        else:
            caller_name = "<unknown>"
        error_massage = '\n'.join((
            "Function name: " + caller_name,
            "Length of data is: %d" % length,
            "Period is: %d" % period,
            "We need period to be grather then 0 and less then length of data"
        ))
        print("Error occured: " + error_massage + "\nEXIT")
        sys.exit(-1)

def relative_strength_index(close_prices_vecs, period):
    """
    Relative Strength Index (RSI) for one price series or a list of series.

    RSI is a momentum indicator comparing recent gains with recent losses:
    RSI = 100 - 100 / (1 + RS), where RS is the average gain divided by the
    average loss over ``period`` steps.

    :param close_prices_vecs: a single vector of closing prices, or a
        sequence of such vectors.
    :param period: look-back window forwarded to ``relative_strength_index_1d``.
    :return: one RSI vector for a single series, otherwise a list with one
        RSI vector per input series.
    """
    results = []
    for series in close_prices_vecs:
        if not np.shape(series):
            # A scalar element means the input itself is a single series.
            return relative_strength_index_1d(close_prices_vecs, period)
        results.append(relative_strength_index_1d(series, period))
    return results

def relative_strength_index_1d(close_prices_vec, period):
    """Compute the RSI of a single closing-price series.

    The average gain and loss are seeded from the first ``period`` price
    changes and then smoothed with every following change.

    :param close_prices_vec: vector of closing prices.
    :param period: look-back window; validated by ``period_error``.
    :return: list of ``len(close_prices_vec) - period`` RSI values.
    """
    n_prices = len(close_prices_vec)
    period_error(period, n_prices)
    deltas = np.diff(close_prices_vec)
    seed = deltas[:period]
    avg_gain = seed[seed >= 0].sum() / period
    avg_loss = -seed[seed < 0].sum() / period
    rsi = []
    # NOTE(review): the first smoothed change (index period - 1) is also part
    # of the seed above, so it is counted twice -- confirm this is intended.
    for idx in range(period - 1, n_prices - 1):
        delta = deltas[idx]
        gain = delta if delta >= 0 else 0
        loss = -delta if delta < 0 else 0
        avg_gain = (avg_gain * (period - 1) + gain) / period
        avg_loss = (avg_loss * (period - 1) + loss) / period
        # NOTE(review): with no losses RS is pinned to 100, which yields an
        # RSI of about 99.01 rather than 100 -- confirm this is intended.
        if avg_loss != 0:
            rs = avg_gain / avg_loss
        else:
            rs = 100
        rsi.append(100. - 100. / (1. + rs))
    return rsi

def money_flow_index(typc_prices_vecs, volume_data_vecs, period):
    """
    Money Flow Index (MFI) for one series or a list of series.

    Money Flow (MF) = Typical Price * Volume.
    Money Ratio (MR) = Positive MF / Negative MF.
    MFI = 100 - 100 / (1 + MR).

    :param typc_prices_vecs: a single vector of typical prices, or a
        sequence of such vectors.
    :param volume_data_vecs: volumes matching ``typc_prices_vecs``.
    :param period: look-back window forwarded to ``money_flow_index_1d``.
    :return: one MFI vector for a single series, otherwise a list with one
        MFI vector per input series.
    """
    results = []
    for prices, volumes in zip(typc_prices_vecs, volume_data_vecs):
        if not np.shape(prices):
            # A scalar element means the inputs are single series.
            return money_flow_index_1d(typc_prices_vecs, volume_data_vecs, period)
        results.append(money_flow_index_1d(prices, volumes, period))
    return results

def money_flow_index_1d(typical_prices_vec, volume_data_vec, period):
    """Compute the MFI of a single series of typical prices and volumes.

    For each window of ``period`` consecutive price changes, the raw money
    flow (typical price * volume) is summed separately for non-negative and
    negative changes, and MFI = 100 - 100 / (1 + positive / negative).

    :param typical_prices_vec: vector of typical prices.
    :param volume_data_vec: volumes, either one per price (same length as
        ``typical_prices_vec``) or already aligned with the price changes
        (one element shorter).
    :param period: look-back window; validated by ``period_error``.
    :return: list of ``len(typical_prices_vec) - period`` MFI values.
    """
    len_ty_prices_vec = len(typical_prices_vec)
    period_error(period, len_ty_prices_vec)
    deltas = np.diff(typical_prices_vec)
    # deltas[i] describes the move into day i + 1, so the money flow for that
    # move must use day i + 1's price AND day i + 1's volume. Skip the first
    # volume when one volume per price was supplied.
    if len(volume_data_vec) == len_ty_prices_vec:
        aligned_volumes = volume_data_vec[1:]
    else:
        aligned_volumes = volume_data_vec
    raw_money_flow = np.array([typ * vol for typ, vol
                               in zip(typical_prices_vec[1:], aligned_volumes)])
    mfi = []
    for start in range(len_ty_prices_vec - period):
        window = slice(start, start + period)
        seed = deltas[window]
        flows = raw_money_flow[window]
        up = flows[seed >= 0].sum() / period
        down = flows[seed < 0].sum() / period
        # NOTE(review): with no negative flow MR is pinned to 100, which
        # yields an MFI of about 99.01 rather than 100 -- confirm intended.
        mr = up / down if down != 0 else 100
        mfi.append(100. - 100. / (1. + mr))
    return mfi

def typical_prices_vecs(price_list_high, price_list_low, price_list_close):
    """Typical prices for one series or a list of series.

    :param price_list_high: a single vector of high prices, or a sequence of
        such vectors.
    :param price_list_low: low prices matching ``price_list_high``.
    :param price_list_close: closing prices matching ``price_list_high``.
    :return: one vector of typical prices for a single series, otherwise a
        list with one vector per input series.
    """
    results = []
    for highs, lows, closes in zip(price_list_high, price_list_low, price_list_close):
        if not np.shape(highs):
            # A scalar element means the inputs are single series.
            return typical_prices_vecs_1d(price_list_high, price_list_low, price_list_close)
        results.append(typical_prices_vecs_1d(highs, lows, closes))
    return results

def typical_prices_vecs_1d(price_list_high, price_list_low, price_list_close):
    """Return the mean of high, low and close for every step of one series.

    The three inputs are walked in lockstep; the result is as long as the
    shortest of them.
    """
    typical = []
    for high, low, close in zip(price_list_high, price_list_low, price_list_close):
        typical.append(np.mean([high, low, close]))
    return typical

def exponential_moving_average(close_price_vecs, period):
    """
    Exponential Moving Average (EMA) for one series or a list of series.

    :param close_price_vecs: a single vector of closing prices, or a
        sequence of such vectors.
    :param period: window length forwarded to ``exponential_moving_average_1d``.
    :return: one EMA vector for a single series, otherwise a list with one
        EMA vector per input series.
    """
    results = []
    for series in close_price_vecs:
        if not np.shape(series):
            # A scalar element means the input itself is a single series.
            return exponential_moving_average_1d(close_price_vecs, period)
        results.append(exponential_moving_average_1d(series, period))
    return results

def exponential_moving_average_1d(close_prices_vec, period):
    """Exponentially weighted moving average of a single price series.

    Each output is a weighted sum of the latest ``period`` prices, with
    weights decaying exponentially from the newest price (largest weight)
    to the oldest (smallest weight) and normalised to sum to 1.

    :param close_prices_vec: vector of closing prices.
    :param period: window length; validated by ``period_error``.
    :return: array of ``len(close_prices_vec) - period`` averages; the first
        ``period`` positions of the convolution are dropped.
    """
    len_cl_prices_vec = len(close_prices_vec)
    period_error(period, len_cl_prices_vec)
    close_prices_vec = np.asarray(close_prices_vec)
    # np.convolve pairs weights[0] with the NEWEST sample of each window, so
    # the weights must start at their maximum and decay; the ascending order
    # used previously gave the most recent price the smallest weight.
    weights = np.exp(np.linspace(0., -1., period))
    weights /= weights.sum()
    ema = np.convolve(close_prices_vec, weights, mode='full')[:len_cl_prices_vec]
    return ema[period:]

def stochastic_oscillator(high_prices_vecs, low_prices_vecs, close_prices_vecs, period):
    """
    Stochastic Oscillator (SO) for one series or a list of series.

    Measures where the closing price sits between the lowest low and the
    highest high of a window:
    %K = (Close - Lowest Low) / (Highest High - Lowest Low) * 100.

    :param high_prices_vecs: a single vector of high prices, or a sequence of
        such vectors.
    :param low_prices_vecs: low prices matching ``high_prices_vecs``.
    :param close_prices_vecs: closing prices matching ``high_prices_vecs``.
    :param period: look-back window forwarded to ``stochastic_oscillator_1d``.
    :return: one %K vector for a single series, otherwise a list with one
        %K vector per input series.
    """
    results = []
    for highs, lows, closes in zip(high_prices_vecs, low_prices_vecs, close_prices_vecs):
        if not np.shape(highs):
            # A scalar element means the inputs are single series.
            return stochastic_oscillator_1d(high_prices_vecs, low_prices_vecs,
                                            close_prices_vecs, period)
        results.append(stochastic_oscillator_1d(highs, lows, closes, period))
    return results

def stochastic_oscillator_1d(high_prices_vec, low_prices_vec, close_prices_vec, period):
    """Compute %K of a single series.

    Every window spans ``period + 1`` consecutive entries and is evaluated
    against the closing price at the window's last position.

    :param high_prices_vec: vector of high prices.
    :param low_prices_vec: vector of low prices.
    :param close_prices_vec: vector of closing prices.
    :param period: look-back window; validated by ``period_error``.
    :return: list of ``len(high_prices_vec) - period`` values; 100 is used
        when the highest high equals the lowest low.
    """
    n_prices = len(high_prices_vec)
    period_error(period, n_prices)
    window_starts = range(n_prices - period)
    highest_highs = [max(high_prices_vec[start:start + period + 1]) for start in window_starts]
    lowest_lows = [min(low_prices_vec[start:start + period + 1]) for start in window_starts]
    close_above_low = np.subtract(close_prices_vec[period:], lowest_lows)
    window_range = np.subtract(highest_highs, lowest_lows)
    percent_k = []
    for above_low, span in zip(close_above_low, window_range):
        if span != 0:
            percent_k.append((above_low / span) * 100)
        else:
            # Flat window: avoid dividing by zero.
            percent_k.append(100)
    return percent_k

def moving_average_convergence_divergence(close_prices_vecs, slow_period=26, fast_period=12):
    """
    Moving Average Convergence/Divergence (MACD) for one or several series.

    MACD = EMA(fast_period) of closing prices - EMA(slow_period) of closing
    prices.

    :param close_prices_vecs: a single vector of closing prices, or a
        sequence of such vectors.
    :param slow_period: window of the slow moving average.
    :param fast_period: window of the fast moving average.
    :return: one MACD vector for a single series, otherwise a list with one
        MACD vector per input series.
    """
    results = []
    for series in close_prices_vecs:
        if not np.shape(series):
            # A scalar element means the input itself is a single series.
            return moving_average_convergence_divergence_1d(close_prices_vecs,
                                                            slow_period, fast_period)
        results.append(moving_average_convergence_divergence_1d(series, slow_period,
                                                                fast_period))
    return results

def moving_average_convergence_divergence_1d(close_prices_vec, slow_period=26, fast_period=12):
    """Compute MACD (fast EMA minus slow EMA) of a single price series.

    :param close_prices_vec: vector of closing prices.
    :param slow_period: window of the slow moving average.
    :param fast_period: window of the fast moving average.
    :return: element-wise difference between the tail of the fast EMA and
        the slow EMA.
    """
    n_prices = len(close_prices_vec)
    for window in (slow_period, fast_period):
        period_error(window, n_prices)
    slow_ema = exponential_moving_average(close_prices_vec, slow_period)
    fast_ema = exponential_moving_average(close_prices_vec, fast_period)
    # Drop the leading entries of the fast EMA so both series end on the
    # same sample before subtracting.
    offset = len(fast_ema) - len(slow_ema)
    return np.subtract(fast_ema[offset:], slow_ema)

def signal_line(macd_data_vecs, period=9):
    """
    Signal line (``period``-step EMA of MACD) for one or several series.

    :param macd_data_vecs: a single MACD vector, or a sequence of such vectors.
    :param period: window forwarded to ``signal_line_1d``.
    :return: one signal-line vector for a single series, otherwise a list
        with one vector per input series.
    """
    results = []
    for series in macd_data_vecs:
        if not np.shape(series):
            # A scalar element means the input itself is a single series.
            return signal_line_1d(macd_data_vecs, period)
        results.append(signal_line_1d(series, period))
    return results

def signal_line_1d(macd_data_vec, period=9):
    """Return the ``period``-step EMA of a single MACD series.

    :param macd_data_vec: vector of MACD values.
    :param period: window length; validated by ``period_error``.
    :return: result of ``exponential_moving_average`` on the MACD vector.
    """
    period_error(period, len(macd_data_vec))
    return exponential_moving_average(macd_data_vec, period)