diff --git a/pgmpy/factors/hybrid/FunctionalCPD_Refactor.py b/pgmpy/factors/hybrid/FunctionalCPD_Refactor.py index edf929a73..b38dec818 100644 --- a/pgmpy/factors/hybrid/FunctionalCPD_Refactor.py +++ b/pgmpy/factors/hybrid/FunctionalCPD_Refactor.py @@ -1,10 +1,7 @@ -import numpy as np - -from pgmpy.estimators import MaximumLikelihoodEstimator as MLE from pgmpy.factors.base import BaseFactor -from pgmpy.factors.continuous import LinearGaussianCPD -from pgmpy.factors.discrete import TabularCPD +from pgmpy.factors.hybrid.LinearGaussianAdapter import LinearGaussianAdapter from pgmpy.factors.hybrid.SkproAdapter import SkproAdapter +from pgmpy.factors.hybrid.TabularAdapter import TabularAdapter class FunctionalCPD(BaseFactor): @@ -34,58 +31,18 @@ def fit(self, data, target=None, parents=None): return self def _fit_tabular(self): - if self.estimator not in ("MLE", MLE): - raise ValueError("For tabular tag, only MLE estimator is currently supported.") - - variable_states = sorted(self.data_[self.variable].dropna().unique()) - if not self.parents_: - counts = self.data_[self.variable].value_counts().reindex(variable_states, fill_value=0) - probs = counts.values / counts.values.sum() - values = [[prob] for prob in probs] - self.fitted_cpd_ = TabularCPD(variable=self.variable, variable_card=len(variable_states), values=values) - return - - parent_states = [sorted(self.data_[parent].dropna().unique()) for parent in self.parents_] - grouped = ( - self.data_.groupby(self.parents_ + [self.variable], dropna=False) - .size() - .unstack(self.variable, fill_value=0) - .reindex(columns=variable_states, fill_value=0) - ) - grouped = grouped.T - grouped = grouped / grouped.sum(axis=0).replace(0, 1) - self.fitted_cpd_ = TabularCPD( + self.fitted_cpd_ = TabularAdapter( variable=self.variable, - variable_card=len(variable_states), - values=grouped.values, - evidence=self.parents_, - evidence_card=[len(states) for states in parent_states], - ) + estimator=self.estimator, + 
parents=self.parents_, + ).fit(self.data_) def _fit_linear(self): - if self.estimator not in ("MLE", "OLS", None): - raise ValueError(f"For linear tag, MLE/OLS is supported. Got {self.estimator}") - - target_data = self.data_[self.variable].values - - if not self.parents_: - mean = np.mean(target_data) - std = np.std(target_data) - beta = [mean] - else: - evidence_data = self.data_[self.parents_].values - - X = np.c_[np.ones(evidence_data.shape[0]), evidence_data] - - beta, residuals, rank, s = np.linalg.lstsq(X, target_data, rcond=None) - if len(residuals) > 0: - variance = residuals[0] / len(target_data) - else: - predictions = X @ beta - variance = np.mean((target_data - predictions) ** 2) - std = np.sqrt(variance) - - self.fitted_cpd_ = LinearGaussianCPD(variable=self.variable, beta=beta, std=std, evidence=self.parents_) + self.fitted_cpd_ = LinearGaussianAdapter( + variable=self.variable, + estimator=self.estimator, + parents=self.parents_, + ).fit(self.data_) def _fit_external_ml(self): if self.estimator is None: @@ -103,30 +60,8 @@ def __repr__(self): f"tag='{tag_display}', status='unfitted') at {hex(id(self))}>" ) - if self.tag_name_ == "tabular" and hasattr(self, "fitted_cpd_"): - cpd = self.fitted_cpd_ - var_str = f"" - - elif self.tag_name_ == "linear" and hasattr(self, "fitted_cpd_"): - cpd = self.fitted_cpd_ - beta_str = f"{cpd.beta[0]:.3f}" # Intercept - for i, parent in enumerate(cpd.evidence): - beta_str += f" + {cpd.beta[i + 1]:.3f}*{parent}" - - return ( - f"" - ) + if hasattr(self, "fitted_cpd_"): + return f"" return ( f"" diff --git a/pgmpy/factors/hybrid/LinearGaussianAdapter.py b/pgmpy/factors/hybrid/LinearGaussianAdapter.py new file mode 100644 index 000000000..404577073 --- /dev/null +++ b/pgmpy/factors/hybrid/LinearGaussianAdapter.py @@ -0,0 +1,53 @@ +import numpy as np + +from pgmpy.factors.continuous import LinearGaussianCPD + + +class LinearGaussianAdapter: + """ + Adapter that fits data into a `LinearGaussianCPD`. 
class LinearGaussianAdapter:
    """
    Adapter that fits data into a `LinearGaussianCPD`.

    The target column is regressed on an intercept plus the parent columns
    using least squares; the noise standard deviation is the root of the mean
    squared residual (no degrees-of-freedom correction).

    Parameters
    ----------
    variable : str
        Name of the column in the training data that is being modelled.
    estimator : {"MLE", "OLS", None}, optional
        Estimator tag. Any other value makes `fit` raise `ValueError`.
    parents : list, optional
        Names of the parent columns. ``None`` means "no parents".

    Attributes
    ----------
    fitted_cpd_ : LinearGaussianCPD
        The fitted CPD; only present after `fit` has been called.
    """

    def __init__(self, variable, estimator=None, parents=None):
        self.variable = variable
        self.estimator = estimator
        # Copy so that later mutation of the caller's list cannot silently
        # change which columns this adapter regresses on.
        self.parents = list(parents) if parents is not None else []

    def fit(self, data):
        """
        Estimate the regression coefficients and noise level from `data`.

        Parameters
        ----------
        data : pandas.DataFrame
            Must contain `variable` and every parent as numeric columns.

        Returns
        -------
        LinearGaussianAdapter
            `self`, with `fitted_cpd_` set.

        Raises
        ------
        ValueError
            If the estimator tag is unsupported, if `data` has no rows, or if
            the relevant columns contain NaN/inf values.
        """
        if self.estimator not in ("MLE", "OLS", None):
            raise ValueError(f"For linear tag, MLE/OLS is supported. Got {self.estimator}")

        target = data[self.variable].to_numpy(dtype=float)
        if target.size == 0:
            raise ValueError(f"Cannot fit a linear Gaussian CPD for '{self.variable}': data has no rows.")

        # Design matrix: intercept column followed by one column per parent.
        # With no parents the least-squares solution is simply the sample
        # mean, so a single code path covers both cases.
        design_columns = [np.ones(target.shape[0])]
        if self.parents:
            design_columns.append(data[self.parents].to_numpy(dtype=float))
        design = np.column_stack(design_columns)

        # Non-finite inputs would otherwise yield NaN parameters (or an
        # opaque LinAlgError), so fail early with a clear message.
        if not (np.isfinite(target).all() and np.isfinite(design).all()):
            raise ValueError(
                f"Cannot fit a linear Gaussian CPD for '{self.variable}': "
                "data contains missing or non-finite values."
            )

        beta, *_ = np.linalg.lstsq(design, target, rcond=None)

        # lstsq only reports residuals for full-rank, over-determined
        # systems, so the mean squared residual is computed directly.
        std = float(np.sqrt(np.mean((target - design @ beta) ** 2)))

        self.fitted_cpd_ = LinearGaussianCPD(variable=self.variable, beta=beta, std=std, evidence=self.parents)
        return self

    def __repr__(self):
        if not hasattr(self, "fitted_cpd_"):
            return f"<LinearGaussianAdapter(variable='{self.variable}', status='unfitted') at {hex(id(self))}>"

        cpd = self.fitted_cpd_
        # Intercept first, then one "coef*parent" term per evidence variable.
        terms = [f"{cpd.beta[0]:.3f}"]
        terms.extend(f"{cpd.beta[i + 1]:.3f}*{parent}" for i, parent in enumerate(cpd.evidence))
        beta_str = " + ".join(terms)

        return (
            f"<LinearGaussianAdapter(variable='{self.variable}', "
            f"mean='{beta_str}', std={float(cpd.std):.3f}) at {hex(id(self))}>"
        )
class TabularAdapter:
    """
    Adapter that fits data into a `TabularCPD`.

    Probabilities are relative frequencies of the target's states within each
    parent configuration. Rows of the table follow the sorted states of
    `variable`; columns follow the Cartesian product of the sorted parent
    states, in the order the parents are listed.

    Parameters
    ----------
    variable : str
        Name of the column in the training data that is being modelled.
    estimator : "MLE" or MaximumLikelihoodEstimator
        Estimator tag. Any other value makes `fit` raise `ValueError`.
    parents : list, optional
        Names of the parent columns. ``None`` means "no parents".

    Attributes
    ----------
    fitted_cpd_ : TabularCPD
        The fitted CPD; only present after `fit` has been called.
    """

    def __init__(self, variable, estimator, parents=None):
        self.variable = variable
        self.estimator = estimator
        # Copy so that later mutation of the caller's list cannot silently
        # change which columns this adapter conditions on.
        self.parents = list(parents) if parents is not None else []

    def fit(self, data):
        """
        Estimate the conditional probability table from `data`.

        Parameters
        ----------
        data : pandas.DataFrame
            Must contain `variable` and every parent as columns.

        Returns
        -------
        TabularAdapter
            `self`, with `fitted_cpd_` set.

        Raises
        ------
        ValueError
            If the estimator is unsupported, or if no row has a value for
            `variable` and all of its parents.
        """
        if self.estimator not in ("MLE", MLE):
            raise ValueError("For tabular tag, only MLE estimator is currently supported.")

        # Local import: this module does not otherwise import pandas.
        import pandas as pd

        variable_states = sorted(data[self.variable].dropna().unique())

        # Only rows where the target and every parent are observed can be
        # assigned to a cell of the table.
        complete = data.dropna(subset=self.parents + [self.variable])
        if complete.empty:
            raise ValueError(
                f"Cannot fit a tabular CPD for '{self.variable}': "
                "no rows with observed values for the variable and its parents."
            )

        if not self.parents:
            counts = complete[self.variable].value_counts().reindex(variable_states, fill_value=0)
            probs = counts.values / counts.values.sum()
            values = [[prob] for prob in probs]
            self.fitted_cpd_ = TabularCPD(variable=self.variable, variable_card=len(variable_states), values=values)
            return self

        parent_states = [sorted(data[parent].dropna().unique()) for parent in self.parents]

        # One row per parent configuration, one column per target state.
        counts = (
            complete.groupby(self.parents + [self.variable])
            .size()
            .unstack(self.variable, fill_value=0)
            .reindex(columns=variable_states, fill_value=0)
        )

        # Make every parent configuration present (and in product order), so
        # the table width always matches prod(evidence_card) even when some
        # configurations never occur in the data.
        if len(self.parents) == 1:
            all_configs = pd.Index(parent_states[0], name=self.parents[0])
        else:
            all_configs = pd.MultiIndex.from_product(parent_states, names=self.parents)
        counts = counts.reindex(all_configs, fill_value=0)

        # A configuration with no observations carries no information; give
        # it a uniform distribution so that every column still sums to one.
        unseen = counts.sum(axis=1) == 0
        counts.loc[unseen, :] = 1
        probs = counts.div(counts.sum(axis=1), axis=0)

        self.fitted_cpd_ = TabularCPD(
            variable=self.variable,
            variable_card=len(variable_states),
            values=probs.T.values,
            evidence=self.parents,
            evidence_card=[len(states) for states in parent_states],
        )
        return self

    def __repr__(self):
        if not hasattr(self, "fitted_cpd_"):
            return f"<TabularAdapter(variable='{self.variable}', status='unfitted') at {hex(id(self))}>"

        cpd = self.fitted_cpd_
        var_str = f"{cpd.variable}:{cpd.variable_card}"
        # NOTE(review): assumes TabularCPD keeps the target first in
        # `variables`/`cardinality`, followed by the evidence - confirm.
        evidence_str = ", ".join(f"{name}:{card}" for name, card in zip(cpd.variables[1:], cpd.cardinality[1:]))
        if evidence_str:
            var_str = f"{var_str} | {evidence_str}"

        return f"<TabularAdapter representing P({var_str}) at {hex(id(self))}>"
pgmpy.factors.hybrid.SkproAdapter import SkproAdapter +from pgmpy.factors.hybrid.TabularAdapter import TabularAdapter from pgmpy.models.FunctionalBayesianNetwork_Refactor import FunctionalBayesianNetwork as FunctionalBN @@ -34,15 +36,20 @@ def test_fit_learns_mixed_tabular_and_linear_cpds(): fitted_c = model.get_cpds("C").fitted_cpd_ fitted_d = model.get_cpds("D").fitted_cpd_ - np.testing.assert_allclose(fitted_a.get_values().reshape(-1), [0.5, 0.5], atol=0.08) - np.testing.assert_allclose(fitted_b.get_values().reshape(-1), [0.5, 0.5], atol=0.08) + assert isinstance(fitted_a, TabularAdapter) + assert isinstance(fitted_b, TabularAdapter) + assert isinstance(fitted_c, LinearGaussianAdapter) + assert isinstance(fitted_d, LinearGaussianAdapter) - np.testing.assert_allclose(fitted_c.beta, [5.0], atol=0.2) - np.testing.assert_allclose(fitted_c.std, 2.0, atol=0.2) + np.testing.assert_allclose(fitted_a.fitted_cpd_.get_values().reshape(-1), [0.5, 0.5], atol=0.08) + np.testing.assert_allclose(fitted_b.fitted_cpd_.get_values().reshape(-1), [0.5, 0.5], atol=0.08) - np.testing.assert_allclose(fitted_d.beta[0], 0.0, atol=0.25) - np.testing.assert_allclose(fitted_d.beta[1:], [2.5, -1.5, 3.0], atol=0.2) - np.testing.assert_allclose(fitted_d.std, 1.0, atol=0.15) + np.testing.assert_allclose(fitted_c.fitted_cpd_.beta, [5.0], atol=0.2) + np.testing.assert_allclose(fitted_c.fitted_cpd_.std, 2.0, atol=0.2) + + np.testing.assert_allclose(fitted_d.fitted_cpd_.beta[0], 0.0, atol=0.25) + np.testing.assert_allclose(fitted_d.fitted_cpd_.beta[1:], [2.5, -1.5, 3.0], atol=0.2) + np.testing.assert_allclose(fitted_d.fitted_cpd_.std, 1.0, atol=0.15) class DummySkproRegressor: