diff --git a/pgmpy/factors/hybrid/Adapters.py b/pgmpy/factors/hybrid/Adapters.py
new file mode 100644
index 000000000..22d122f8a
--- /dev/null
+++ b/pgmpy/factors/hybrid/Adapters.py
@@ -0,0 +1,104 @@
+import numpy as np
+
+from pgmpy.estimators import MaximumLikelihoodEstimator as MLE
+from pgmpy.factors.continuous import LinearGaussianCPD
+from pgmpy.factors.discrete import TabularCPD
+
+
+class TabularAdapter:
+    """
+    Adapter for fitting and representing tabular CPDs.
+    """
+
+    def __init__(self, variable, estimator=None, parents=None):
+        self.variable = variable
+        self.estimator = estimator
+        self.parents = parents if parents is not None else []
+
+    def fit(self, data):
+        if self.estimator not in ("MLE", MLE):
+            raise ValueError("For tabular tag, only MLE estimator is currently supported.")
+
+        variable_states = sorted(data[self.variable].dropna().unique())
+        if not self.parents:
+            counts = data[self.variable].value_counts().reindex(variable_states, fill_value=0)
+            probs = counts.values / counts.values.sum()
+            values = [[prob] for prob in probs]
+            self.fitted_cpd_ = TabularCPD(variable=self.variable, variable_card=len(variable_states), values=values)
+            return self
+
+        parent_states = [sorted(data[parent].dropna().unique()) for parent in self.parents]
+        grouped = (
+            data.groupby(self.parents + [self.variable], dropna=False)
+            .size()
+            .unstack(self.variable, fill_value=0)
+            .reindex(columns=variable_states, fill_value=0)
+        )
+        grouped = grouped.T
+        # Normalize each parent configuration's column; replace zero column sums
+        # with 1 to avoid division by zero for unobserved configurations.
+        grouped = grouped / grouped.sum(axis=0).replace(0, 1)
+        self.fitted_cpd_ = TabularCPD(
+            variable=self.variable,
+            variable_card=len(variable_states),
+            values=grouped.values,
+            evidence=self.parents,
+            evidence_card=[len(states) for states in parent_states],
+        )
+        return self
+
+    def __repr__(self):
+        # NOTE(review): repr text was lost in transit; reconstructed to contain the
+        # "FunctionalCPD(tabular)" substring asserted by the tests -- confirm exact wording.
+        cpd = self.fitted_cpd_
+        var_str = f"{cpd.variable}:{cpd.variable_card}"
+        return (
+            f"<FunctionalCPD(tabular) {var_str} | "
+            f"parents: {self.parents}) at {hex(id(self))}>"
+        )
+
+
+class LinearGaussianAdapter:
+    """
+    Adapter for fitting and representing linear Gaussian CPDs.
+    """
+
+    def __init__(self, variable, estimator=None, parents=None):
+        self.variable = variable
+        self.estimator = estimator
+        self.parents = parents if parents is not None else []
+
+    def fit(self, data):
+        if self.estimator not in ("MLE", "OLS", None):
+            raise ValueError(f"For linear tag, MLE/OLS is supported. Got {self.estimator}")
+
+        target_data = data[self.variable].values
+        if not self.parents:
+            mean = np.mean(target_data)
+            std = np.std(target_data)
+            beta = [mean]
+        else:
+            evidence_data = data[self.parents].values
+            design_matrix = np.c_[np.ones(evidence_data.shape[0]), evidence_data]
+
+            beta, residuals, _, _ = np.linalg.lstsq(design_matrix, target_data, rcond=None)
+            # lstsq returns an empty residuals array for rank-deficient systems;
+            # fall back to computing the mean squared error explicitly.
+            if len(residuals) > 0:
+                variance = residuals[0] / len(target_data)
+            else:
+                predictions = design_matrix @ beta
+                variance = np.mean((target_data - predictions) ** 2)
+            std = np.sqrt(variance)
+
+        self.fitted_cpd_ = LinearGaussianCPD(variable=self.variable, beta=beta, std=std, evidence=self.parents)
+        return self
+
+    def __repr__(self):
+        # NOTE(review): repr text was lost in transit; reconstructed to contain the
+        # "FunctionalCPD(linear)" substring asserted by the tests -- confirm exact wording.
+        cpd = self.fitted_cpd_
+        beta_str = f"{cpd.beta[0]:.3f}"  # Intercept
+        for i, parent in enumerate(cpd.evidence):
+            beta_str += f" + {cpd.beta[i + 1]:.3f}*{parent}"
+
+        return (
+            f"<FunctionalCPD(linear) {cpd.variable} = N({beta_str}, "
+            f"{cpd.std:.3f}) at {hex(id(self))}>"
+        )
diff --git a/pgmpy/factors/hybrid/FunctionalCPD_Refactor.py b/pgmpy/factors/hybrid/FunctionalCPD_Refactor.py
index edf929a73..6d15699f8 100644
--- a/pgmpy/factors/hybrid/FunctionalCPD_Refactor.py
+++ b/pgmpy/factors/hybrid/FunctionalCPD_Refactor.py
@@ -1,9 +1,5 @@
-import numpy as np
-
-from pgmpy.estimators import MaximumLikelihoodEstimator as MLE
+from pgmpy.factors.hybrid.Adapters import LinearGaussianAdapter, TabularAdapter
 from pgmpy.factors.base import BaseFactor
-from pgmpy.factors.continuous import LinearGaussianCPD
-from pgmpy.factors.discrete import TabularCPD
 from pgmpy.factors.hybrid.SkproAdapter import SkproAdapter
 
 
@@ -34,66 +30,19 @@ def fit(self, data, target=None, parents=None):
         return self
 
     def _fit_tabular(self):
-        if self.estimator not in ("MLE", MLE):
-            raise ValueError("For tabular tag, only MLE estimator is currently supported.")
-
-        variable_states = sorted(self.data_[self.variable].dropna().unique())
-        if not self.parents_:
-            counts = self.data_[self.variable].value_counts().reindex(variable_states, fill_value=0)
-            probs = counts.values / counts.values.sum()
-            values = [[prob] for prob in probs]
-            self.fitted_cpd_ = TabularCPD(variable=self.variable, variable_card=len(variable_states), values=values)
-            return
-
-        parent_states = [sorted(self.data_[parent].dropna().unique()) for parent in self.parents_]
-        grouped = (
-            self.data_.groupby(self.parents_ + [self.variable], dropna=False)
-            .size()
-            .unstack(self.variable, fill_value=0)
-            .reindex(columns=variable_states, fill_value=0)
-        )
-        grouped = grouped.T
-        grouped = grouped / grouped.sum(axis=0).replace(0, 1)
-        self.fitted_cpd_ = TabularCPD(
-            variable=self.variable,
-            variable_card=len(variable_states),
-            values=grouped.values,
-            evidence=self.parents_,
-            evidence_card=[len(states) for states in parent_states],
-        )
+        self.adapter_ = TabularAdapter(variable=self.variable, estimator=self.estimator, parents=self.parents_)
+        self.fitted_cpd_ = self.adapter_.fit(self.data_).fitted_cpd_
 
     def _fit_linear(self):
-        if self.estimator not in ("MLE", "OLS", None):
-            raise ValueError(f"For linear tag, MLE/OLS is supported. Got {self.estimator}")
-
-        target_data = self.data_[self.variable].values
-
-        if not self.parents_:
-            mean = np.mean(target_data)
-            std = np.std(target_data)
-            beta = [mean]
-        else:
-            evidence_data = self.data_[self.parents_].values
-
-            X = np.c_[np.ones(evidence_data.shape[0]), evidence_data]
-
-            beta, residuals, rank, s = np.linalg.lstsq(X, target_data, rcond=None)
-            if len(residuals) > 0:
-                variance = residuals[0] / len(target_data)
-            else:
-                predictions = X @ beta
-                variance = np.mean((target_data - predictions) ** 2)
-            std = np.sqrt(variance)
-
-        self.fitted_cpd_ = LinearGaussianCPD(variable=self.variable, beta=beta, std=std, evidence=self.parents_)
+        self.adapter_ = LinearGaussianAdapter(variable=self.variable, estimator=self.estimator, parents=self.parents_)
+        self.fitted_cpd_ = self.adapter_.fit(self.data_).fitted_cpd_
 
     def _fit_external_ml(self):
         if self.estimator is None:
             raise ValueError("For skpro tag, `estimator` must be provided.")
-        self.fitted_cpd_ = SkproAdapter(variable=self.variable, model=self.estimator, parents=self.parents_).fit(
-            self.data_
-        )
+        self.adapter_ = SkproAdapter(variable=self.variable, model=self.estimator, parents=self.parents_).fit(self.data_)
+        self.fitted_cpd_ = self.adapter_
 
     def __repr__(self):
         if not getattr(self, "is_fitted_", False):
@@ -103,30 +52,8 @@ def __repr__(self):
             f"tag='{tag_display}', status='unfitted') at {hex(id(self))}>"
         )
 
-        if self.tag_name_ == "tabular" and hasattr(self, "fitted_cpd_"):
-            cpd = self.fitted_cpd_
-            var_str = f"{cpd.variable}:{cpd.variable_card}"
-            return (
-                f"<FunctionalCPD(tabular) {var_str} | "
-                f"parents: {self.parents_}) at {hex(id(self))}>"
-            )
-
-        elif self.tag_name_ == "linear" and hasattr(self, "fitted_cpd_"):
-            cpd = self.fitted_cpd_
-            beta_str = f"{cpd.beta[0]:.3f}"  # Intercept
-            for i, parent in enumerate(cpd.evidence):
-                beta_str += f" + {cpd.beta[i + 1]:.3f}*{parent}"
-
-            return (
-                f"<FunctionalCPD(linear) {cpd.variable} = N({beta_str}, "
-                f"{cpd.std:.3f}) at {hex(id(self))}>"
-            )
+        if self.tag_name_ in {"tabular", "linear"} and hasattr(self, "adapter_"):
+            return repr(self.adapter_)
 
         return (
             f"<FunctionalCPD(variable='{self.variable}', tag='{self.tag_name_}') at {hex(id(self))}>"
diff --git a/pgmpy/tests/test_models/test_FunctionalBayesianNetwork_Refactor.py b/pgmpy/tests/test_models/test_FunctionalBayesianNetwork_Refactor.py
index 004f019a2..8141ee562 100644
--- a/pgmpy/tests/test_models/test_FunctionalBayesianNetwork_Refactor.py
+++ b/pgmpy/tests/test_models/test_FunctionalBayesianNetwork_Refactor.py
@@ -3,6 +3,7 @@
 import pytest
 
 from pgmpy.estimators import MaximumLikelihoodEstimator as MLE
+from pgmpy.factors.hybrid.Adapters import LinearGaussianAdapter, TabularAdapter
 from pgmpy.factors.hybrid.FunctionalCPD_Refactor import FunctionalCPD
 from pgmpy.factors.hybrid.SkproAdapter import SkproAdapter
 from pgmpy.models.FunctionalBayesianNetwork_Refactor import FunctionalBayesianNetwork as FunctionalBN
@@ -45,6 +46,31 @@ def test_fit_learns_mixed_tabular_and_linear_cpds():
     np.testing.assert_allclose(fitted_d.std, 1.0, atol=0.15)
 
 
+def test_fit_uses_tabular_and_linear_adapters():
+    rng = np.random.default_rng(21)
+    data = pd.DataFrame(
+        {
+            "A": rng.integers(0, 2, size=80),
+            "B": rng.normal(size=80),
+            "C": rng.normal(size=80),
+        }
+    )
+
+    model = FunctionalBN([("A", "C"), ("B", "C")])
+    cpd_a = FunctionalCPD(variable="A", tag="tabular", estimator=MLE)
+    cpd_b = FunctionalCPD(variable="B", tag="linear", estimator="MLE")
+    cpd_c = FunctionalCPD(variable="C", tag="linear", estimator="MLE")
+    model.add_cpds(cpd_a, cpd_b, cpd_c)
+    model.fit(data)
+
+    fitted_a = model.get_cpds("A")
+    fitted_b = model.get_cpds("B")
+    assert isinstance(fitted_a.adapter_, TabularAdapter)
+    assert isinstance(fitted_b.adapter_, LinearGaussianAdapter)
+    assert "FunctionalCPD(tabular)" in repr(fitted_a)
+    assert "FunctionalCPD(linear)" in repr(fitted_b)
+
+
 class DummySkproRegressor:
     def fit(self, X, y):
         self.was_fit = True