Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions release-notes/next-release.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ reaction.build_reaction_from_string

Fix `reaction.check_mass_balance` giving incorrect results for reactions with floating point coefficients.

Fixes FastCC. This now implements the full algorithm as in the original paper and gives
the same results as the Matlab implementation (within the solver tolerance).

## Other

## Deprecated features
Expand Down
179 changes: 112 additions & 67 deletions src/cobra/flux_analysis/fastcc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Provide an implementation of FASTCC."""

from typing import TYPE_CHECKING, List, Optional
from logging import getLogger
from typing import TYPE_CHECKING, List, Optional, Set

from optlang.symbolics import Zero

Expand All @@ -10,9 +11,43 @@
if TYPE_CHECKING:
from cobra.core import Model, Reaction

logger = getLogger(__name__)
LARGE_VALUE = 1.0e6


def _add_lp7_vars(
model: "Model", rxns: List["Reaction"], flux_threshold: float
) -> None:
"""Add the variables and constraints for the LP.

Parameters
----------
model: cobra.Model
The model to operate on.
rxns: list of cobra.Reaction
The reactions to use for LP.
flux_threshold: float
The upper threshold an auxiliary variable can have.

"""
prob = model.problem
vars_and_cons = []

for rxn in rxns:
var = prob.Variable(f"auxiliary_{rxn.id}", lb=0.0, ub=flux_threshold)
const = prob.Constraint(
rxn.flux_expression - var,
name="aux_constraint_{}".format(rxn.id),
lb=-LARGE_VALUE,
)
vars_and_cons.extend([var, const])

model.add_cons_vars(vars_and_cons)
model.solver.update()


def _find_sparse_mode(
model: "Model", rxns: List["Reaction"], flux_threshold: float, zero_cutoff: float
model: "Model", rxn_ids: Set[str], zero_cutoff: float
) -> List["Reaction"]:
"""Perform the LP required for FASTCC.

Expand All @@ -22,8 +57,6 @@ def _find_sparse_mode(
The model to perform FASTCC on.
rxns: list of cobra.Reaction
The reactions to use for LP.
flux_threshold: float
The upper threshold an auxiliary variable can have.
zero_cutoff: float
The cutoff below which flux is considered zero.

Expand All @@ -33,36 +66,27 @@ def _find_sparse_mode(
The list of reactions to consider as consistent.

"""
if rxns:
obj_vars = []
vars_and_cons = []
prob = model.problem

for rxn in rxns:
var = prob.Variable(
"auxiliary_{}".format(rxn.id), lb=0.0, ub=flux_threshold
)
const = prob.Constraint(
rxn.forward_variable + rxn.reverse_variable - var,
name="constraint_{}".format(rxn.id),
lb=0.0,
)
vars_and_cons.extend([var, const])
obj_vars.append(var)
if not rxn_ids:
return set()

model.add_cons_vars(vars_and_cons)
model.objective = prob.Objective(Zero, sloppy=True)
model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars})
# Enable constraints for the reactions
for rid in rxn_ids:
model.constraints.get(f"aux_constraint_{rid}").lb = 0.0

model.optimize(objective_sense="max")
result = [rxn for rxn in model.reactions if abs(rxn.flux) > zero_cutoff]
else:
result = []
obj_vars = [model.variables.get(f"auxiliary_{rid}") for rid in rxn_ids]
model.objective = Zero
model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars})

return result
sol = model.optimize(objective_sense="max")

# Disable constraints for the reactions
for rid in rxn_ids:
model.constraints.get("aux_constraint_{}".format(rid)).lb = -LARGE_VALUE

def _flip_coefficients(model: "Model", rxns: List["Reaction"]) -> None:
return set(sol.fluxes[sol.fluxes.abs() > zero_cutoff].index)


def _flip_coefficients(model: "Model", rxn_ids: Set[str]) -> None:
"""Flip the coefficients for optimizing in reverse direction.

Parameters
Expand All @@ -73,17 +97,20 @@ def _flip_coefficients(model: "Model", rxns: List["Reaction"]) -> None:
The list of reactions whose coefficients will be flipped.

"""
if not rxn_ids:
return
# flip reactions
for rxn in rxns:
const = model.constraints.get("constraint_{}".format(rxn.id))
var = model.variables.get("auxiliary_{}".format(rxn.id))
for rxn in rxn_ids:
const = model.constraints.get(f"aux_constraint_{rxn}")
var = model.variables.get(f"auxiliary_{rxn}")
coefs = const.get_linear_coefficients(const.variables)
const.set_linear_coefficients({k: -v for k, v in coefs.items() if k is not var})
model.solver.update()


# flip objective
objective = model.objective
objective_coefs = objective.get_linear_coefficients(objective.variables)
objective.set_linear_coefficients({k: -v for k, v in objective_coefs.items()})
def _any_set(s):
for x in s:
return set([x])


def fastcc(
Expand Down Expand Up @@ -133,46 +160,64 @@ def fastcc(
"""
zero_cutoff = normalize_cutoff(model, zero_cutoff)

irreversible_rxns = [rxn for rxn in model.reactions if not rxn.reversibility]
all_rxns = {rxn.id for rxn in model.reactions}
irreversible_rxns = {rxn.id for rxn in model.reactions if not rxn.reversibility}
rxns_to_check = irreversible_rxns
flipped = False
singletons = False

with model:
rxns_to_keep = _find_sparse_mode(
model, rxns_to_check, flux_threshold, zero_cutoff
_add_lp7_vars(model, model.reactions, flux_threshold)

rxns_to_keep = _find_sparse_mode(model, rxns_to_check, zero_cutoff)
rxns_to_check = all_rxns.difference(rxns_to_keep)
logger.info(
"Initial step found %d consistent reactions. "
"Starting the consistency loop for the remaining %d reactions.",
len(rxns_to_keep),
len(rxns_to_check),
)

rxns_to_check = list(set(model.reactions).difference(rxns_to_keep))

while rxns_to_check:
with model:
new_rxns = _find_sparse_mode(
model, rxns_to_check, flux_threshold, zero_cutoff
while rxns_to_check:
logger.debug(
"reactions to check: %d - consistent reactions:"
" %d - flipped: %d - singletons: %d",
len(rxns_to_check),
len(rxns_to_keep),
flipped,
singletons,
)
rxns_to_keep.extend(new_rxns)

# this condition will be valid for all but the last iteration
if list(set(rxns_to_check).intersection(rxns_to_keep)):
rxns_to_check = list(set(rxns_to_check).difference(rxns_to_keep))
check = _any_set(rxns_to_check) if singletons else rxns_to_check
new_rxns = _find_sparse_mode(model, check, zero_cutoff)
rxns_to_keep.update(new_rxns)

if rxns_to_check.intersection(rxns_to_keep):
rxns_to_check = rxns_to_check.difference(rxns_to_keep)
flipped = False
else:
rxns_to_flip = list(set(rxns_to_check).difference(irreversible_rxns))
_flip_coefficients(model, rxns_to_flip)
sol = model.optimize(min)
to_add_rxns = sol.fluxes.index[sol.fluxes.abs() > zero_cutoff].tolist()
rxns_to_keep.extend(
[model.reactions.get_by_id(rxn) for rxn in to_add_rxns]
)
# since this is the last iteration, it needs to break or else
# it will run forever since rxns_to_check won't be empty
break

consistent_rxns = set(rxns_to_keep)
# need the ids since Reaction objects are created fresh with model.copy()
rxns_to_remove = [
rxn.id for rxn in set(model.reactions).difference(consistent_rxns)
]
check_irr = check.difference(irreversible_rxns)
if flipped or not check_irr:
if singletons:
logger.debug("%s is inconsistent", check)
rxns_to_check = rxns_to_check.difference(check)
flipped = False
singletons = True
else:
flipped = True
check = _any_set(rxns_to_check) if singletons else rxns_to_check
_flip_coefficients(model, check_irr)
logger.info(
"Final - consistent reactions: %d"
" - inconsistent reactions: %d [eps=%.2g, tol=%.2g]",
len(rxns_to_keep),
len(all_rxns) - len(rxns_to_keep),
flux_threshold,
zero_cutoff,
)

consistent_model = model.copy()
consistent_model.remove_reactions(rxns_to_remove, remove_orphans=True)
consistent_model.remove_reactions(
all_rxns.difference(rxns_to_keep), remove_orphans=True
)

return consistent_model
29 changes: 20 additions & 9 deletions tests/test_flux_analysis/test_fastcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest

from cobra import Model, Reaction
from cobra.flux_analysis import fastcc, flux_variability_analysis
from cobra.flux_analysis import fastcc, find_blocked_reactions


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -102,14 +102,25 @@ def test_opposing(opposing_model: Model, all_solvers: List[str]) -> None:
assert expected_reactions == {rxn.id for rxn in consistent_model.reactions}


def test_fastcc_against_fva_nonblocked_rxns(
def test_fastcc_against_nonblocked_rxns(
model: Model, all_solvers: List[str]
) -> None:
"""Test non-blocked reactions obtained by FASTCC against FVA."""
"""Test non-blocked reactions obtained by FASTCC."""
model.solver = all_solvers
fastcc_consistent_model = fastcc(model)
fva = flux_variability_analysis(model, fraction_of_optimum=0.0)
nonblocked_rxns_fva = fva.index[
(fva.minimum.abs() > model.tolerance) | (fva.maximum.abs() > model.tolerance)
]
assert all(fastcc_consistent_model.reactions) == all(nonblocked_rxns_fva.tolist())
model.tolerance = 1e-6
fastcc_consistent_model = fastcc(model, 1e-3, 1e-6)
blocked = find_blocked_reactions(model)
fastcc_ids = set(rxn.id for rxn in fastcc_consistent_model.reactions)
assert len(model.reactions) - len(blocked) == len(fastcc_consistent_model.reactions)
assert fastcc_ids & set(blocked) == set()


def test_fastcc_against_nonblocked_rxns_large(large_model: Model) -> None:
"""Test non-blocked reactions obtained by FASTCC."""
model = large_model
model.tolerance = 1e-7
fastcc_consistent_model = fastcc(model, 1e-3, 1e-7)
blocked = find_blocked_reactions(model, zero_cutoff=1e-7)
fastcc_ids = set(rxn.id for rxn in fastcc_consistent_model.reactions)
assert len(model.reactions) - len(blocked) == len(fastcc_consistent_model.reactions)
assert fastcc_ids & set(blocked) == set()
Loading