GitHub Repository: AllenDowney/bayesian-analysis-recipes
Path: blob/master/incubator/covariance-imputation.ipynb
Kernel: bayesian

Introduction

In this notebook, I would like to investigate the use of pairwise covariance matrices to impute missing data.

Simulated Data

First off, let's simulate data drawn from a multivariate normal: five columns of data for which we know the ground-truth covariance matrix between all of them.

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import janitor
import numpy_sugar as ns

%load_ext autoreload
%autoreload 2
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
def extract_diagonal(M):
    """Return a matrix containing only the diagonal of M, with zeros elsewhere."""
    diag = np.zeros((M.shape[0], M.shape[1]))
    np.fill_diagonal(diag, np.diagonal(M))
    return diag
n_variates = 5
# Build a random symmetric matrix to use as the ground-truth covariance:
# the strict upper triangle mirrored onto the lower triangle, plus the diagonal.
triangle = np.random.random(size=(n_variates, n_variates))
cov = np.triu(triangle, k=1) + np.triu(triangle, k=1).T + extract_diagonal(triangle)
mean = np.random.random(size=(n_variates))
cov
ns.linalg.check_semidefinite_positiveness(cov)
np.linalg.cond(cov)
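
Worth noting: a random symmetric matrix like the one above is not guaranteed to be positive semidefinite, so the check can fail for some random draws. A minimal sketch of one standard remedy (an aside, not part of the original notebook) is to build the covariance as a product with its own transpose, which is positive semidefinite by construction:

# Sketch: A @ A.T is positive semidefinite for any real matrix A.
A = np.random.random(size=(n_variates, n_variates))
cov_psd = A @ A.T
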
data = np.random.multivariate_normal(mean=mean, cov=cov, size=10_000_000)
df = pd.DataFrame(data)
data.shape
sns.pairplot(pd.DataFrame(data).sample(1000))

Now, let's simulate the case where a dropout mask is applied to 99% of the data (each entry is kept with probability 0.01).

mask = np.random.binomial(n=1, p=0.01, size=data.shape)
ind = np.where(mask.flatten() == 0)
ind
data_masked = mask * data
np.put(data_masked, ind, np.nan)
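
As an aside, the mask-multiply-and-put sequence above can be collapsed into a single np.where call; this sketch should produce the same array:

# Keep entries where mask == 1; set dropped entries to NaN.
data_masked_alt = np.where(mask == 1, data, np.nan)
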
pd.DataFrame(data_masked)
import missingno as msno
df_masked = pd.DataFrame(data_masked).dropna(how="all")
msno.matrix(df_masked)
import janitor
from itertools import combinations
import seaborn as sns

fig, ax = plt.subplots(
    figsize=(15, 15), nrows=n_variates, ncols=n_variates, sharex=True, sharey=True
)
covars = dict()
for r, c in combinations(df_masked.columns, 2):
    # Pairwise-complete observations: keep only rows where both columns are present.
    df_filtered = df_masked[[r, c]].dropna()
    sns.kdeplot(data=df_filtered[r], data2=df_filtered[c], ax=ax[r, c])
    sns.kdeplot(data=df_filtered[c], data2=df_filtered[r], ax=ax[c, r])
    # ax[r, c].scatter(df_filtered[r], df_filtered[c])
    # ax[c, r].scatter(df_filtered[c], df_filtered[r])
    covar = np.cov(df_filtered.T)
    ax[r, c].set_title(f"{covar[0, 1]:.2f}")
    ax[c, r].set_title(f"{covar[1, 0]:.2f}")
    covars[(r, c)] = covar
    # Reverse both axes so that index 0 of the stored matrix always
    # corresponds to the first column named in the key.
    covars[(c, r)] = covar[::-1, ::-1]
plt.tight_layout()
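
For what it's worth, pandas can compute these pairwise-complete covariances directly: DataFrame.cov excludes missing values pairwise, so the single call below should closely match the per-pair estimates collected above.

df_masked.cov()
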

Now, let's say I have a new sample for which I only have data from columns 0 and 1. Can we combine this information in a mathematically principled fashion so as to recover a measurement of column 2, with uncertainty?

# Rows where columns 0 and 1 are observed but column 2 is missing.
# (dropnotnull comes from pyjanitor and keeps only rows where the column is null.)
df_unknown2 = df_masked.dropna(subset=[0, 1], how="any").dropnotnull(2)
df_unknown2
from pprint import pprint

pprint(covars)

By the fundamental rule of multivariate normals, if we have a bivariate Normal distribution:

$$X_1, X_2 \sim N(\mu, \Sigma)$$

Then, if we know the value of $X_2 = x_2$, $X_1$ follows the conditional distribution:

$$X_1 \sim N(\mu_{1|2}, \Sigma_{1|2})$$

where

$$\mu_{1|2} = \mu_1 + \Sigma_{12}\Sigma_{22}^{-1}(x_2 - \mu_2)$$

and

$$\Sigma_{1|2} = \Sigma_{11} - \Sigma_{12}\Sigma_{22}^{-1}\Sigma_{21}$$
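
In the bivariate case used below, where we condition the second variable on the first, these block formulas reduce to scalars:

$$\mu_{2|1} = \mu_2 + \sigma_{21}\sigma_{11}^{-1}(x_1 - \mu_1), \qquad \sigma^2_{2|1} = \sigma_{22} - \sigma_{21}\sigma_{11}^{-1}\sigma_{12}$$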

Thanks to the magic of Python, we can encode this in a function. Given two columns of data, we can estimate $\mu_1$ and $\mu_2$ and the covariance matrix $\Sigma$.

def mu_cond(mu, sig, x):
    """
    Compute the Gaussian mean conditioned on x (observed data).

    x should always have fewer entries than mu, and is assumed to be
    aligned with the last set of entries in mu and sig.
    """
    # Partition mu and sig into unobserved (first k) and observed (last) blocks,
    # then apply the conditional-mean formula above.
    k = len(mu) - len(x)
    mu_1, mu_2 = mu[:k], mu[k:]
    sig_12, sig_22 = sig[:k, k:], sig[k:, k:]
    return mu_1 + sig_12 @ np.linalg.inv(sig_22) @ (x - mu_2)
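
A quick toy check of mu_cond (the numbers here are made up for illustration): conditioning a positively correlated 2D Gaussian on a larger-than-average second coordinate should pull the first coordinate's conditional mean upward.

mu_toy = np.array([0.0, 1.0])
sig_toy = np.array([[1.0, 0.5], [0.5, 2.0]])
mu_cond(mu_toy, sig_toy, x=np.array([2.0]))  # expect a value above 0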
mu = df_masked.mean().values
mu
covars[(0, 2)] # sigma = np.cov()
def mu_2g1(mu: np.ndarray, cov: np.ndarray, x_1: float):
    """
    :param mu: length-2 vector of mus.
    :param cov: 2x2 square covariance matrix.
    :param x_1: Known measurement.
    """
    sigma_21 = cov[1, 0]
    sigma_11 = cov[0, 0]
    mu_1 = mu[0]
    mu_2 = mu[1]
    return mu_2 + sigma_21 * 1 / sigma_11 * (x_1 - mu_1)

idx = df_unknown2.index[0]
print(mu_2g1(mu[[0, 2]], covars[(0, 2)], x_1=df_unknown2.loc[idx, 0]))
print(mu_2g1(mu[[1, 2]], covars[(1, 2)], x_1=df_unknown2.loc[idx, 1]))
def sig_2g1(cov):
    """
    :param cov: 2x2 covariance matrix.
    :returns: the conditional variance (not the standard deviation).
    """
    if not cov.shape == (2, 2):
        raise ValueError("cov must be a 2x2 matrix")
    return cov[1, 1] - cov[1, 0] * 1 / cov[0, 0] * cov[0, 1]

pprint(sig_2g1(covars[(0, 2)]))
pprint(sig_2g1(covars[(1, 2)]))
df_masked[[0, 2]].mean().values
def mu_sig(known_col, unknown_col):
    idx = df_unknown2.index[0]
    x_known = df_unknown2.loc[idx, known_col]
    mu = df_masked[[known_col, unknown_col]].mean().values
    cov = covars[(known_col, unknown_col)]
    mu = mu_2g1(mu, cov, x_known)
    # scipy.stats.norm expects a standard deviation, so take the square
    # root of the conditional variance returned by sig_2g1.
    sig = np.sqrt(sig_2g1(cov))
    return mu, sig
mu_sig(0, 2)
mu_sig(1, 2)
from scipy.stats import norm
x = np.linspace(-5, 5, 1000)
logpdf1 = norm(*mu_sig(0, 2)).logpdf(x)
pdf1 = norm(*mu_sig(0, 2)).pdf(x)
logpdf2 = norm(*mu_sig(1, 2)).logpdf(x)
pdf2 = norm(*mu_sig(1, 2)).pdf(x)
plt.plot(x, logpdf1)
plt.plot(x, logpdf2)
plt.plot(x, logpdf1 + logpdf2)
# Overlay the pairwise scatter of each known column against column 2,
# with a vertical line at the new sample's observed value.
r = 0
c = 2
df_filtered = df_masked[[r, c]].dropna()
plt.scatter(*df_filtered.T.values)
plt.vlines(x=df_unknown2.loc[idx, r], ymin=0, ymax=4)

r = 1
c = 2
df_filtered = df_masked[[r, c]].dropna()
plt.scatter(*df_filtered.T.values)
plt.vlines(x=df_unknown2.loc[idx, r], ymin=0, ymax=4)
idx = df_unknown2.index[0]
df.loc[idx]  # ground-truth values for this sample, from the unmasked data
sumlogpdf = logpdf1 + logpdf2
x[np.where(sumlogpdf == sumlogpdf.max())]
plt.plot(x, pdf1 * pdf2)
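
As a cross-check, if we treat the two conditionals as independent Gaussian estimates (an assumption, since both derive from the same sample), their product is itself an unnormalized Gaussian whose parameters follow from precision weighting. A minimal sketch, using a hypothetical helper combine_gaussians that is not part of the notebook:

def combine_gaussians(mu1, sig1, mu2, sig2):
    """Combine two independent Gaussian estimates by precision weighting."""
    prec1, prec2 = 1 / sig1**2, 1 / sig2**2
    var = 1 / (prec1 + prec2)
    mu = var * (prec1 * mu1 + prec2 * mu2)
    return mu, np.sqrt(var)

# The combined mean should agree with the argmax of sumlogpdf above.
combine_gaussians(*mu_sig(0, 2), *mu_sig(1, 2))
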

It works! Given that we know the joint distribution, we can use mathematically principled, fully probabilistic methods to obtain estimates of unknown data, together with their uncertainty.