Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
robertopucp
GitHub Repository: robertopucp/1eco35_2022_2
Path: blob/main/Trabajo_grupal/WG5/Grupo_7_py.ipynb
2714 views
Kernel: Python 3 (ipykernel)

Class

import pandas as pd import numpy as np import pyreadr # Load R dataset import os # for usernanme y set direcotrio
class OLSRegClass( object ): # también podemos omitir object, pues es lo mismo # esto convertirá los métodos en privados __slots__ = [ '__X', 'y', 'RobustStandardError', 'X_np', 'y_np', 'columns', 'beta_ols', 'beta_OLS', 'var_OLS', 'beta_se', 'confiden_interval', 'y_est', 'SCR', 'R2', 'output', 'rootMSE'] def __init__( self, X:pd.DataFrame, y:pd.Series, lista, RobustStandardError=True ): # X:pd.DataFrame indica que debe ser un dataframe # y:pd.Series indica que debe ser una serie ## CONDICIONAL PARA X:pd.DataFrame ### if not isinstance( X, pd.DataFrame ): # si X no es dataframe, arroja error raise TypeError( "X must be a pd.DataFrame." ) ## CONDICIONAL PARA y:pd.Series ### if not isinstance( y, pd.Series ): # si y no es series, arroja error raise TypeError( "y must be a pd.Series." ) # ## CONDICIONAL PARA y:pd.Series ### # if not isinstance( lista, pd.Series ): # si lista no es series, arroja error # raise TypeError( "lista must be a pd.Series." ) # asignando atributos de la clase try: self.__X = X.loc[:, lista] except: self.__X = X.iloc[:, lista] self.y = y self.RobustStandardError = RobustStandardError # incluyendo columna de unos para el intercepto self.__X[ 'Intercept' ] = 1 # crea columna Intercept con valores 1 al final del array # queremos que la columna Intercept aparezca en la primera columna cols = self.__X.columns.tolist() # convierte el nombre de las columnas a lista new_cols_orders = [cols[ -1 ]] + cols[ 0:-1 ] # mueve la última columna (que sería Intercept) al inicio # la manera de hacerlo es ordenando primero cols[-1] y luego cols[0:-1] self.__X = self.__X.loc[ :, new_cols_orders ] # usamos .loc que filtra por nombre de filas o columnas # creando nuevos atributos self.X_np = self.__X.values # pasamos dataframe a multi array self.y_np = y.values.reshape( -1 , 1 ) # de objeto serie a array columna self.columns = self.__X.columns.tolist() # nombre de la base de datos como objeto lista ########################################################################### ######### CREANDO MÉTODOS ############################################### ######### MÉTODO 1 ############################################### def beta_OLS_Reg( self ): # X, y en Matrix, y vector columna respectivamente X_np = self.X_np y_np = self.y_np # beta_ols self.beta_ols = np.linalg.inv( X_np.T @ X_np ) @ ( X_np.T @ y_np ) # asignando output de la función def beta_OLS( self ): como atributo self.beta_OLS index_names = self.columns beta_OLS_output = pd.DataFrame( self.beta_ols, index = index_names, columns = [ 'Coef.' ] ) self.beta_OLS = beta_OLS_output return beta_OLS_output ######### MÉTODO 2 ############################################### def var_stderrors_cfdinterval( self ): ################# ### VARIANCE ### # Se corre la función beta_OLS que estima el vector de coeficientes self.beta_OLS_Reg() # usaré atributos pero con un nombre más simple X_np = self.X_np y_np = self.y_np # beta_ols beta_OLS = self.beta_OLS.values.reshape( - 1, 1 ) # Dataframe a vector columna # errors e = y_np - ( X_np @ beta_OLS ) # error variance N = X_np.shape[ 0 ] total_parameters = X_np.shape[ 1 ] error_var = ( (e.T @ e)[ 0 ] )/( N - total_parameters ) # Varianza var_OLS = error_var * np.linalg.inv( X_np.T @ X_np ) # asignando output de la función def reg_var_OLS( self ): como atributo self.var_OLS index_names = self.columns var_OLS_output = pd.DataFrame( var_OLS , index = index_names , columns = index_names ) self.var_OLS = var_OLS_output ####################### ### STANDAR ERRORS ### # var y beta beta_OLS = self.beta_OLS.values.reshape( -1, 1 ) # -1 significa cualquier número de filas var_OLS = self.var_OLS.values # standard errors beta_stderror = np.sqrt( np.diag( var_OLS ) ) table_data0 = { "Std.Err." : beta_stderror.ravel()} # defining index names index_names0 = self.columns # defining a pandas dataframe self.beta_se = pd.DataFrame( table_data0 , index = index_names0 ) ########################### ### Confidence interval ### up_bd = beta_OLS.ravel() + 1.96*beta_stderror lw_bd = beta_OLS.ravel() - 1.96*beta_stderror table_data1 = {"[0.025" : lw_bd.ravel(), "0.975]" : up_bd.ravel()} # defining index names index_names1 = self.columns # defining a pandas dataframe self.confiden_interval = pd.DataFrame( table_data1 , index = index_names1 ) ###################### MÉTODO 3 ############################################### def robust_var_se_cfdinterval(self): # Se corre la función reg_beta_OLS que estima el vector de coeficientes self.reg_beta_OLS() # usaré atributos pero con un nombre más simple X_np = self.X_np y_np = self.y_np listaf = self.lista beta = np.linalg.inv(X_np.T @ X_np) @ ((X_np.T) @ y ) y_est = X_np @ beta n = X_np.shape[0] k = X_np.shape[1] - 1 nk = n - k matrix_robust = np.diag(list( map( lambda x: x**2 , y - y_est))) Var = np.linalg.inv(X_np.T @ X_np) @ X_np.T @ matrix_robust @ X_np @ np.linalg.inv(X_np.T @ X_np) sd = np.sqrt( np.diag(Var) ) var = sd**2 t_est = np.absolute(beta/sd) lower_bound = beta-1.96*sd upper_bound = beta+1.96*sd SCR = sum(list( map( lambda x: x**2 , y - y_est) )) SCT = sum(list( map( lambda x: x**2 , y - np.mean(y_est) ))) R2 = 1-SCR/SCT rmse = (SCR/n)**0.5 table = pd.DataFrame( {"ols": beta , "standar_error" : sd , "Lower_bound":lower_bound, "Upper_bound":upper_bound} ) fit = {"Root_MSE":rmse, "R2": R2} index_names7 = listaf var_robust_output = pd.DataFrame( Var , index = index_names7 , columns = index_names7 ) self.var_robust = var_robust_output return table, fit, var_robust_output ######### MÉTODO 4 ############################################### def R2_rootMSE( self ) : ############ ### R2 ### # Se corre la función beta_OLS_Reg que estima el vector de coeficientes self.beta_OLS_Reg() self.y_est = self.X_np @ self.beta_OLS # y estimado error = self.y_np - self.y_est # vector de errores self.SCR = np.sum(np.square(error)) # Suma del Cuadrado de los Residuos SCT = np.sum(np.square(self.y_np - np.mean(self.X_np) )) # Suma de Cuadrados Total self.R2 = 1 - self.SCR/SCT ################# ### root MSE ### for i in error.values: suma = 0 suma = np.sqrt( suma + (i**2) / self.X_np.shape[0] ) self.rootMSE = suma.tolist() ######### MÉTODO 5 ############################################### def finaloutput( self ): self.beta_OLS_Reg() self.R2_rootMSE() self.var_stderrors_cfdinterval() # var y beta beta_OLS = self.beta_OLS.values.reshape( -1, 1 ) # -1 significa cualquier número de filas var_OLS = self.var_OLS.values # standard errors beta_stderror = np.sqrt( np.diag( var_OLS ) ) # confidence interval up_bd = beta_OLS.ravel() + 1.96*beta_stderror lw_bd = beta_OLS.ravel() - 1.96*beta_stderror table_data2 = {'Coef.' : beta_OLS.ravel(), 'Std.Err.' : beta_stderror.ravel(), '[0.025' : lw_bd.ravel(), '0.975]' : up_bd.ravel(), 'R2' : self.R2, 'rootMSE' : self.rootMSE} return table_data2
# leemos las base de datos sin cambiar nombre de usuario user = os.getlogin() # Username os.chdir(f"C:/Users/{user}/Documents/GitHub/1ECO35_2022_2/Lab4") # Set directorio cps2012_env = pyreadr.read_r("../data/cps2012.Rdata") # output formato diccionario cps2012 = cps2012_env[ 'data' ] # extrae iformación almacenada en la llave data del diccionario cps2012_env # Borrar variables constantes: filtra observaciones que tenga varianza diferente a cero variance_cols = cps2012.var().to_numpy() # to numpy dataset = cps2012.iloc[ :, np.where( variance_cols != 0 )[0] ] # filtra observaciones que tenga varianza diferente a cero # genero un dataset con 10 columnas del dataset general X = dataset.iloc[:, 1:] y = dataset[['lnw']].squeeze() # convirtiendo a serie
# asignando clase, ya sea por nombre o posición de variables reg1 = OLSRegClass (X, y, ['female', 'widowed', 'divorced', 'separated', 'nevermarried'])

Tratando de modificar atributo privado

# tratando de acceder al atributo privado reg1.__X # no se puede acceder pues esta bloqueado
--------------------------------------------------------------------------- AttributeError Traceback (most recent call last) ~\AppData\Local\Temp\ipykernel_4952\1245474602.py in <module> 1 # tratando de acceder al atributo privado ----> 2 reg1.__X # no se puede acceder pues esta bloqueado AttributeError: 'OLSRegClass' object has no attribute '__X'
# tratando de acceder al atributo privado reg1.X
--------------------------------------------------------------------------- AttributeError Traceback (most recent call last) ~\AppData\Local\Temp\ipykernel_4952\2535370293.py in <module> 1 # tratando de acceder al atributo privado ----> 2 reg1.X AttributeError: 'OLSRegClass' object has no attribute 'X'
reg1.y # sin embargo el atributo "y" no está bloqueado y sí se puede acceder a este
0 1.909543 1 1.365773 2 2.540223 3 1.801091 4 3.349904 ... 29212 3.978513 29213 3.142265 29214 2.725619 29215 3.142265 29216 2.433613 Name: lnw, Length: 29217, dtype: float64

Tratando de modificar método privado

# Tratando de añadir nuevas funciones a la clase RegClass, método privado _reg_beta_OLS(), atributo self.table_data2 def new_funtion(X): return X**3
# tratando de modificar el metodo bloqueado reg1.finaloutput = new_funtion # no se puede modificar pues el método finaloutput es de solo lectura
--------------------------------------------------------------------------- AttributeError Traceback (most recent call last) ~\AppData\Local\Temp\ipykernel_4952\2232799291.py in <module> 1 # tratando de modificar el metodo bloqueado ----> 2 reg1.finaloutput = new_funtion # no se puede modificar pues el método finaloutput es de solo lectura AttributeError: 'OLSRegClass' object attribute 'finaloutput' is read-only

SPSS file

import savReaderWriter as sav # import sav (package), lee labes de variables
user = os.getlogin() # Username os.chdir(f"C:/Users/{user}/Documents/GitHub/1ECO35_2022_2/Lab4") df = pd.read_spss( r"../data/data_administrativa.sav" ) df

Mostrar las variables que presentan missing values

df.isnull().sum() # Las variables con missing values son P203A P203B P204 P205 P206 P207 P208A P208B P209.
year 0 MES 0 CONGLOME 0 VIVIENDA 0 HOGAR 0 CODPERSO 0 UBIGEO 0 DOMINIO 0 ESTRATO 0 P201P 0 P203 0 P203A 22795 P203B 22795 P204 2349 P205 4904 P206 82480 P207 2349 P208A 2349 P208B 84038 P209 19016 dtype: int64
# Mostrar las etiquetas de dos variables (var labels) y las etiquetas de los valores en dos variables (value's labels). with sav.SavHeaderReader( r"../data/data_administrativa.sav", ioUtf8=True) as header: metadata = header.all() # save dataset val_labels_data_administrativa = metadata.valueLabels # get labels from values var_labels_data_administrativa = metadata.varLabels # get labels from varaibles (description) # ioUtf8 read special characters
# etiquetas de dos variables var_labels_data_administrativa['DOMINIO']
'Dominio geográfico'
var_labels_data_administrativa['ESTRATO']
'Estrato geográfico'
# etiquetas de valores en dos variables val_labels_data_administrativa['DOMINIO']
{1.0: 'Costa Norte', 2.0: 'Costa Centro', 3.0: 'Costa Sur', 4.0: 'Sierra Norte', 5.0: 'Sierra Centro', 6.0: 'Sierra Sur', 7.0: 'Selva', 8.0: 'Lima Metropolitana'}
val_labels_data_administrativa['ESTRATO']
{1.0: ' De 500 000 a más habitantes', 2.0: ' De 100 000 a 499 999 habitantes', 3.0: ' De 50 000 a 99 999 habitantes', 4.0: ' De 20 000 a 49 999 habitantes', 5.0: 'De 2 000 a 19 999 habitantes', 6.0: ' De 500 a 1 999 habitantes', 7.0: ' Área de Empadronamiento Rural (AER) Compuesto', 8.0: ' Área de Empadronamiento Rural (AER) Simple'}

Se le pide detectar personas que fueran entrevistadas en ambos años. Para ello, se pide detectar duplicados a partir del identificador por persona : conglome, vivienda, hogar y codperso.

# creando variable para identificar a un hogar persona = ['CONGLOME', 'VIVIENDA', 'HOGAR', 'CODPERSO'] # viendo si hay duplicados df_duplicated = df[ df.loc[:, persona].duplicated(keep = False) ] # keep=False : muestra primera aparición y duplicado df_duplicated[persona] # hay 8270 duplicados
# eliminando duplicados df_no_dpl = df[ ~ df.loc[:, persona].duplicated() ].copy() # ~ : elimina los primeros duplicados df_no_dpl

Ordene la base de datos a partir de las variables que identifican cada miembro y la variable de año (year). Así podrá observar a cada individuo en ambos años.

ordenar = ['year', 'CONGLOME', 'VIVIENDA', 'HOGAR', 'CODPERSO'] df.sort_values(ordenar, inplace=True) df

Finalmente crear una base de datos para cada año y guardar en la carpeta data con los siguientes nombres data_2019_(numero de grupo) y data_2020_(numero de grupo).

# creando bases de datos para cada año data_2019 = df[ df.year == '2019' ] data_2020 = df[ df.year == '2019' ] # guardando las bases de datos data_2019.to_csv(r'../data/data_2019_Grupo7.csv', index = False) data_2020.to_csv(r'../data/data_2020_Grupo7.csv', index = False)