# Shared: Priprema DataSeta / utility.py (opened in CoCalc)
import numpy as np
import sys
import inspect
import os
import pandas
import matplotlib.pyplot as plt
from sklearn.svm import SVR
from sklearn import model_selection, tree, neighbors, ensemble, utils
from sklearn.metrics import mean_squared_error
from sklearn.kernel_ridge import KernelRidge


def error_mean_square_distance(x, y):
    """Return the mean squared error between *x* and *y*.

    Both arguments are forwarded, in this order, to
    ``sklearn.metrics.mean_squared_error``.
    """
    mse = mean_squared_error(x, y)
    return mse


def test_regression(x_train, y_train, x_test, y_test, model_fit=None):
    """Fit bagged regressors over a small parameter grid and keep the best one.

    One ``BaggingRegressor`` is trained for every combination of
    ``max_samples``, ``max_features``, ``bootstrap`` and ``bootstrap_features``.
    The ensemble with the lowest mean squared error on the test data wins; a
    summary of it is printed.

    :param x_train: training features.
    :param y_train: training targets.
    :param x_test: features used to evaluate each candidate.
    :param y_test: targets used to evaluate each candidate.
    :param model_fit: selects the estimator wrapped by the bagging ensemble:
        "SVR" and "KRR" use a 5-fold grid search over kernels, "KNR" a grid
        search over neighbour weights, "DTR" a plain decision tree. Any other
        value leaves the choice to ``BaggingRegressor``'s default.
    :return: predictions for ``x_test`` made by the best ensemble.
    """
    rng = utils.check_random_state(0)
    msg_info = """type: %s;
        mse: %f;
        Parameters: %s with a score of %0.2f;"""
    c = [0.1]  # np.linspace(0.1, 2000.0, num=3)
    gamma = np.linspace(0.0, 5.0, num=3)
    grid = model_selection.ParameterGrid({"max_samples": [0.5, 1.0],
                                          "max_features": [0.5, 1.0],
                                          "bootstrap": [True, False],
                                          "bootstrap_features": [True, False]})
    if model_fit == "SVR":
        grid_svr = [{'kernel': ['linear'], 'C': c},
                    {'kernel': ['rbf'], 'C': c, 'gamma': gamma},
                    {'kernel': ['sigmoid'], 'C': c, 'gamma': gamma}]
        base_estimator = model_selection.GridSearchCV(SVR(), cv=5, param_grid=grid_svr)
    elif model_fit == "KNR":
        grid_knr = [{'weights': ['uniform', 'distance'], 'algorithm': ['auto'], 'p': [2]}]
        base_estimator = model_selection.GridSearchCV(neighbors.KNeighborsRegressor(), param_grid=grid_knr)
    elif model_fit == "DTR":
        base_estimator = tree.DecisionTreeRegressor()
    elif model_fit == "KRR":
        # `c` is a list, so the alpha values have to be derived element-wise;
        # `1/(2.*c)` on the list itself raises TypeError.
        alpha = [1 / (2. * c_value) for c_value in c]
        grid_krr = [{'kernel': ['linear'], 'alpha': alpha},
                    {'kernel': ['rbf'], 'alpha': alpha, 'gamma': gamma},
                    {'kernel': ['sigmoid'], 'alpha': alpha, "gamma": gamma}]
        base_estimator = model_selection.GridSearchCV(KernelRidge(), cv=5, param_grid=grid_krr)
    else:
        base_estimator = None

    mod_pred = None
    y_pred = None
    error_pred = None
    for params in grid:
        # The wrapped estimator is passed positionally because its keyword
        # name differs between scikit-learn releases
        # (`base_estimator` vs. `estimator`).
        candidate = ensemble.BaggingRegressor(base_estimator, random_state=rng, **params)
        candidate.fit(x_train, y_train)
        candidate_pred = candidate.predict(x_test)
        candidate_error = error_mean_square_distance(y_test, candidate_pred)
        # Keep the first candidate, then only strictly better ones.
        if error_pred is None or candidate_error < error_pred:
            error_pred = candidate_error
            y_pred = candidate_pred
            mod_pred = candidate
    if mod_pred is not None:
        print(msg_info % (model_fit, error_pred, mod_pred.get_params(), mod_pred.score(x_test, y_test)))
    return y_pred


def get_file_list_from_root_dir(root_dir, extension=".xlsx"):
    """Return the paths of the entries in *root_dir* whose name ends with *extension*.

    Only the directory itself is listed (no recursion); every returned path is
    ``root_dir`` joined with the entry name, in ``os.listdir`` order.
    """
    return [os.path.join(root_dir, entry)
            for entry in os.listdir(root_dir)
            if entry.endswith(extension)]


def collect_data_from_file_list(file_list, sheet_name):
    """Load one sheet from every Excel file and return the contents as nested dicts.

    :param file_list: iterable of paths to Excel workbooks.
    :param sheet_name: sheet to read from each workbook.
    :return: dict keyed by the file's base name without extension; each value
        is the sheet converted with ``DataFrame.to_dict(orient='index')``,
        using the first column as index.

    Reading is best effort: a file that cannot be read is reported on stdout
    and left out of the result.
    """
    data = {}
    for path in file_list:
        base_name_without_ext = os.path.splitext(os.path.basename(path))[0]
        try:
            # The context manager closes the handle even when parsing fails.
            with open(path, 'rb') as workbook:
                # The sheet is passed positionally: the keyword was renamed
                # from `sheetname` to `sheet_name` between pandas releases.
                frame = pandas.read_excel(workbook, sheet_name, index_col=0)
            data[base_name_without_ext] = frame.to_dict(orient='index')
        except Exception as e:
            print(e, base_name_without_ext)
    return data


def extract_vectors_of_specific_data(data, st_name_t_stamps, mark):
    """Collect the *mark* values for each (stock name, time stamps) pair.

    :param data: mapping stock name -> mapping time stamp -> mapping mark -> value.
    :param st_name_t_stamps: collection of ``(stock_name, time_stamps)`` pairs.
    :param mark: key of the value to pick at every time stamp.
    :return: a flat vector when exactly one pair is given, otherwise a list
        holding one vector per pair (empty list for no pairs).
    """
    if len(st_name_t_stamps) == 1:
        # A single stock yields its vector directly instead of a 1-element list.
        only_name, only_stamps = next(iter(st_name_t_stamps))
        return extract_vectors_of_specific_data_1d(data[only_name], only_stamps, mark)
    return [extract_vectors_of_specific_data_1d(data[name], stamps, mark)
            for name, stamps in st_name_t_stamps]


def extract_vectors_of_specific_data_1d(data, st_name_t_stamps, mark):
    """Return ``data[t_stamp][mark]`` for every time stamp, in the given order.

    :param data: mapping time stamp -> mapping mark -> value.
    :param st_name_t_stamps: iterable of time stamps to look up.
    :param mark: key of the value to pick at every time stamp.
    """
    return [data[stamp][mark] for stamp in st_name_t_stamps]


def plot_data(x, y, title="", xlabel="", ylabel=""):
    """Show a 20x5 inch line plot of *y* against *x* with optional title and axis labels."""
    plt.figure(figsize=(20, 5))
    plt.plot(x, y)
    # Decorations are applied to the current axes, so they can follow the plot call.
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.show()


def period_error(period, length):
    """Terminate the program when *period* cannot be used on *length* data points.

    A period is rejected when it is zero, negative, or larger than *length*.
    Zero is rejected because the indicator functions divide by the period.

    :param period: window size requested by the caller.
    :param length: number of available data points.
    :return: ``None`` when the period is acceptable.
    :raises SystemExit: with exit code -1 for an invalid period, after a
        diagnostic naming the calling function has been printed.
    """
    if length < period or period <= 0:
        # Name the function that requested the validation, not this helper.
        caller_name = inspect.currentframe().f_back.f_code.co_name
        error_massage = '\n'.join((
            "Function name: " + caller_name,
            "Length of data is: %d" % length,
            "Period is: %d" % period,
            "We need period to be grather then 0 and less then length of data"
        ))
        print("Error occured: " + error_massage + "\nEXIT")
        sys.exit(-1)


def relative_strength_index(close_prices_vecs, period):
    """
    Relative Strength Index (RSI): a momentum indicator comparing the size of
    recent gains with recent losses.

    Accepts either one series of closing prices or a collection of series.
    Elements are inspected in order; as soon as a scalar element is found the
    whole input is treated as a single series.
    :return: the RSI of the single series, or a list with one RSI vector per
    series, where RSI = 100 - [100 / (1 + RS)] and RS = average gain / average loss.
    """
    per_series = []
    for candidate in close_prices_vecs:
        if not np.shape(candidate):
            # Scalar element: the input itself is the price series.
            return relative_strength_index_1d(close_prices_vecs, period)
        per_series.append(relative_strength_index_1d(candidate, period))
    return per_series


def relative_strength_index_1d(close_prices_vec, period):
    """Compute the RSI of one closing-price series.

    The average gain and loss are seeded from the first *period* price changes
    and then smoothed with every following change.

    :param close_prices_vec: sequence of closing prices.
    :param period: smoothing window, validated with ``period_error``.
    :return: list with ``len(close_prices_vec) - period`` RSI values.
    """
    n_prices = len(close_prices_vec)
    period_error(period, n_prices)
    changes = np.diff(close_prices_vec)

    head = changes[:period]
    avg_gain = head[head >= 0].sum() / period
    avg_loss = -head[head < 0].sum() / period

    n_values = n_prices - period
    values = [0] * n_values
    for pos in range(n_values):
        change = changes[pos + period - 1]
        gain = 0
        loss = 0
        if change >= 0:
            gain = change
        if change < 0:
            loss = -change
        avg_gain = (avg_gain * (period - 1) + gain) / period
        avg_loss = (avg_loss * (period - 1) + loss) / period
        # With no losses the ratio is pinned to 100 instead of dividing by zero.
        if avg_loss != 0:
            strength = avg_gain / avg_loss
        else:
            strength = 100
        values[pos] = 100. - 100. / (1. + strength)
    return values


def money_flow_index(typc_prices_vecs, volume_data_vecs, period):
    """
    Money Flow Index (MFI): measures the strength of money flowing in and out
    of a security.
    Money Flow (MF) = Typical Price * Volume.
    Money Ratio (MR) = (Positive MF / Negative MF).

    Accepts either one price/volume series or parallel collections of series.
    Price elements are inspected in order; as soon as a scalar is found both
    inputs are treated as single series.
    :return: MFI = 100 - (100 / (1 + MR)) for the single series, or a list
    with one MFI vector per series.
    """
    per_series = []
    for prices, volumes in zip(typc_prices_vecs, volume_data_vecs):
        if not np.shape(prices):
            # Scalar element: the inputs themselves are the series.
            return money_flow_index_1d(typc_prices_vecs, volume_data_vecs, period)
        per_series.append(money_flow_index_1d(prices, volumes, period))
    return per_series


def money_flow_index_1d(typical_prices_vec, volume_data_vec, period):
    """Compute the Money Flow Index of one typical-price / volume series.

    The raw money flow of a day is its typical price times its volume. A flow
    counts as positive when the typical price did not fall against the
    previous day and as negative otherwise. For each window of *period* days
    the ratio of positive to negative flow is mapped to ``100 - 100 / (1 + MR)``.

    :param typical_prices_vec: sequence of typical prices.
    :param volume_data_vec: sequence of traded volumes, normally one per
        price. The volumes are aligned with the prices from the end.
    :param period: window size, validated with ``period_error``.
    :return: list with ``len(typical_prices_vec) - period`` MFI values.
    """
    len_ty_prices_vec = len(typical_prices_vec)
    period_error(period, len_ty_prices_vec)
    deltas = np.diff(typical_prices_vec)
    # deltas[k] is the move into day k + 1, so the flow must multiply that
    # day's price by that day's volume. Taking the trailing len(deltas)
    # volumes pairs same-day values; starting at volume 0 would pair every
    # price with the previous day's volume.
    n_moves = len(deltas)
    volumes = volume_data_vec[-n_moves:] if n_moves else []
    raw_money_flow = np.array([typ * vol for typ, vol in zip(typical_prices_vec[1:], volumes)])
    mfi = [0] * (len_ty_prices_vec - period)
    for i in range(0, len_ty_prices_vec - period):
        window_deltas = deltas[i:i + period]
        window_flow = raw_money_flow[i:i + period]
        up = window_flow[window_deltas >= 0].sum() / period
        down = window_flow[window_deltas < 0].sum() / period
        # With no negative flow the ratio is pinned to 100 instead of dividing by zero.
        mr = up / down if down != 0 else 100
        mfi[i] = 100. - 100. / (1. + mr)
    return mfi


def typical_prices_vecs(price_list_high, price_list_low, price_list_close):
    """Compute typical prices for one series or for parallel collections of series.

    High-price elements are inspected in order; as soon as a scalar is found
    the three inputs are treated as single series.
    :return: the typical prices of the single series, or a list with one
    typical-price vector per series.
    """
    per_series = []
    for highs, lows, closes in zip(price_list_high, price_list_low, price_list_close):
        if not np.shape(highs):
            # Scalar element: the inputs themselves are the series.
            return typical_prices_vecs_1d(price_list_high, price_list_low, price_list_close)
        per_series.append(typical_prices_vecs_1d(highs, lows, closes))
    return per_series


def typical_prices_vecs_1d(price_list_high, price_list_low, price_list_close):
    """Return, per position, the mean of the high, low and close price.

    The three sequences are walked in parallel; the result is as long as the
    shortest of them.
    """
    typical = []
    for day_prices in zip(price_list_high, price_list_low, price_list_close):
        typical.append(np.mean(list(day_prices)))
    return typical


def exponential_moving_average(close_price_vecs, period):
    """
    Exponential Moving Average (EMA): an exponentially weighted moving average
    of a field over a given period of time.

    Accepts either one series or a collection of series. Elements are
    inspected in order; as soon as a scalar element is found the whole input
    is treated as a single series.
    :return: the EMA of the single series, or a list with one EMA vector per series.
    """
    per_series = []
    for candidate in close_price_vecs:
        if not np.shape(candidate):
            # Scalar element: the input itself is the series.
            return exponential_moving_average_1d(close_price_vecs, period)
        per_series.append(exponential_moving_average_1d(candidate, period))
    return per_series


def exponential_moving_average_1d(close_prices_vec, period):
    """Smooth one series with a normalised exponential kernel of *period* taps.

    :param close_prices_vec: sequence of values to smooth.
    :param period: kernel length, validated with ``period_error``.
    :return: numpy array holding the convolution result with the first
        *period* entries dropped.
    """
    period_error(period, len(close_prices_vec))
    prices = np.asarray(close_prices_vec)
    kernel = np.exp(np.linspace(-1., 0., period))
    kernel = kernel / kernel.sum()
    smoothed = np.convolve(prices, kernel, mode='full')
    # Cut the tail added by the 'full' convolution, then drop the warm-up part.
    trimmed = smoothed[:len(prices)]
    return trimmed[period:]


def stochastic_oscillator(high_prices_vecs, low_prices_vecs, close_prices_vecs, period):
    """
    Stochastic Oscillator (SO): the position of the current closing price
    between the lowest low and the highest high of a given period.

    Accepts either one high/low/close series or parallel collections of
    series. High-price elements are inspected in order; as soon as a scalar is
    found the three inputs are treated as single series.
    :return: %K = [(Close price - Lowest price) / (Highest price - Lowest price)] * 100
    for the single series, or a list with one %K vector per series.
    """
    per_series = []
    for highs, lows, closes in zip(high_prices_vecs, low_prices_vecs, close_prices_vecs):
        if not np.shape(highs):
            # Scalar element: the inputs themselves are the series.
            return stochastic_oscillator_1d(high_prices_vecs, low_prices_vecs, close_prices_vecs, period)
        per_series.append(stochastic_oscillator_1d(highs, lows, closes, period))
    return per_series


def stochastic_oscillator_1d(high_prices_vec, low_prices_vec, close_prices_vec, period):
    """Compute %K for one high/low/close series.

    Each window spans ``period + 1`` consecutive entries and is evaluated at
    its last entry's closing price.

    :param period: window offset, validated with ``period_error`` against the
        length of the high-price series.
    :return: list with ``len(high_prices_vec) - period`` values; 100 is used
        where the window's high equals its low.
    """
    n_prices = len(high_prices_vec)
    period_error(period, n_prices)
    window_starts = range(0, n_prices - period)
    highest = [max(high_prices_vec[start:start + period + 1]) for start in window_starts]
    lowest = [min(low_prices_vec[start:start + period + 1]) for start in window_starts]
    close_above_low = np.subtract(close_prices_vec[period:], lowest)
    window_range = np.subtract(highest, lowest)
    percent_k = []
    for above, spread in zip(close_above_low, window_range):
        if spread != 0:
            percent_k.append((above / spread) * 100)
        else:
            # Flat window: avoid the division by zero.
            percent_k.append(100)
    return percent_k


def moving_average_convergence_divergence(close_prices_vecs, slow_period=26, fast_period=12):
    """
    Moving Average Convergence/Divergence (MACD): the difference between a
    short-term and a long-term moving average of a field.

    Accepts either one series or a collection of series. Elements are
    inspected in order; as soon as a scalar element is found the whole input
    is treated as a single series.
    :return: MACD = [fast EMA of closing prices] - [slow EMA of closing prices]
    for the single series, or a list with one MACD vector per series.
    """
    per_series = []
    for candidate in close_prices_vecs:
        if not np.shape(candidate):
            # Scalar element: the input itself is the series.
            return moving_average_convergence_divergence_1d(close_prices_vecs, slow_period, fast_period)
        per_series.append(moving_average_convergence_divergence_1d(candidate, slow_period, fast_period))
    return per_series


def moving_average_convergence_divergence_1d(close_prices_vec, slow_period=26, fast_period=12):
    """Compute the MACD of one closing-price series.

    Both periods are validated with ``period_error``. The fast EMA is trimmed
    at its start so that it lines up with the end of the slow EMA before the
    two are subtracted.

    :return: fast EMA minus slow EMA, as long as the slow EMA.
    """
    n_prices = len(close_prices_vec)
    for checked_period in (slow_period, fast_period):
        period_error(checked_period, n_prices)
    slow_ema = exponential_moving_average(close_prices_vec, slow_period)
    fast_ema = exponential_moving_average(close_prices_vec, fast_period)
    surplus = len(fast_ema) - len(slow_ema)
    return np.subtract(fast_ema[surplus:], slow_ema)


def signal_line(macd_data_vecs, period=9):
    """
    Accepts either one MACD series or a collection of series. Elements are
    inspected in order; as soon as a scalar element is found the whole input
    is treated as a single series.
    :return: Signal Line = period day EMA of MACD, for the single series or
    as a list with one vector per series.
    """
    per_series = []
    for candidate in macd_data_vecs:
        if not np.shape(candidate):
            # Scalar element: the input itself is the series.
            return signal_line_1d(macd_data_vecs, period)
        per_series.append(signal_line_1d(candidate, period))
    return per_series


def signal_line_1d(macd_data_vec, period=9):
    """Return the *period* EMA of one MACD series after validating the period."""
    period_error(period, len(macd_data_vec))
    signal = exponential_moving_average(macd_data_vec, period)
    return signal