Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 13 additions & 78 deletions pgmpy/factors/hybrid/FunctionalCPD_Refactor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import numpy as np

from pgmpy.estimators import MaximumLikelihoodEstimator as MLE
from pgmpy.factors.base import BaseFactor
from pgmpy.factors.continuous import LinearGaussianCPD
from pgmpy.factors.discrete import TabularCPD
from pgmpy.factors.hybrid.LinearGaussianAdapter import LinearGaussianAdapter
from pgmpy.factors.hybrid.SkproAdapter import SkproAdapter
from pgmpy.factors.hybrid.TabularAdapter import TabularAdapter


class FunctionalCPD(BaseFactor):
Expand Down Expand Up @@ -34,58 +31,18 @@ def fit(self, data, target=None, parents=None):
return self

def _fit_tabular(self):
if self.estimator not in ("MLE", MLE):
raise ValueError("For tabular tag, only MLE estimator is currently supported.")

variable_states = sorted(self.data_[self.variable].dropna().unique())
if not self.parents_:
counts = self.data_[self.variable].value_counts().reindex(variable_states, fill_value=0)
probs = counts.values / counts.values.sum()
values = [[prob] for prob in probs]
self.fitted_cpd_ = TabularCPD(variable=self.variable, variable_card=len(variable_states), values=values)
return

parent_states = [sorted(self.data_[parent].dropna().unique()) for parent in self.parents_]
grouped = (
self.data_.groupby(self.parents_ + [self.variable], dropna=False)
.size()
.unstack(self.variable, fill_value=0)
.reindex(columns=variable_states, fill_value=0)
)
grouped = grouped.T
grouped = grouped / grouped.sum(axis=0).replace(0, 1)
self.fitted_cpd_ = TabularCPD(
self.fitted_cpd_ = TabularAdapter(
variable=self.variable,
variable_card=len(variable_states),
values=grouped.values,
evidence=self.parents_,
evidence_card=[len(states) for states in parent_states],
)
estimator=self.estimator,
parents=self.parents_,
).fit(self.data_)

def _fit_linear(self):
if self.estimator not in ("MLE", "OLS", None):
raise ValueError(f"For linear tag, MLE/OLS is supported. Got {self.estimator}")

target_data = self.data_[self.variable].values

if not self.parents_:
mean = np.mean(target_data)
std = np.std(target_data)
beta = [mean]
else:
evidence_data = self.data_[self.parents_].values

X = np.c_[np.ones(evidence_data.shape[0]), evidence_data]

beta, residuals, rank, s = np.linalg.lstsq(X, target_data, rcond=None)
if len(residuals) > 0:
variance = residuals[0] / len(target_data)
else:
predictions = X @ beta
variance = np.mean((target_data - predictions) ** 2)
std = np.sqrt(variance)

self.fitted_cpd_ = LinearGaussianCPD(variable=self.variable, beta=beta, std=std, evidence=self.parents_)
self.fitted_cpd_ = LinearGaussianAdapter(
variable=self.variable,
estimator=self.estimator,
parents=self.parents_,
).fit(self.data_)

def _fit_external_ml(self):
if self.estimator is None:
Expand All @@ -103,30 +60,8 @@ def __repr__(self):
f"tag='{tag_display}', status='unfitted') at {hex(id(self))}>"
)

if self.tag_name_ == "tabular" and hasattr(self, "fitted_cpd_"):
cpd = self.fitted_cpd_
var_str = f"<FunctionalCPD(tabular) representing P({cpd.variable}:{cpd.variable_card}"

evidence = cpd.variables[1:]
evidence_card = cpd.cardinality[1:]

if evidence:
evidence_str = " | " + ", ".join([f"{var}:{card}" for var, card in zip(evidence, evidence_card)])
else:
evidence_str = ""

return var_str + evidence_str + f") at {hex(id(self))}>"

elif self.tag_name_ == "linear" and hasattr(self, "fitted_cpd_"):
cpd = self.fitted_cpd_
beta_str = f"{cpd.beta[0]:.3f}" # Intercept
for i, parent in enumerate(cpd.evidence):
beta_str += f" + {cpd.beta[i + 1]:.3f}*{parent}"

return (
f"<FunctionalCPD(linear) representing P({cpd.variable} | {', '.join(cpd.evidence)}) "
f"~ N({beta_str}, std={cpd.std:.3f}) at {hex(id(self))}>"
)
if hasattr(self, "fitted_cpd_"):
return f"<FunctionalCPD(tag='{self.tag_name_}', fitted={self.fitted_cpd_}) at {hex(id(self))}>"

return (
f"<FunctionalCPD(variable='{self.variable}', tag='{self.tag_name_}', status='fitted') at {hex(id(self))}>"
Expand Down
53 changes: 53 additions & 0 deletions pgmpy/factors/hybrid/LinearGaussianAdapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import numpy as np

from pgmpy.factors.continuous import LinearGaussianCPD


class LinearGaussianAdapter:
    """
    Adapter that fits data into a `LinearGaussianCPD`.

    Parameters
    ----------
    variable : str
        Name of the target (child) column in the data.
    estimator : str or None, optional
        Estimation method; `fit` accepts only "MLE", "OLS", or None
        (all three currently resolve to ordinary least squares).
    parents : list of str, optional
        Names of the evidence (parent) columns. Defaults to no parents.
    """

    def __init__(self, variable, estimator=None, parents=None):
        self.variable = variable
        self.estimator = estimator
        # Normalize to a list so downstream code can rely on len()/iteration.
        self.parents = parents if parents is not None else []

    def fit(self, data):
        """
        Estimate a linear Gaussian CPD from `data` via least squares.

        Parameters
        ----------
        data : pandas.DataFrame
            Must contain `self.variable` and every column in `self.parents`.

        Returns
        -------
        LinearGaussianAdapter
            `self`, with `fitted_cpd_` set to the learned `LinearGaussianCPD`.

        Raises
        ------
        ValueError
            If `self.estimator` is not one of "MLE", "OLS", or None.
        """
        if self.estimator not in ("MLE", "OLS", None):
            raise ValueError(f"For linear tag, MLE/OLS is supported. Got {self.estimator}")

        target_data = data[self.variable].values

        if not self.parents:
            # Root node: the CPD is just N(mean, std) of the target column.
            mean = np.mean(target_data)
            std = np.std(target_data)
            beta = [mean]
        else:
            evidence_data = data[self.parents].values
            # Prepend an intercept column of ones to the design matrix.
            X = np.c_[np.ones(evidence_data.shape[0]), evidence_data]

            beta, residuals, rank, s = np.linalg.lstsq(X, target_data, rcond=None)
            if len(residuals) > 0:
                # lstsq returns the residual sum of squares only when the
                # system is over-determined and full-rank.
                variance = residuals[0] / len(target_data)
            else:
                # Rank-deficient or exactly-determined case: compute the
                # residual variance explicitly.
                predictions = X @ beta
                variance = np.mean((target_data - predictions) ** 2)
            std = np.sqrt(variance)

        self.fitted_cpd_ = LinearGaussianCPD(variable=self.variable, beta=beta, std=std, evidence=self.parents)
        return self

    def __repr__(self):
        if not hasattr(self, "fitted_cpd_"):
            return f"<LinearGaussianAdapter(variable='{self.variable}', status='unfitted') at {hex(id(self))}>"

        cpd = self.fitted_cpd_
        beta_str = f"{cpd.beta[0]:.3f}"  # intercept term
        for i, parent in enumerate(cpd.evidence):
            beta_str += f" + {cpd.beta[i + 1]:.3f}*{parent}"

        # Omit the conditioning bar when there are no parents so the root
        # case renders as "P(X)" rather than the awkward "P(X | )".
        evidence_str = f" | {', '.join(cpd.evidence)}" if cpd.evidence else ""
        return (
            f"<LinearGaussianAdapter representing P({cpd.variable}{evidence_str}) "
            f"~ N({beta_str}, std={cpd.std:.3f}) at {hex(id(self))}>"
        )
Comment on lines +50 to +53
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current __repr__ format for a fitted model without evidence variables results in P(variable | ), which is slightly awkward. It would be cleaner to omit the | when there are no evidence variables, similar to how it's handled in TabularAdapter.

Suggested change
return (
f"<LinearGaussianAdapter representing P({cpd.variable} | {', '.join(cpd.evidence)}) "
f"~ N({beta_str}, std={cpd.std:.3f}) at {hex(id(self))}>"
)
evidence_str = f" | {', '.join(cpd.evidence)}" if cpd.evidence else ""
return (
f"<LinearGaussianAdapter representing P({cpd.variable}{evidence_str}) "
f"~ N({beta_str}, std={cpd.std:.3f}) at {hex(id(self))}>"
)

60 changes: 60 additions & 0 deletions pgmpy/factors/hybrid/TabularAdapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from pgmpy.estimators import MaximumLikelihoodEstimator as MLE
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The file uses numpy features (e.g., counts.values.sum()), and a suggested fix below requires an explicit np reference. It's best practice to add import numpy as np at the top of the file for clarity and to support the fix.

Suggested change
from pgmpy.estimators import MaximumLikelihoodEstimator as MLE
import numpy as np
from pgmpy.estimators import MaximumLikelihoodEstimator as MLE

from pgmpy.factors.discrete import TabularCPD


class TabularAdapter:
    """
    Adapter that fits data into a `TabularCPD`.

    Parameters
    ----------
    variable : str
        Name of the target (child) column in the data.
    estimator : str or class
        Estimation method; `fit` accepts only "MLE" or the
        `MaximumLikelihoodEstimator` class itself.
    parents : list of str, optional
        Names of the evidence (parent) columns. Defaults to no parents.
    """

    def __init__(self, variable, estimator, parents=None):
        self.variable = variable
        self.estimator = estimator
        # Normalize to a list so downstream code can rely on len()/iteration.
        self.parents = parents if parents is not None else []

    def fit(self, data):
        """
        Estimate a tabular CPD from `data` by maximum likelihood counting.

        Parameters
        ----------
        data : pandas.DataFrame
            Must contain `self.variable` and every column in `self.parents`.

        Returns
        -------
        TabularAdapter
            `self`, with `fitted_cpd_` set to the learned `TabularCPD`.

        Raises
        ------
        ValueError
            If `self.estimator` is not "MLE" or the MLE estimator class.
        """
        if self.estimator not in ("MLE", MLE):
            raise ValueError("For tabular tag, only MLE estimator is currently supported.")

        variable_states = sorted(data[self.variable].dropna().unique())
        if not self.parents:
            counts = data[self.variable].value_counts().reindex(variable_states, fill_value=0)
            # Guard against division by zero when the target column is empty
            # or all-NaN: keep the probabilities at 0 instead of producing
            # NaN values (mirrors the `.replace(0, 1)` safeguard below).
            total = counts.values.sum()
            probs = counts.values / (total or 1)
            values = [[prob] for prob in probs]
            self.fitted_cpd_ = TabularCPD(variable=self.variable, variable_card=len(variable_states), values=values)
            return self

        parent_states = [sorted(data[parent].dropna().unique()) for parent in self.parents]
        # Joint counts of (parent configuration, variable state), with every
        # observed variable state present as a column even if its count is 0.
        grouped = (
            data.groupby(self.parents + [self.variable], dropna=False)
            .size()
            .unstack(self.variable, fill_value=0)
            .reindex(columns=variable_states, fill_value=0)
        )
        grouped = grouped.T
        # Normalize each parent-configuration column; replace zero column
        # sums with 1 so unseen configurations yield 0s rather than NaNs.
        grouped = grouped / grouped.sum(axis=0).replace(0, 1)
        self.fitted_cpd_ = TabularCPD(
            variable=self.variable,
            variable_card=len(variable_states),
            values=grouped.values,
            evidence=self.parents,
            evidence_card=[len(states) for states in parent_states],
        )
        return self

    def __repr__(self):
        if not hasattr(self, "fitted_cpd_"):
            return f"<TabularAdapter(variable='{self.variable}', status='unfitted') at {hex(id(self))}>"

        cpd = self.fitted_cpd_
        var_str = f"<TabularAdapter representing P({cpd.variable}:{cpd.variable_card}"

        evidence = cpd.variables[1:]
        evidence_card = cpd.cardinality[1:]

        # Omit the conditioning bar entirely when there is no evidence.
        if evidence:
            evidence_str = " | " + ", ".join([f"{var}:{card}" for var, card in zip(evidence, evidence_card)])
        else:
            evidence_str = ""

        return var_str + evidence_str + f") at {hex(id(self))}>"
21 changes: 14 additions & 7 deletions pgmpy/tests/test_models/test_FunctionalBayesianNetwork_Refactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import pytest

from pgmpy.estimators import MaximumLikelihoodEstimator as MLE
from pgmpy.factors.hybrid.LinearGaussianAdapter import LinearGaussianAdapter
from pgmpy.factors.hybrid.FunctionalCPD_Refactor import FunctionalCPD
from pgmpy.factors.hybrid.SkproAdapter import SkproAdapter
from pgmpy.factors.hybrid.TabularAdapter import TabularAdapter
from pgmpy.models.FunctionalBayesianNetwork_Refactor import FunctionalBayesianNetwork as FunctionalBN


Expand Down Expand Up @@ -34,15 +36,20 @@ def test_fit_learns_mixed_tabular_and_linear_cpds():
fitted_c = model.get_cpds("C").fitted_cpd_
fitted_d = model.get_cpds("D").fitted_cpd_

np.testing.assert_allclose(fitted_a.get_values().reshape(-1), [0.5, 0.5], atol=0.08)
np.testing.assert_allclose(fitted_b.get_values().reshape(-1), [0.5, 0.5], atol=0.08)
assert isinstance(fitted_a, TabularAdapter)
assert isinstance(fitted_b, TabularAdapter)
assert isinstance(fitted_c, LinearGaussianAdapter)
assert isinstance(fitted_d, LinearGaussianAdapter)

np.testing.assert_allclose(fitted_c.beta, [5.0], atol=0.2)
np.testing.assert_allclose(fitted_c.std, 2.0, atol=0.2)
np.testing.assert_allclose(fitted_a.fitted_cpd_.get_values().reshape(-1), [0.5, 0.5], atol=0.08)
np.testing.assert_allclose(fitted_b.fitted_cpd_.get_values().reshape(-1), [0.5, 0.5], atol=0.08)

np.testing.assert_allclose(fitted_d.beta[0], 0.0, atol=0.25)
np.testing.assert_allclose(fitted_d.beta[1:], [2.5, -1.5, 3.0], atol=0.2)
np.testing.assert_allclose(fitted_d.std, 1.0, atol=0.15)
np.testing.assert_allclose(fitted_c.fitted_cpd_.beta, [5.0], atol=0.2)
np.testing.assert_allclose(fitted_c.fitted_cpd_.std, 2.0, atol=0.2)

np.testing.assert_allclose(fitted_d.fitted_cpd_.beta[0], 0.0, atol=0.25)
np.testing.assert_allclose(fitted_d.fitted_cpd_.beta[1:], [2.5, -1.5, 3.0], atol=0.2)
np.testing.assert_allclose(fitted_d.fitted_cpd_.std, 1.0, atol=0.15)


class DummySkproRegressor:
Expand Down
Loading