ENH: stats.multivariate: introduce Covariance class and subclasses #88

Open · wants to merge 13 commits into main
1 change: 1 addition & 0 deletions scipy/stats/__init__.py
@@ -487,6 +487,7 @@
from ._page_trend_test import page_trend_test
from ._mannwhitneyu import mannwhitneyu
from ._fit import fit
from ._covariance import *

# Deprecated namespaces, to be removed in v2.0.0
from . import (
340 changes: 340 additions & 0 deletions scipy/stats/_covariance.py
@@ -0,0 +1,340 @@
from functools import cached_property

import numpy as np
from scipy import linalg
from . import _multivariate

__all__ = ["Covariance", "CovViaDiagonal", "CovViaPrecision",
"CovViaEigendecomposition", "CovViaCov", "CovViaPSD"]


def _apply_over_matrices(f):
    def _f_wrapper(x, *args, **kwargs):
        x = np.asarray(x)
        if x.ndim <= 2:
            return f(x, *args, **kwargs)

        old_shape = list(x.shape)
        m, n = old_shape[-2:]
        new_shape = old_shape[:-2] + [m*n]

        def _f_on_raveled(x):
            return f(x.reshape((m, n)), *args, **kwargs)

        return np.apply_along_axis(_f_on_raveled, axis=-1,
                                   arr=x.reshape(new_shape))
    return _f_wrapper


@_apply_over_matrices
def _extract_diag(A):
    return np.diag(A)


# @_apply_over_matrices
# def _solve_triangular(A, *args, **kwargs):
# return linalg.solve_triangular(A, *args, **kwargs)

def _solve_triangular(A, x, *args, **kwds):

    dims = A.shape[-1]
    message = "`x.shape[-1]` must equal the `dimensionality` of the covariance."
    if x.shape[-1] != dims:
        raise ValueError(message)

    try:
        np.broadcast_shapes(x.shape[:-2], A.shape[:-2])
    except ValueError as e:
        message = "Shape of `x` must be compatible with that of `cov`."
        raise ValueError(message) from e

    # Prepend 1s as needed so the `ndim`s are equal
    ndim = max(A.ndim, x.ndim)
    A = A.reshape((1,)*(ndim-A.ndim) + A.shape)
    x = x.reshape((1,)*(ndim-x.ndim) + x.shape)
    dimnums = np.arange(ndim-2)

    # If a batch dimension of A is non-singleton, we need to loop over it
    # ("loop"). Move all of these to the front.
    # Remaining dimensions will end up next to -2. We'll combine them with
    # -2 and act over them in a multiple-RHS single call to
    # `solve_triangular` ("mrhs").
    i_loop = np.array(A.shape[:-2]) > 1
    j_loop = dimnums[i_loop]
    n_loop_dims = len(j_loop)
    A = np.moveaxis(A, j_loop, np.arange(n_loop_dims))
    x = np.moveaxis(x, j_loop, np.arange(n_loop_dims))

    # Next, broadcast the looping dimensions of x against those of A
    x_new_shape = list(x.shape)
    x_new_shape[:n_loop_dims] = A.shape[:n_loop_dims]
    x = np.broadcast_to(x, x_new_shape)

    # Combine the dimensions
    n_loops = np.prod(A.shape[:n_loop_dims], dtype=int)
    n_mrhs = np.prod(x.shape[n_loop_dims:-1], dtype=int)
    A = A.reshape((n_loops, dims, dims))
    x = x.reshape((n_loops, n_mrhs, dims))

    # Calculate
    res = np.zeros_like(x)
    for i in range(n_loops):
        res[i, :] = linalg.solve_triangular(A[i], x[i].T, *args, **kwds).T

    # Undo the shape transformations
    res = res.reshape(x_new_shape)
    res = np.moveaxis(res, np.arange(n_loop_dims), j_loop)

    return res


def _T(A):
    if A.ndim < 2:
        return A
    else:
        return np.swapaxes(A, -1, -2)


def _J(A):
    return np.flip(A, axis=(-1, -2))


def _dot_diag(x, d):
    # If d were a full diagonal matrix, x @ d would always do what we want.
    # This is for when `d` is compressed to include only the diagonal elements.
    return x * d if x.ndim < 2 else x * np.expand_dims(d, -2)


class Covariance():
    """
    Representation of a covariance matrix as needed by multivariate_normal
    """
    # The last two axes of matrix-like input represent the dimensionality.
    # In the case of diagonal elements or eigenvalues, in which a diagonal
    # matrix has been reduced to one dimension, the last dimension
    # of the input represents the dimensionality. Internally, it
    # will be expanded in the second-to-last axis as needed.
    # Matrix math works, but instead of the fundamental matrix-vector
    # operation being A@x, think of x as row vectors that pre-multiply A.
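    # For example, whitening a batch of m points stored as rows of an
    # (m, n) array `x` against a single (n, n) factor `U` is `x @ U`;
    # the column-vector equivalent would be (U.T @ x.T).T.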

    def whiten(self, x):
@mdhaber (Owner Author) commented:
This name is taken from KDE. Really, I think I'd just replace this method with xTPx, which would do the equivalent of np.sum(np.square(np.dot(dev, prec_U)), axis=-1).
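A standalone sketch of that equivalence (the eigendecomposition construction of prec_U below is just one way to obtain a left square root of the precision matrix; the setup is illustrative):

import numpy as np
from scipy import linalg

rng = np.random.default_rng(5)
n = 4
A = rng.random((n, n))
A = A @ A.T                   # symmetric positive definite covariance
dev = rng.random(n)           # deviation of a point from the mean

# prec_U is a left square root of the precision matrix:
# prec_U @ prec_U.T == inv(A)
w, V = linalg.eigh(A)
prec_U = V * w**-0.5

maha1 = np.sum(np.square(np.dot(dev, prec_U)), axis=-1)
maha2 = dev @ np.linalg.inv(A) @ dev     # direct quadratic form
print(np.allclose(maha1, maha2))         # True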

"""
Right multiplication by the left square root of the precision matrix.
"""
return self._whiten(x)

    @cached_property
@mdhaber (Owner Author) commented:
@cached_property is probably not appropriate all over the place, here. The important thing with this Covariance class is to define an interface and provide default documentation (which would not need to be rewritten by subclasses).
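A minimal sketch of that pattern (hypothetical, simplified names; not the PR's code): the base class owns the public property and its documentation, and each subclass only supplies the private attribute.

import numpy as np

class Base:
    @property
    def log_pdet(self):
        """Log of the pseudo-determinant of the covariance matrix."""
        return self._log_pdet

class ViaDiagonal(Base):
    def __init__(self, d):
        # the subclass computes the value; interface and docs live in Base
        self._log_pdet = float(np.sum(np.log(d)))

print(ViaDiagonal([1.0, 2.0]).log_pdet)  # 0.6931...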

    def log_pdet(self):
        """
        Log of the pseudo-determinant of the covariance matrix
        """
        return np.array(self._log_pdet, dtype=float)[()]

    @cached_property
    def rank(self):
        """
        Rank of the covariance matrix
        """
        return np.array(self._rank, dtype=int)[()]

    @cached_property
    def A(self):
        """
        Explicit representation of the covariance matrix
        """
        return self._A

    @cached_property
    def dimensionality(self):
        """
        Dimensionality of the vector space
        """
        return np.array(self._dimensionality, dtype=int)[()]

    @cached_property
    def shape(self):
        """
        Shape of the covariance array
        """
        return self._shape

    def _validate_matrix(self, A, name):
        A = np.atleast_2d(A)
        if not np.issubdtype(A.dtype, np.number):
            message = (f"The input `{name}` must be an array of numbers.")
            raise ValueError(message)
        m, n = A.shape[-2:]
        if m != n:
            message = (f"`{name}.shape[-2]` must equal `{name}.shape[-1]`")
            raise ValueError(message)
        return A

    def _validate_vector(self, A, name):
        A = np.atleast_1d(A)
        if not np.issubdtype(A.dtype, np.number):
            message = (f"The input `{name}` must be an array of numbers.")
            raise ValueError(message)
        return A


class CovViaDiagonal(Covariance):
    """
    Representation of a covariance provided via the diagonal
    """

    def __init__(self, diagonal):
        diagonal = self._validate_vector(diagonal, 'diagonal')

        i_zero = diagonal <= 0
        positive_diagonal = np.array(diagonal, dtype=np.float64)

        positive_diagonal[i_zero] = 1  # ones don't affect determinant
        self._log_pdet = np.sum(np.log(positive_diagonal), axis=-1)

        pseudo_reciprocals = 1 / np.sqrt(positive_diagonal)
        pseudo_reciprocals[i_zero] = 0

        self._LP = pseudo_reciprocals
        self._rank = positive_diagonal.shape[-1] - i_zero.sum(axis=-1)
        self._A = np.apply_along_axis(np.diag, -1, diagonal)
        self._dimensionality = diagonal.shape[-1]
        self._i_zero = i_zero
        self._shape = self._A.shape
        self._allow_singular = True

    def _whiten(self, x):
        return _dot_diag(x, self._LP)

    def _support_mask(self, x):
        """
        Check whether x lies in the support of the distribution.
        """
        return ~np.any(_dot_diag(x, self._i_zero), axis=-1)


class CovViaPrecision(Covariance):
    """
    Representation of a covariance provided via the precision matrix
    """

    def __init__(self, precision, covariance=None):
        precision = self._validate_matrix(precision, 'precision')
        if covariance is not None:
            covariance = self._validate_matrix(covariance, 'covariance')
            message = "`precision.shape` must equal `covariance.shape`."
            if precision.shape != covariance.shape:
                raise ValueError(message)

        self._LP = np.linalg.cholesky(precision)
        self._log_pdet = -2*np.log(_extract_diag(self._LP)).sum(axis=-1)
        self._rank = precision.shape[-1]  # must be full rank to be invertible
        self._precision = precision
        self._covariance = covariance
        self._dimensionality = self._rank
        self._shape = precision.shape
        self._allow_singular = False

    def _whiten(self, x):
        return x @ self._LP

    @cached_property
    def _A(self):
        return (np.linalg.inv(self._precision) if self._covariance is None
                else self._covariance)


class CovViaEigendecomposition(Covariance):
@mdhaber (Owner Author) commented on Sep 9, 2022:
One advantage of the eigendecomposition is that it supports singular covariance matrices. But we can compute the key $x^T \Sigma^* x$ product using LDL factors, too.

For nonsingular matrices, this is what it looks like:

import numpy as np
from scipy import linalg

n = 40
rng = np.random.default_rng(14)
x = rng.random(size=(n, 1))
A = rng.random(size=(n, n))
A = A @ A.T

# Eigendecomposition route
w, V = linalg.eigh(A)
z = x.T @ (V * w**(-0.5))
xSxT = (z**2).sum()

# LDL route
L, D, perm = linalg.ldl(A)
y = linalg.solve_triangular(L[perm], x[perm], lower=True).T * np.diag(D)**(-0.5)
xSxT2 = (y**2).sum()

print(xSxT, xSxT2)

For singular matrices, we need to mask out the zero eigenvalues, so it looks a little more complicated. When the vector x is within the right subspace, we can perform the calculation as:

import numpy as np
from scipy import linalg

n = 10
rng = np.random.default_rng(1329348965454623456)
x = rng.random(size=(n, 1))
A = rng.random(size=(n, n))

A[0] = A[1] + A[2]  # make it singular
x[0] = x[1] + x[2]  # ensure x is in the subspace
A = A @ A.T

eps = 1e-10

# Eigendecomposition route
w, V = linalg.eigh(A)
mask = np.abs(w) < eps
w_ = w.copy()
w_[mask] = np.inf
z = x.T @ (V * w_**(-0.5))
xSxT = (z**2).sum()

# LDL route
L, D, perm = linalg.ldl(A)
d = np.diag(D).copy()[:, np.newaxis]
mask = np.abs(d) < eps
d_ = d.copy()
d[mask] = 0
d_[mask] = np.inf
y = linalg.solve_triangular(L[perm], x[perm], lower=True) * d_**(-0.5)
xSxT2 = (y**2).sum()

np.testing.assert_allclose(xSxT, xSxT2)

And we can tell it's in the subspace if np.allclose(L@(d**0.5*y), x). Otherwise, the PDF is zero.
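As a quick negative check (same construction as above, but without forcing x into the subspace):

import numpy as np
from scipy import linalg

n = 10
rng = np.random.default_rng(1329348965454623456)
x = rng.random(size=(n, 1))     # generic x, not in the column span of A
A = rng.random(size=(n, n))
A[0] = A[1] + A[2]              # make it singular
A = A @ A.T

eps = 1e-10
L, D, perm = linalg.ldl(A)
d = np.diag(D).copy()[:, np.newaxis]
mask = np.abs(d) < eps
d_ = d.copy()
d[mask] = 0
d_[mask] = np.inf
y = linalg.solve_triangular(L[perm], x[perm], lower=True) * d_**(-0.5)

print(np.allclose(L @ (d**0.5 * y), x))  # False: x is outside the support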

@mdhaber (Owner Author) commented on Oct 7, 2022:

This needs some work. The pseudodeterminant calculation isn't just the pseudodeterminant of the D matrix (product of positive diagonal elements).
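A quick numerical illustration of the discrepancy (the construction mirrors the singular example above):

import numpy as np
from scipy import linalg

rng = np.random.default_rng(8)
n = 5
B = rng.random((n, n))
B[0] = B[1] + B[2]              # rank-deficient
A = B @ B.T

# log-pseudodeterminant from the nonzero eigenvalues
w = linalg.eigh(A, eigvals_only=True)
log_pdet = np.sum(np.log(w[np.abs(w) > 1e-10]))

# sum of logs of the positive diagonal elements of D
L, D, perm = linalg.ldl(A)
d = np.diag(D)
log_pdet_d = np.sum(np.log(d[d > 1e-10]))

print(log_pdet, log_pdet_d)     # these differ in general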

class CovViaLDL(Covariance):

    def __init__(self, ldl):
        L, d, perm = ldl
        d = self._validate_vector(d, 'd')
        perm = self._validate_vector(perm, 'perm')
        L = self._validate_matrix(L, 'L')

        i_zero = d <= 0
        positive_d = np.array(d, dtype=np.float64)
        positive_d[i_zero] = 1  # ones don't affect determinant
        self._log_pdet = np.sum(np.log(positive_d), axis=-1)

        pseudo_reciprocals = 1 / np.sqrt(positive_d)
        pseudo_reciprocals[i_zero] = 0

        self._pseudo_reciprocals = pseudo_reciprocals
        self._d = d
        self._perm = perm
        self._L = L
        self._rank = d.shape[-1] - i_zero.sum(axis=-1)
        self._dimensionality = d.shape[-1]
        self._shape = L.shape
        # This is only used for `_support_mask`, not to decide whether
        # the covariance is singular or not.
        self._eps = 1e-8
        self._allow_singular = True

    def _whiten(self, x):
        L = self._L[self._perm]
        x = x[self._perm]
        return linalg.solve_triangular(L, x, lower=True) * self._pseudo_reciprocals

    @cached_property
    def _A(self):
        # the base class's `A` property expects `_A`
        return (self._L * self._d) @ self._L.T

    @staticmethod
    def from_ldl(ldl):
        r"""
        Representation of covariance provided via LDL (aka LDLT) decomposition

        Parameters
        ----------
        ldl : sequence
            A sequence (nominally a tuple) containing the lower factor ``L``,
            the diagonal multipliers ``d``, and row-permutation indices
            ``perm`` as computed by `scipy.linalg.ldl`.

        Notes
        -----
        Let the covariance matrix be :math:`A`, and let :math:`L` be a lower
        triangular matrix and :math:`D` be a diagonal matrix such that
        :math:`L D L^T = A`.

        When all of the diagonal elements of :math:`D` are strictly positive,
        whitening of a data point :math:`x` is performed by solving the
        triangular system :math:`L z = x` and scaling element-wise by
        :math:`D^{-1/2}`, where the inverse square root can be taken
        element-wise.
        :math:`\log\det{A}` is calculated as :math:`tr(\log{D})`,
        where the :math:`\log` operation is performed element-wise.

        This `Covariance` class supports singular covariance matrices. When
        computing ``_log_pdet``, non-positive elements of :math:`D` are
        ignored. Whitening is not well defined when the point to be whitened
        does not lie in the span of the columns of the covariance matrix. The
        convention taken here is to treat the inverse square root of
        non-positive elements of :math:`D` as zeros.

        Examples
        --------
        Prepare a symmetric positive definite covariance matrix ``A`` and a
        data point ``x``.

        >>> import numpy as np
        >>> from scipy import linalg, stats
        >>> rng = np.random.default_rng()
        >>> n = 5
        >>> A = rng.random(size=(n, n))
        >>> A = A @ A.T  # make the covariance symmetric positive definite
        >>> x = rng.random(size=n)

        Perform the LDL decomposition of ``A`` and create the `Covariance`
        object.

        >>> L, D, perm = linalg.ldl(A)
        >>> d = np.diag(D)
        >>> cov = stats.Covariance.from_ldl((L, d, perm))

        Compare the functionality of the `Covariance` object against
        reference implementations.

        >>> res = cov.whiten(x)
        >>> ref = linalg.solve_triangular(L[perm], x[perm], lower=True) * d**-0.5
        >>> np.allclose(res, ref)
        True
        >>> res = cov.log_pdet
        >>> ref = np.linalg.slogdet(A)[-1]
        >>> np.allclose(res, ref)
        True

        """
        return CovViaLDL(ldl)

"""
Representation of a covariance provided via eigenvalues and eigenvectors
"""

    def __init__(self, eigendecomposition):
        eigenvalues, eigenvectors = eigendecomposition
        eigenvalues = self._validate_vector(eigenvalues, 'eigenvalues')
        eigenvectors = self._validate_matrix(eigenvectors, 'eigenvectors')
        message = ("The shapes of `eigenvalues` and `eigenvectors` "
                   "must be compatible.")
        try:
            eigenvalues = np.expand_dims(eigenvalues, -2)
            eigenvectors, eigenvalues = np.broadcast_arrays(eigenvectors,
                                                            eigenvalues)
            eigenvalues = eigenvalues[..., 0, :]
        except ValueError:
            raise ValueError(message)

        i_zero = eigenvalues <= 0
        positive_eigenvalues = np.array(eigenvalues, dtype=np.float64)

        positive_eigenvalues[i_zero] = 1  # ones don't affect determinant
        self._log_pdet = np.sum(np.log(positive_eigenvalues), axis=-1)

        pseudo_reciprocals = 1 / np.sqrt(positive_eigenvalues)
        pseudo_reciprocals[i_zero] = 0

        self._LP = eigenvectors * np.expand_dims(pseudo_reciprocals, -2)
        self._rank = positive_eigenvalues.shape[-1] - i_zero.sum(axis=-1)
        self._w = eigenvalues
        self._v = eigenvectors
        self._dimensionality = eigenvalues.shape[-1]
        self._shape = eigenvectors.shape
        self._null_basis = eigenvectors * np.expand_dims(i_zero, -2)
        self._eps = _multivariate._eigvalsh_to_eps(eigenvalues) * 10**3
        self._allow_singular = True

    def _whiten(self, x):
        return x @ self._LP

    @cached_property
    def _A(self):
        return (self._v * self._w[..., np.newaxis, :]) @ _T(self._v)

    def _support_mask(self, x):
        """
        Check whether x lies in the support of the distribution.
        """
        residual = np.linalg.norm(x @ self._null_basis, axis=-1)
        in_support = residual < self._eps
        return in_support


class CovViaCov(Covariance):
    """
    Representation of a covariance provided via the covariance matrix itself
    """

    def __init__(self, cov):
        cov = self._validate_matrix(cov, 'cov')

        self._factor = _J(np.linalg.cholesky(_J(_T(cov))))
@mdhaber (Owner Author) commented:
It turns out we can get $x^T \Sigma^{-1} x$ without the _J flipping. See scipy gh-16987.
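A minimal sketch of the no-flip alternative (assuming the idea is simply to use the standard lower Cholesky factor; the approach in gh-16987 may differ in detail):

import numpy as np
from scipy import linalg

rng = np.random.default_rng(7)
n = 4
A = rng.random((n, n))
A = A @ A.T
x = rng.random(n)

# lower Cholesky factor, no flipping: ||inv(L) x||^2 == x.T inv(A) x
L = np.linalg.cholesky(A)
z = linalg.solve_triangular(L, x, lower=True)
print(np.allclose((z**2).sum(), x @ np.linalg.inv(A) @ x))  # True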

        self._log_pdet = 2*np.log(_extract_diag(self._factor)).sum(axis=-1)
        self._rank = cov.shape[-1]  # must be full rank for cholesky
        self._A = cov
        self._dimensionality = self._rank
        self._shape = cov.shape
        self._allow_singular = False

    def _whiten(self, x):
        res = _solve_triangular(self._factor, x, lower=False)
        return res if x.ndim > 1 else res[..., 0, :]


class CovViaPSD(Covariance):
    """
    Representation of a covariance provided via an instance of _PSD
    """

    def __init__(self, psd):
        self._LP = psd.U
        self._log_pdet = psd.log_pdet
        self._rank = psd.rank
        self._A = psd._M
        self._dimensionality = psd._M.shape[-1]
        self._shape = psd._M.shape
        self._psd = psd
        self._allow_singular = False  # by default

    def _whiten(self, x):
        return x @ self._LP

    def _support_mask(self, x):
        return self._psd._support_mask(x)