Skip to content

Commit 8f710d6

Browse files
author
Rithvik Donnipadu
committed
Test files validation on github actions
1 parent f662d84 commit 8f710d6

24 files changed

+851
-0
lines changed

GetCEScores.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import numpy as np
2+
from scipy.interpolate import interp1d
3+
from ConvertSupport import convert_support
4+
from typing import List, Dict, Any
5+
6+
def get_ce_scores(y: List[np.ndarray], t: List[np.ndarray], optns: Dict[str, Any],
7+
mu: np.ndarray, obs_grid: np.ndarray, fitted_cov: np.ndarray,
8+
lambda_: np.ndarray, phi: np.ndarray, sigma2: float = 0.0) -> List[Dict[str, Any]]:
9+
if lambda_.shape[0] != phi.shape[1]:
10+
raise ValueError("Number of eigenvalues does not match number of eigenfunctions.")
11+
12+
sigma_y = fitted_cov + np.eye(phi.shape[0]) * sigma2
13+
mu_phi_sig = get_mu_phi_sig(t, obs_grid, mu, phi, sigma_y)
14+
15+
results = []
16+
for y_vec, mps in zip(y, mu_phi_sig):
17+
result = get_ind_ce_scores(y_vec, mps['mu_vec'], lambda_, mps['phi_mat'], mps['sigma_yi'],
18+
verbose=optns.get('verbose', False))
19+
results.append(result)
20+
return results
21+
22+
def get_mu_phi_sig(t: List[np.ndarray], obs_grid: np.ndarray, mu: np.ndarray,
23+
phi: np.ndarray, sigma_y: np.ndarray) -> List[Dict[str, Any]]:
24+
mu_interp = interp1d(obs_grid, mu, kind='linear', fill_value="extrapolate")
25+
phi_interps = [
26+
interp1d(obs_grid, phi[:, i], kind='linear', fill_value="extrapolate")
27+
for i in range(phi.shape[1])
28+
]
29+
30+
ret = []
31+
for tvec in t:
32+
if len(tvec) == 0:
33+
ret.append({'mu_vec': np.array([]), 'phi_mat': np.array([]), 'sigma_yi': np.array([])})
34+
continue
35+
36+
mu_vec = mu_interp(tvec)
37+
phi_mat = np.column_stack([interp(tvec) for interp in phi_interps])
38+
sigma_yi = convert_support(obs_grid, tvec, mu=sigma_y)
39+
40+
ret.append({'mu_vec': mu_vec, 'phi_mat': phi_mat, 'sigma_yi': sigma_yi})
41+
42+
return ret
43+
44+
45+
46+
def get_ind_ce_scores(y_vec: np.ndarray, mu_vec: np.ndarray, lam_vec: np.ndarray,
47+
phi_mat: np.ndarray, sigma_yi: np.ndarray,
48+
newy_ind: int = None, verbose: bool = False) -> Dict[str, Any]:
49+
if len(y_vec) == 0:
50+
if verbose:
51+
print("Empty observation found, possibly due to truncation.")
52+
return {
53+
'xi_est': np.full((len(lam_vec),), np.nan),
54+
'xi_var': np.full((len(lam_vec), len(lam_vec)), np.nan),
55+
'fitted_y': np.full((0, 0), np.nan)
56+
}
57+
58+
if newy_ind is not None:
59+
if len(y_vec) != 1:
60+
new_phi = phi_mat[newy_ind, :].reshape(1, -1)
61+
new_mu = mu_vec[newy_ind]
62+
y_vec = np.delete(y_vec, newy_ind)
63+
mu_vec = np.delete(mu_vec, newy_ind)
64+
phi_mat = np.delete(phi_mat, newy_ind, axis=0)
65+
sigma_yi = np.delete(np.delete(sigma_yi, newy_ind, axis=0), newy_ind, axis=1)
66+
return GetIndCEScoresCPPnewInd(y_vec, mu_vec, lam_vec, phi_mat, sigma_yi, new_phi, new_mu)
67+
else:
68+
lam_phi = np.diag(lam_vec) @ phi_mat.T
69+
lam_phi_sig = lam_phi @ np.linalg.inv(sigma_yi)
70+
xi_est = lam_phi_sig @ (y_vec - mu_vec)
71+
xi_var = np.diag(lam_vec) - lam_phi @ lam_phi_sig.T
72+
return {'xi_est': xi_est, 'xi_var': xi_var, 'fitted_y': np.nan}
73+
74+
return GetIndCEScoresCPP(y_vec, mu_vec, lam_vec, phi_mat, sigma_yi)

GetCovDense.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import numpy as np
2+
import pandas as pd
3+
4+
def get_cov_dense(ymat, mu, optns):
5+
"""
6+
Calculate the sample covariance matrix for dense, regular functional data.
7+
8+
Parameters:
9+
- ymat: np.ndarray, shape (n, p) - matrix of dense regular functional data.
10+
- mu: np.ndarray, shape (p,) - estimated cross-sectional mean vector.
11+
- optns: dict - options containing:
12+
* 'dataType' (str): Must be "Dense" or "DenseWithMV".
13+
* 'userMu' (optional, np.ndarray): If provided, adjusts ymat by subtracting this vector.
14+
* 'error' (bool): If True, adjusts diagonal for variance.
15+
* 'userSigma2' (optional, float): User-provided variance for diagonal adjustment.
16+
17+
Returns:
18+
- dict with keys:
19+
* 'rawCov': None (since it's not computed in this function)
20+
* 'smoothCov': np.ndarray - sample covariance matrix on observed grid.
21+
* 'bwCov': None
22+
* 'sigma2': float - estimated variance if 'error' is True; None otherwise.
23+
* 'outGrid': None
24+
"""
25+
26+
if optns['dataType'] not in ['Dense', 'DenseWithMV']:
27+
raise ValueError("Sample Covariance is only applicable for dataType='Dense' or 'DenseWithMV'.")
28+
29+
n, m = ymat.shape
30+
31+
# Adjust ymat by subtracting mu if 'userMu' is provided in options
32+
if optns.get('userMu') is not None:
33+
ymat = ymat - np.tile(mu, (n, 1)) # Repeat mu across rows
34+
K = np.zeros((m, m))
35+
36+
# Compute the covariance matrix manually while handling NaNs
37+
for i in range(m):
38+
for j in range(m):
39+
XcNaNindx = np.isnan(ymat[:, i])
40+
YcNaNindx = np.isnan(ymat[:, j])
41+
NaNrows = np.where(XcNaNindx | YcNaNindx)[0]
42+
indx = np.setdiff1d(np.arange(n), NaNrows)
43+
K[i, j] = np.sum(ymat[indx, i] * ymat[indx, j]) / (n - 1 - len(NaNrows))
44+
else:
45+
# Use pairwise complete observations to calculate covariance if 'userMu' is not provided
46+
K = np.cov(ymat, rowvar=False, bias=False)
47+
48+
# Ensure symmetry of K
49+
K = 0.5 * (K + K.T)
50+
51+
# Check for any NaN in the covariance matrix
52+
if np.isnan(K).any():
53+
raise ValueError("Data is too sparse to be considered DenseWithMV. Remove sparse observations or specify dataType='Sparse' for FPCA.")
54+
55+
sigma2 = None
56+
if optns.get('error', False):
57+
# Use the 2nd order difference method for estimating variance, if not provided
58+
if 'userSigma2' in optns:
59+
sigma2 = optns['userSigma2']
60+
else:
61+
ord_diff = 2
62+
sigma2 = np.mean(np.diff(ymat, n=ord_diff, axis=1)**2, where=~np.isnan(ymat)) / np.math.comb(2 * ord_diff, ord_diff)
63+
np.fill_diagonal(K, np.diag(K) - sigma2)
64+
65+
# Create return dictionary with similar structure to SmoothCov object in R
66+
ret = {
67+
'rawCov': None,
68+
'smoothCov': K,
69+
'bwCov': None,
70+
'sigma2': sigma2,
71+
'outGrid': None
72+
}
73+
74+
return ret

GetEigenAnalysisResults.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import numpy as np
2+
import sys
3+
import os
4+
sys.path.append(os.path.abspath('src'))
5+
from trapzRcpp import trapz
6+
7+
8+
def get_eigen_analysis_results(smoothCov, regGrid, optns, muWork=None):
9+
"""
10+
Perform eigenanalysis on the covariance matrix and select components
11+
based on specified variance explained threshold.
12+
13+
Parameters:
14+
- smoothCov: np.ndarray - covariance matrix
15+
- regGrid: np.ndarray - regular grid for integration
16+
- optns: dict - options containing:
17+
* 'maxK': int, maximum number of principal components
18+
* 'FVEthreshold': float, functional variance explained threshold
19+
* 'FVEfittedCov': float, threshold for fitted covariance, if applicable
20+
* 'verbose': bool, whether to print messages
21+
- muWork: np.ndarray or None, optional mean work vector (default None)
22+
23+
Returns:
24+
- dict with keys:
25+
* 'lambda': np.ndarray - eigenvalues selected
26+
* 'phi': np.ndarray - selected eigenvectors
27+
* 'cumFVE': np.ndarray - cumulative FVE
28+
* 'kChoosen': int - number of components chosen
29+
* 'fittedCov': np.ndarray - fitted covariance
30+
* 'fittedCovUser': np.ndarray or None - fitted covariance with user-specified threshold
31+
* 'fittedCorrUser': np.ndarray or None - correlation matrix if diagonal is non-zero
32+
"""
33+
maxK = optns['maxK']
34+
FVEthreshold = optns['FVEthreshold']
35+
FVEfittedCov = optns.get('FVEfittedCov', None)
36+
verbose = optns['verbose']
37+
38+
gridSize = regGrid[1] - regGrid[0]
39+
numGrids = smoothCov.shape[0]
40+
41+
# Eigen decomposition
42+
eig_values, eig_vectors = np.linalg.eigh(smoothCov)
43+
44+
# Select positive eigenvalues
45+
positive_ind = eig_values >= 0
46+
if np.sum(positive_ind) == 0:
47+
raise ValueError("All eigenvalues are negative. The covariance estimate is incorrect.")
48+
49+
d = eig_values[positive_ind][::-1] # Sort in descending order
50+
eigenV = eig_vectors[:, positive_ind][:, ::-1] # Match ordering with eigenvalues
51+
52+
# Threshold based on maxK
53+
if maxK < len(d):
54+
if verbose:
55+
print(f"At most {len(d)} number of PCs can be selected, thresholded by `maxK` = {maxK}.")
56+
57+
d = d[:maxK]
58+
eigenV = eigenV[:, :maxK]
59+
60+
# Calculate cumulative FVE
61+
FVE = np.cumsum(d) / np.sum(d)
62+
no_opt = np.min(np.where(FVE >= FVEthreshold)[0]) + 1 # Select minimum components for FVE threshold
63+
64+
# Normalization of eigenvectors
65+
if muWork is None:
66+
muWork = np.arange(eigenV.shape[0]) + 1 # Default mean work
67+
68+
def normalize_vector(x):
69+
"""Normalize vector x using trapezoidal integration and adjust sign based on mean."""
70+
x /= np.sqrt(trapz(regGrid, x**2))
71+
return x if np.sum(x * muWork) >= 0 else -x
72+
73+
phi = np.apply_along_axis(normalize_vector, 0, eigenV)
74+
lambda_ = gridSize * d
75+
76+
# Covariance matrix construction
77+
no_fittedCov = np.min(np.where(FVE >= FVEfittedCov)[0]) + 1 if FVEfittedCov is not None else phi.shape[1]
78+
fittedCovUser = phi[:, :no_fittedCov] @ np.diag(lambda_[:no_fittedCov]) @ phi[:, :no_fittedCov].T
79+
fittedCov = phi @ np.diag(lambda_) @ phi.T
80+
81+
# Fitted correlation matrix
82+
if np.any(np.diag(fittedCovUser) == 0):
83+
fittedCorrUser = None
84+
else:
85+
diag_sqrt_inv = np.diag(1 / np.sqrt(np.diag(fittedCovUser)))
86+
fittedCorrUser = diag_sqrt_inv @ fittedCovUser @ diag_sqrt_inv
87+
np.fill_diagonal(fittedCorrUser, 1)
88+
89+
return {
90+
'lambda': lambda_[:no_opt],
91+
'phi': phi[:, :no_opt],
92+
'cumFVE': FVE,
93+
'kChoosen': no_opt,
94+
'fittedCov': fittedCov,
95+
'fittedCovUser': fittedCovUser,
96+
'fittedCorrUser': fittedCorrUser
97+
}

GetRawCov.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import numpy as np
2+
from scipy import linalg
3+
4+
def uniqueM(x):
5+
""" Helper function to map values in x to unique integers """
6+
unique_vals = np.unique(x)
7+
id1 = np.zeros(len(x), dtype=int)
8+
for i, val in enumerate(unique_vals):
9+
id1[np.where(x == val)[0]] = i + 1
10+
return id1
11+
12+
def meshgrid(x, y):
13+
""" Custom meshgrid function to replicate R's meshgrid functionality """
14+
X, Y = np.meshgrid(x, y)
15+
return {'X': X, 'Y': Y}
16+
17+
def GetRawCov(y, t, obsGridnew, mu, dataType, error):
18+
"""
19+
Obtain raw covariance.
20+
21+
Parameters:
22+
- y: list of n arrays (repeated measurements for n subjects)
23+
- t: list of n arrays (time points for n subjects)
24+
- obsGridnew: array of m time points corresponding to mu
25+
- mu: array of fitted mean functions (corresponding to pooled unique time points from t)
26+
- dataType: output of IsRegular() (should be one of 'Sparse', 'DenseWithMV', 'Dense', 'RegularWithMV')
27+
- error: boolean flag (True if measurement error assumption is applied, False otherwise)
28+
29+
Returns:
30+
A dictionary containing:
31+
- tPairs: (N, 2) matrix of pairs of time points for subjects
32+
- cxxn: 1D array of raw covariance corresponding to tPairs
33+
- indx: 1D array of indices for each subject
34+
- win: 1D array of weights for 2-D smoother (if required)
35+
- cyy: 1D array of raw covariance for all pairs of time points
36+
- diag: 2-column matrix for raw covariance along diagonal if error == True
37+
"""
38+
39+
ncohort = len(y)
40+
obsGrid = np.sort(np.unique(np.concatenate(t))) # sort and flatten the time points
41+
mu_interpolated = np.interp(obsGrid, obsGridnew, mu) # interpolate mu to match obsGrid
42+
count = None
43+
indx = None
44+
diag = None
45+
46+
if dataType in ['Sparse', 'DenseWithMV']:
47+
Ys = [meshgrid(yi, t[i]) for i, yi in enumerate(y)]
48+
Xs = [meshgrid(ti, t[i]) for i, ti in enumerate(t)]
49+
50+
# Vectorize the grids for y & t
51+
xx1 = np.concatenate([x['X'].flatten() for x in Xs])
52+
xx2 = np.concatenate([x['Y'].flatten() for x in Xs])
53+
yy2 = np.concatenate([y['Y'].flatten() for y in Ys])
54+
yy1 = np.concatenate([y['X'].flatten() for y in Ys])
55+
56+
# Get id1/2 such that xx1/2 = q(id1/2), where q = unique(xx1/2)
57+
id1 = uniqueM(xx1)
58+
id2 = uniqueM(xx2)
59+
cyy = (yy1 - mu_interpolated[id1]) * (yy2 - mu_interpolated[id2])
60+
61+
# Index for subject i
62+
indx = np.repeat(np.arange(len(y)), [len(yi) ** 2 for yi in y])
63+
64+
tPairs = np.column_stack([xx1, xx2])
65+
66+
if error:
67+
tneq = np.where(xx1 != xx2)[0]
68+
teq = np.where(xx1 == xx2)[0]
69+
indx = indx[tneq]
70+
diag = np.column_stack([tPairs[teq, 0], cyy[teq]])
71+
tPairs = tPairs[tneq]
72+
cxxn = cyy[tneq]
73+
else:
74+
cxxn = cyy
75+
76+
elif dataType == 'Dense':
77+
yy = np.array([np.ravel(yi) for yi in y]).T
78+
MU = np.tile(mu, (len(y), 1)).T
79+
t1 = t[0]
80+
81+
yy = yy - MU
82+
cyy = np.dot(yy.T, yy) / ncohort
83+
cyy = cyy.flatten()
84+
85+
cxxn = cyy
86+
xxyy = meshgrid(t1, t1) # Create meshgrid for t1
87+
88+
tPairs = np.column_stack([xxyy['X'].flatten(), xxyy['Y'].flatten()])
89+
90+
if error:
91+
tneq = np.where(tPairs[:, 0] != tPairs[:, 1])[0]
92+
teq = np.where(tPairs[:, 0] == tPairs[:, 1])[0]
93+
diag = np.column_stack([tPairs[teq, 0], cyy[teq]])
94+
tPairs = tPairs[tneq]
95+
cxxn = cyy[tneq]
96+
else:
97+
cxxn = cyy
98+
99+
elif dataType == 'RegularWithMV':
100+
raise ValueError("This is not implemented yet. Contact Pantelis!")
101+
102+
else:
103+
raise ValueError("Invalid 'dataType' argument type")
104+
105+
result = {
106+
'tPairs': tPairs,
107+
'cxxn': cxxn,
108+
'indx': indx,
109+
'cyy': cyy,
110+
'diag': diag,
111+
'count': count,
112+
'error': error,
113+
'dataType': dataType
114+
}
115+
116+
return result

PC_CovE.py

Whitespace-only changes.
7.12 KB
Binary file not shown.
166 Bytes
Binary file not shown.
5.53 KB
Binary file not shown.

__pycache__/Wiener.cpython-311.pyc

1.77 KB
Binary file not shown.
35 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)