diff --git a/src/cobra/flux_analysis/__init__.py b/src/cobra/flux_analysis/__init__.py index 4cca7c940..c324b436e 100644 --- a/src/cobra/flux_analysis/__init__.py +++ b/src/cobra/flux_analysis/__init__.py @@ -7,6 +7,7 @@ single_reaction_deletion, ) from .fastcc import fastcc +from .find_cyclic_reactions import find_cyclic_reactions from .gapfilling import gapfill from .geometric import geometric_fba from .loopless import loopless_solution, add_loopless diff --git a/src/cobra/flux_analysis/find_cyclic_reactions.py b/src/cobra/flux_analysis/find_cyclic_reactions.py new file mode 100644 index 000000000..c8cb3e055 --- /dev/null +++ b/src/cobra/flux_analysis/find_cyclic_reactions.py @@ -0,0 +1,226 @@ +"""Provides a function to find cyclic reactions in a metabolic model.""" + +from typing import TYPE_CHECKING, List, Optional, Tuple + +import numpy as np +import optlang +from optlang.symbolics import Zero + +from ..util import create_stoichiometric_matrix +from ..util import solver as sutil +from .helpers import normalize_cutoff + + +if TYPE_CHECKING: + from cobra import Model + + +def _create_find_cyclic_reactions_problem( + solver: "optlang.interface", + s_int: np.ndarray, + directions_int: np.ndarray, + zero_cutoff: float, + bound: float, +) -> Tuple["optlang.interface.Model", List["optlang.interface.Variable"]]: + """Create an optimization problem to find cyclic reactions. + + This optimization problem includes only internal reactions, assumes + steady-state constraints, and enforces reaction directions. + """ + model = solver.Model() + + q_list = [] + for i in range(s_int.shape[1]): + q = solver.Variable( + name=f"q_{i}", + lb=bound * min(0, directions_int[i, 0]), + ub=bound * max(0, directions_int[i, 1]), + problem=model, + ) + q_list.append(q) + model.add(q_list) + + for idx, row in enumerate(s_int): + nnz_list = np.flatnonzero(np.abs(row) > zero_cutoff) + if len(nnz_list) == 0: + continue + constraint = solver.Constraint(Zero, lb=0, ub=0, name=f"row_{idx}") + model.add([constraint], sloppy=True) + model.constraints[f"row_{idx}"].set_linear_coefficients( + {q_list[i]: row[i] for i in nnz_list} + ) + + model.update() + + model.objective = solver.Objective(Zero) + + return model, q_list + + +def find_cyclic_reactions( + model: "Model", + zero_cutoff: Optional[float] = None, + bound: float = 1e4, + method: str = "optimized", + required_stop_checks_num: int = 3, +) -> Tuple[List[str], List[Tuple[bool, bool]]]: + """Find reactions, that can be in a loop in a steady state flux distribution. + + Parameters + ---------- + model : cobra.Model + The metabolic model to analyze. + zero_cutoff : float, optional + The cutoff value to consider a flux as zero. + The default uses the `model.tolerance` (default None). + bound : float, optional + The bound for the reaction fluxes in the optimization problem. + (default is 1e4). + method : str, optional + The method to use for finding cyclic reactions. + Options are "optimized" (default) or "basic". + See notes for details. + required_stop_checks_num : int, optional + This parameter is used only for the "optimized" method. + The number of random checks to pass to prove that all cyclic + reactions were found. (default is 3). + + Returns + ------- + A tuple containing two lists: + - A list of reaction IDs that can be part of a loop. + - A list of tuples indicating the possible directions of + reactions from the first list in the loop. + Each tuple contains two boolean values: (can_be_negative, can_be_positive). + + Notes + ----- + This function considers only the stoichiometric matrix and reaction directions. + Any other constraints present in the model are ignored. + * If a reaction is identified as cyclic, there may still be no feasible loop + when taking into account all bounds and additional constraints. + * However, if a reaction is identified as non-cyclic, it cannot participate + in any loop in a steady-state flux distribution. + + The "basic" method for each reaction and direction checks if it can be a part of + a loop by optimizing linear programming problem. + + The "optimized" method uses a faster randomized approach to firstly find all + reactions that can be part of a loop and then checks their directions. This method + usually works at least 2 times faster than the "basic" method. + The `required_stop_checks_num` parameter is used to descrease the probability + of missing some cyclic reactions. + """ + + if method not in ["optimized", "basic"]: + raise ValueError( + "The `method` parameter must be either 'optimized' or 'basic'." + ) + + if required_stop_checks_num < 1: + raise ValueError( + "The `required_stop_checks_num` parameter must be greater than 0." + ) + + zero_cutoff = normalize_cutoff(model, zero_cutoff) + + internal = [i for i, r in enumerate(model.reactions) if not r.boundary] + s_int = create_stoichiometric_matrix(model)[:, np.array(internal)] + n = s_int.shape[1] + + bounds_int = np.array([model.reactions[i].bounds for i in internal]) + directions_int = np.sign(bounds_int) + + lp, q_list = _create_find_cyclic_reactions_problem( + model.problem, s_int, directions_int, zero_cutoff, bound + ) + + candidate_reactions = list(range(n)) + can_positive = [False] * n + can_negative = [False] * n + + if method == "optimized": + is_cyclic = [False] * n + + def set_reaction_weights(): + signs = 2 * np.random.randint(low=0, high=2, size=n) - 1 + weights = np.random.uniform(0.5, 1.0, size=n) * signs + lp.objective.set_linear_coefficients( + {q_list[i]: weights[i] for i in range(n) if not is_cyclic[i]} + ) + + set_reaction_weights() + dir_order = ["min", "max"] + stop_checks_num = 0 + cyclic_reactions_num = 0 + + while cyclic_reactions_num < n and stop_checks_num < required_stop_checks_num: + found_cyclic = False + reverse_dir_order = False + + for dir in dir_order: + lp.objective.direction = dir + lp.optimize() + + sutil.check_solver_status(lp.status) + + if abs(lp.objective.value) > zero_cutoff: + remove_coef = {} + for i in range(n): + if not is_cyclic[i] and abs(q_list[i].primal) > zero_cutoff: + is_cyclic[i] = True + cyclic_reactions_num += 1 + remove_coef[q_list[i]] = 0 + if q_list[i].primal > zero_cutoff: + can_positive[i] = True + else: + can_negative[i] = True + + found_cyclic = True + lp.objective.set_linear_coefficients(remove_coef) + stop_checks_num = 0 + break + + reverse_dir_order = True + + if not found_cyclic: + set_reaction_weights() + stop_checks_num += 1 + + if reverse_dir_order: + dir_order.reverse() + + candidate_reactions = [i for i in range(n) if is_cyclic[i]] + + lp.objective = model.problem.Objective(Zero, direction="max") + for i in candidate_reactions: + for dir in ["min", "max"]: + if (dir == "min" and can_negative[i]) or (dir == "max" and can_positive[i]): + continue + + if directions_int[i, int(dir == "max")] == 0: + continue + + lp.objective.set_linear_coefficients( + {q_list[i]: 1.0 if dir == "max" else -1.0} + ) + + lp.optimize() + sutil.check_solver_status(lp.status) + + if lp.objective.value > zero_cutoff: + if dir == "max": + can_positive[i] = True + else: + can_negative[i] = True + + lp.objective.set_linear_coefficients({q_list[i]: 0.0}) + + cyclic_reactions = [] + cyclic_reactions_directions = [] + for i in range(n): + if can_positive[i] or can_negative[i]: + cyclic_reactions.append(model.reactions[internal[i]].id) + cyclic_reactions_directions.append((can_negative[i], can_positive[i])) + + return cyclic_reactions, cyclic_reactions_directions diff --git a/src/cobra/flux_analysis/loopless.py b/src/cobra/flux_analysis/loopless.py index 283fc32d9..94ebd3742 100644 --- a/src/cobra/flux_analysis/loopless.py +++ b/src/cobra/flux_analysis/loopless.py @@ -1,12 +1,13 @@ """Provide functions to remove thermodynamically infeasible loops.""" -from typing import TYPE_CHECKING, Dict, Optional, Union +from typing import TYPE_CHECKING, Dict, List, Optional, Union import numpy as np from optlang.symbolics import Zero from ..core import get_solution -from ..util import create_stoichiometric_matrix, nullspace +from ..util import create_stoichiometric_matrix, nullspace, nullspace_fast_snp +from .find_cyclic_reactions import find_cyclic_reactions from .helpers import normalize_cutoff @@ -14,12 +15,20 @@ from cobra import Model, Reaction, Solution -def add_loopless(model: "Model", zero_cutoff: Optional[float] = None) -> None: +def add_loopless( + model: "Model", + zero_cutoff: Optional[float] = None, + method: str = "fastSNP", + reactions: Optional[List[str]] = None, +) -> None: """Modify a model so all feasible flux distributions are loopless. It adds variables and constraints to a model which will disallow flux - distributions with loops. The used formulation is described in [1]_. - This function *will* modify your model. + distributions with loops. This function *will* modify your model. + + The used formulation is described in [1]_. If `method` is set to + "fastSNP", it uses a faster implementation based on the Fast-SNP + algorithm [2]_. In most cases you probably want to use the much faster `loopless_solution`. May be used in cases where you want to add complex @@ -35,6 +44,14 @@ def add_loopless(model: "Model", zero_cutoff: Optional[float] = None) -> None: Cutoff used for null space. Coefficients with an absolute value smaller than `zero_cutoff` are considered to be zero. The default uses the `model.tolerance` (default None). + method : str, "original" or "fastSNP", optional + The method to use for finding the null space. The "original" method + uses the original method from [1]_, while "fastSNP" uses a faster + implementation based on the FastSNP algorithm. The "fastSNP" method + is much faster and should be used in most cases. + reactions : list of str, optional + The list of reaction IDs to constrain. All cycles within these + reactions will be removed. If `None`, all reactions will be constrained. References ---------- @@ -42,20 +59,59 @@ def add_loopless(model: "Model", zero_cutoff: Optional[float] = None) -> None: metabolic models. Schellenberger J, Lewis NE, Palsson BO. Biophys J. 2011 Feb 2;100(3):544-53. doi: 10.1016/j.bpj.2010.12.3707. Erratum in: Biophys J. 2011 Mar 2;100(5):1381. - + .. [2] Fast-SNP: a fast matrix pre-processing algorithm for efficient + loopless flux optimization of metabolic models. Saa PA, Nielsen LK. + Bioinformatics. 2016 Dec;32(24):3807–3814. doi: 10.1093/bioinformatics/btw555. """ + if method not in ["original", "fastSNP"]: + raise ValueError(f"unsupported method: {method}") + zero_cutoff = normalize_cutoff(model, zero_cutoff) - internal = [i for i, r in enumerate(model.reactions) if not r.boundary] - s_int = create_stoichiometric_matrix(model)[:, np.array(internal)] - n_int = nullspace(s_int).T + if reactions is None and method == "fastSNP": + reactions = find_cyclic_reactions(model, zero_cutoff=zero_cutoff)[0] + + reactions_to_constrain = [ + i for i, r in enumerate(model.reactions) if not r.boundary + ] + if reactions is not None: + reactions_set = set(reactions) + reactions_to_constrain = [ + i + for i, r in enumerate(model.reactions) + if not r.boundary and r.id in reactions_set + ] + + s_int = create_stoichiometric_matrix(model)[:, np.array(reactions_to_constrain)] + + if method == "original": + n_int = nullspace(s_int).T + elif method == "fastSNP": + bounds_int = np.array( + [model.reactions[i].bounds for i in reactions_to_constrain] + ) + directions_int = np.sign(bounds_int) + v_bound = np.max(np.abs(bounds_int)) + n_int = nullspace_fast_snp( + model.problem, + s_int, + directions_int, + v_bound=v_bound, + zero_cutoff=zero_cutoff, + ).T + else: + raise ValueError(f"unsupported method: {method}") + max_bound = max(max(abs(b) for b in r.bounds) for r in model.reactions) prob = model.problem # Add indicator variables and new constraints to_add = [] - for i in internal: - rxn = model.reactions[i] + for i, ridx in enumerate(reactions_to_constrain): + if not (np.abs(n_int[:, i]) > zero_cutoff).any(): + continue + + rxn = model.reactions[ridx] # indicator variable a_i indicator = prob.Variable(f"indicator_{rxn.id}", type="binary") # -M*(1 - a_i) <= v_i <= M*a_i @@ -84,7 +140,7 @@ def add_loopless(model: "Model", zero_cutoff: Optional[float] = None) -> None: model.add_cons_vars([nullspace_constraint]) coefs = { model.variables[f"delta_g_{model.reactions[ridx].id}"]: row[i] - for i, ridx in enumerate(internal) + for i, ridx in enumerate(reactions_to_constrain) if abs(row[i]) > zero_cutoff } model.constraints[name].set_linear_coefficients(coefs) diff --git a/src/cobra/flux_analysis/variability.py b/src/cobra/flux_analysis/variability.py index 38cd937f4..d89d8328b 100644 --- a/src/cobra/flux_analysis/variability.py +++ b/src/cobra/flux_analysis/variability.py @@ -12,8 +12,9 @@ from ..util import ProcessPool from ..util import solver as sutil from .deletion import single_gene_deletion, single_reaction_deletion +from .find_cyclic_reactions import find_cyclic_reactions from .helpers import normalize_cutoff -from .loopless import loopless_fva_iter +from .loopless import add_loopless, loopless_fva_iter from .parsimonious import add_pfba @@ -91,7 +92,7 @@ def _fva_step(reaction_id: str) -> Tuple[str, float]: def flux_variability_analysis( model: "Model", reaction_list: Optional[List[Union["Reaction", str]]] = None, - loopless: bool = False, + loopless: Union[Optional[str], bool] = None, fraction_of_optimum: float = 1.0, pfba_factor: Optional[float] = None, processes: Optional[int] = None, @@ -105,9 +106,11 @@ def flux_variability_analysis( reaction_list : list of cobra.Reaction or str, optional The reactions for which to obtain min/max fluxes. If None will use all reactions in the model (default None). - loopless : bool, optional - Whether to return only loopless solutions. This is significantly - slower. Please also refer to the notes (default False). + loopless : str, "fastSNP" or "cycleFreeFlux", optional + If this value is set, only loopless solutions will be returned. + Boolean values are deprecated. Provided value means the algorithm + to constrain the model to loopless solutions. + Please also refer to the notes (default None). fraction_of_optimum : float, optional Must be <= 1.0. Requires that the objective value is at least the fraction times maximum objective value. A value of 0.85 for instance @@ -143,11 +146,14 @@ def flux_variability_analysis( sub-optimal. Using the loopless option will lead to a significant increase in - computation time (about a factor of 100 for large models). However, the - algorithm used here (see [2]_) is still more than 1000x faster than the - "naive" version using `add_loopless(model)`. Also note that if you have - included constraints that force a loop (for instance by setting all fluxes - in a loop to be non-zero) this loop will be included in the solution. + computation time (about a factor of 100 for large models). + + If `loopless` is set to "fastSNP", the optimal loopless flux bounds will be + found by adding the loopless constraints to the model using efficient + Fast-SNP algorithm (see [2]_). + + If `loopless` is set to "cycleFreeFlux", the loops removal algorithm will be + used (see [3]_). Note: this algorithm does not guarantee to find optimal bounds. References ---------- @@ -156,13 +162,30 @@ def flux_variability_analysis( BMC Bioinformatics. 2010 Sep 29;11:489. doi: 10.1186/1471-2105-11-489, PMID: 20920235 - .. [2] CycleFreeFlux: efficient removal of thermodynamically infeasible + .. [2] Fast-SNP: a fast matrix pre-processing algorithm for efficient + loopless flux optimization of metabolic models. Saa PA, Nielsen LK. + Bioinformatics. 2016 Dec;32(24):3807–3814. doi: 10.1093/bioinformatics/btw555. + + .. [3] CycleFreeFlux: efficient removal of thermodynamically infeasible loops from flux distributions. Desouki AA, Jarre F, Gelius-Dietrich G, Lercher MJ. Bioinformatics. 2015 Jul 1;31(13):2159-65. doi: 10.1093/bioinformatics/btv096. - """ + if loopless is not None and isinstance(loopless, bool): + warn( + "Passing a boolean value to the `loopless` argument is deprecated. " + "Please pass either None, 'fastSNP' or 'cycleFreeFlux'.", + DeprecationWarning, + stacklevel=2, + ) + loopless = "cycleFreeFlux" if loopless else None + + if loopless not in (None, "fastSNP", "cycleFreeFlux"): + raise ValueError( + "The `loopless` argument must be either None, 'fastSNP' or 'cycleFreeFlux'." + ) + if reaction_list is None: reaction_ids = [r.id for r in model.reactions] else: @@ -181,6 +204,31 @@ def flux_variability_analysis( }, index=reaction_ids, ) + + reaction_ids_by_type = [ + { + "minimum": [], + "maximum": [], + }, + { + "minimum": [], + "maximum": [], + }, + ] + if loopless is not None: + cyclic_reactions, cyclic_directions = find_cyclic_reactions(model) + cyclic_reaction_index = {r_id: i for i, r_id in enumerate(cyclic_reactions)} + for r_id in reaction_ids: + i = cyclic_reaction_index.get(r_id) + for loc, dir in enumerate(("minimum", "maximum")): + if i is not None and cyclic_directions[i][loc]: + reaction_ids_by_type[1][dir].append(r_id) + else: + reaction_ids_by_type[0][dir].append(r_id) + else: + reaction_ids_by_type[0]["minimum"] = reaction_ids + reaction_ids_by_type[0]["maximum"] = reaction_ids + prob = model.problem with model: # Safety check before setting up FVA. @@ -229,25 +277,42 @@ def flux_variability_analysis( model.add_cons_vars([flux_sum, flux_sum_constraint]) model.objective = Zero # This will trigger the reset as well - for what in ("minimum", "maximum"): - if processes > 1: - # We create and destroy a new pool here in order to set the - # objective direction for all reactions. This creates a - # slight overhead but seems the most clean. - chunk_size = len(reaction_ids) // processes - with ProcessPool( - processes, - initializer=_init_worker, - initargs=(model, loopless, what[:3]), - ) as pool: - for rxn_id, value in pool.imap_unordered( - _fva_step, reaction_ids, chunksize=chunk_size - ): + for loopless_reactions, opt_rxn_ids in enumerate(reaction_ids_by_type): + if len(opt_rxn_ids["minimum"]) == 0 and len(opt_rxn_ids["maximum"]) == 0: + continue + + run_cycle_free_flux = bool(loopless_reactions) + if loopless_reactions and loopless == "fastSNP": + add_loopless( + model, + method=loopless, + reactions=cyclic_reactions, + ) + run_cycle_free_flux = False + + for what in ("minimum", "maximum"): + if len(opt_rxn_ids[what]) == 0: + continue + + cur_processes = min(processes, len(opt_rxn_ids[what])) + if cur_processes > 1: + # We create and destroy a new pool here in order to set the + # objective direction for all reactions. This creates a + # slight overhead but seems the most clean. + chunk_size = len(opt_rxn_ids[what]) // cur_processes + with ProcessPool( + cur_processes, + initializer=_init_worker, + initargs=(model, run_cycle_free_flux, what[:3]), + ) as pool: + for rxn_id, value in pool.imap_unordered( + _fva_step, opt_rxn_ids[what], chunksize=chunk_size + ): + fva_result.at[rxn_id, what] = value + else: + _init_worker(model, run_cycle_free_flux, what[:3]) + for rxn_id, value in map(_fva_step, opt_rxn_ids[what]): fva_result.at[rxn_id, what] = value - else: - _init_worker(model, loopless, what[:3]) - for rxn_id, value in map(_fva_step, reaction_ids): - fva_result.at[rxn_id, what] = value return fva_result[["minimum", "maximum"]] diff --git a/src/cobra/util/__init__.py b/src/cobra/util/__init__.py index 7fb765993..02be0fa50 100644 --- a/src/cobra/util/__init__.py +++ b/src/cobra/util/__init__.py @@ -3,3 +3,4 @@ from cobra.util.solver import * from cobra.util.util import * from cobra.util.process_pool import * +from cobra.util.fast_snp import * diff --git a/src/cobra/util/fast_snp.py b/src/cobra/util/fast_snp.py new file mode 100644 index 000000000..863f02f75 --- /dev/null +++ b/src/cobra/util/fast_snp.py @@ -0,0 +1,221 @@ +"""Provides Fast-SNP algorithm implementation for finding nullspace basis of matrix.""" + +from typing import List, Tuple + +import numpy as np +import optlang +from optlang.interface import OPTIMAL, Model, Variable +from optlang.symbolics import Zero + + +def _solve_snv( + weights: np.ndarray, model: Model, v_list: List[Variable], positive: bool +) -> np.ndarray: + """Optimize Fast-SNP step for given weights and direction.""" + + dir = 1 if positive else -1 + + model.constraints["nonzero_constraint"].set_linear_coefficients( + {variable: weight * dir for variable, weight in zip(v_list, weights)} + ) + + model.optimize() + if model.status != OPTIMAL: + return None + + result = np.array([variable.primal for variable in v_list]) + result /= np.linalg.norm(result) + + return result + + +def _create_fast_snp_problem( + solver: "optlang.interface", + S: np.ndarray, + directions: np.ndarray, + v_bound: float, + zero_cutoff: float, + bias: float, +) -> Tuple[Model, List[Variable]]: + """Create an optimization problem for Fast-SNP algorithm.""" + + n = S.shape[1] + + model = solver.Model() + modulo_list = [] + v_list = [] + + for i in range(n): + v = solver.Variable( + name=f"v_{i}", + lb=v_bound * min(directions[i, 0], 0), + ub=v_bound * max(directions[i, 1], 0), + problem=model, + ) + model.add([v]) + v_list.append(v) + + if directions[i, 0] == 0: + modulo_list.append((v, 1)) + elif directions[i, 1] == 0: + modulo_list.append((v, -1)) + else: + x = solver.Variable(name=f"x_{i}", lb=0, problem=model) + model.add([x]) + modulo_list.append((x, 1)) + + constraint1 = solver.Constraint( + Zero, + lb=0.0, + name=f"modulo_constraint1_{i}", + ) + constraint2 = solver.Constraint( + Zero, + lb=0.0, + name=f"modulo_constraint2_{i}", + ) + model.add([constraint1, constraint2], sloppy=True) + + model.constraints[f"modulo_constraint1_{i}"].set_linear_coefficients( + {x: 1.0, v: -1.0} + ) + model.constraints[f"modulo_constraint2_{i}"].set_linear_coefficients( + {x: 1.0, v: 1.0} + ) + + for idx, row in enumerate(S): + nnz_list = np.flatnonzero(np.abs(row) > zero_cutoff) + if len(nnz_list) == 0: + continue + + constraint = solver.Constraint(Zero, lb=0.0, ub=0.0, name=f"row_{idx}") + model.add([constraint], sloppy=True) + model.constraints[f"row_{idx}"].set_linear_coefficients( + {v_list[i]: row[i] for i in nnz_list} + ) + + model.add( + [ + solver.Constraint( + Zero, + lb=bias, + name="nonzero_constraint", + ) + ], + sloppy=True, + ) + + model.objective = solver.Objective(Zero) + model.objective.set_linear_coefficients({x: coef for (x, coef) in modulo_list}) + model.objective.direction = "min" + + return model, v_list + + +def _project(N: np.ndarray, w: np.ndarray) -> np.ndarray: + """Project vector w to the complement of the row space of N.""" + return w - (N.T @ (N @ w)) + + +def _get_condition_vector(N: np.ndarray) -> np.ndarray: + """Get a random Fast-SNP nonzero condition vector.""" + return _project(N, np.random.normal(size=N.shape[1])) + + +def nullspace_fast_snp( + solver: "optlang.interface", + S: np.ndarray, + directions: np.ndarray, + v_bound: float = 1e4, + zero_cutoff: float = 1e-6, + bias: float = 1, + required_stop_checks_num: int = 3, +) -> np.ndarray: + """Compute an approximate basis for the nullspace of S with coordinate directions. + + The algorithm used by this function is described in [1]_. + + Parameters + ---------- + solver : "optlang.interface" + The solver interface to use for the optimization problem. + You can use `model.problem` to get the solver interface. + S : numpy.ndarray + The matrix for which the nullspace is computed. + `S` should be a 2-D array. + directions : numpy.ndarray + A 2-D array with shape (k, 2) where `k` is the number of columns in `S`. + This array specifies the directions of coordinates. + Each row should be: + - [0, 0] for coordinates that can be only zero + - [0, 1] for coordinates that can be only positive + - [-1, 0] for coordinates that can be only negative + - [-1, 1] for coordinates that can be both positive and negative + v_bound : float, optional + The bound for the variables in the optimization problem (default 1e4). + zero_cutoff : float, optional + The cutoff value to consider a coordinate value as zero (default 1e-6). + bias : float, optional + The bias for the non-zero constraint in the optimization problem + (default 1). + required_stop_checks_num : int, optional + The number of random checks to pass to prove that basis + could not be expanded (default 3). + + Returns + ------- + numpy.ndarray + If `S` is an array with shape (m, k), then an array + with shape (k, n) will be returned, where `n` is the dimension of the + nullspace of `S` with `directions`. Each column of this array is a basis + vector for the nullspace; each element in numpy.dot(S, column) will be + approximately zero. Each coordinate in the column will have an allowed + sign according to the `directions` parameter. + + References + ---------- + .. [1] Fast-SNP: a fast matrix pre-processing algorithm for efficient + loopless flux optimization of metabolic models. Saa PA, Nielsen LK. + Bioinformatics. 2016 Dec;32(24):3807–3814. doi: 10.1093/bioinformatics/btw555. + """ + + if len(S.shape) != 2 or S.shape[0] == 0 or S.shape[1] == 0: + raise ValueError("Input matrix S must be a 2D array with non-zero dimensions.") + + problem, v_list = _create_fast_snp_problem( + solver, S, directions, v_bound, zero_cutoff, bias + ) + + n = S.shape[1] + N = np.zeros((0, n)) + U = np.zeros((0, n)) + stop_checks_num = 0 + + while N.shape[0] < n and stop_checks_num < required_stop_checks_num: + weights = _get_condition_vector(U) + + v1 = _solve_snv(weights, problem, v_list, True) + v2 = _solve_snv(weights, problem, v_list, False) + + if v1 is None and v2 is None: + stop_checks_num += 1 + continue + + stop_checks_num = 0 + v = v1 + if v1 is None: + v = v2 + + if v1 is not None and v2 is not None: + nnz_v1 = np.sum(np.abs(v1) > zero_cutoff) + nnz_v2 = np.sum(np.abs(v2) > zero_cutoff) + if nnz_v1 > nnz_v2: + v = v2 + + N = np.vstack([N, v]) + + v = _project(U, v) + v /= np.linalg.norm(v) + U = np.vstack([U, v]) + + return N.T diff --git a/tests/test_flux_analysis/test_find_cyclic_reactions.py b/tests/test_flux_analysis/test_find_cyclic_reactions.py new file mode 100644 index 000000000..bf610e540 --- /dev/null +++ b/tests/test_flux_analysis/test_find_cyclic_reactions.py @@ -0,0 +1,37 @@ +"""Test finding cyclic reactions in metabolic model.""" + +from typing import Callable, List + +import pytest + +from cobra import Model +from cobra.flux_analysis.find_cyclic_reactions import find_cyclic_reactions + + +@pytest.mark.parametrize("method", ["basic", "optimized"]) +def test_find_cyclic_reactions_benchmark( + large_model: Model, benchmark: Callable, all_solvers: List[str], method: str +) -> None: + """Benchmark find_cyclic_reactions.""" + large_model.solver = all_solvers + benchmark( + find_cyclic_reactions, + large_model, + method=method, + ) + + +@pytest.mark.parametrize("method", ["basic", "optimized"]) +def test_find_cyclic_reactions( + model: Model, all_solvers: List[str], method: str +) -> None: + """Test find_cyclic_reactions.""" + model.solver = all_solvers + cyclic_reactions, cyclic_directions = find_cyclic_reactions(model, method=method) + + assert len(cyclic_reactions) == 2 + assert len(cyclic_directions) == 2 + for reaction in ["FRD7", "SUCDi"]: + assert reaction in cyclic_reactions, f"Expected {reaction} in cyclic reactions" + for directions in cyclic_directions: + assert directions == (False, True) diff --git a/tests/test_flux_analysis/test_loopless.py b/tests/test_flux_analysis/test_loopless.py index ba37256f3..5ac320c59 100644 --- a/tests/test_flux_analysis/test_loopless.py +++ b/tests/test_flux_analysis/test_loopless.py @@ -42,13 +42,14 @@ def ll_test_model(request: pytest.FixtureRequest) -> Model: return test_model -def test_loopless_benchmark_before(benchmark: Callable) -> None: +@pytest.mark.parametrize("method", ["fastSNP", "original"]) +def test_loopless_benchmark_before(benchmark: Callable, method: str) -> None: """Benchmark initial condition.""" test_model = construct_ll_test_model() def _(): with test_model: - add_loopless(test_model) + add_loopless(test_model, method=method) test_model.optimize() benchmark(_) @@ -81,9 +82,10 @@ def test_loopless_solution_fluxes(model: Model) -> None: assert ll_solution.objective_value == pytest.approx(sol.objective_value) -def test_add_loopless(ll_test_model: Model) -> None: +@pytest.mark.parametrize("method", ["fastSNP", "original"]) +def test_add_loopless(ll_test_model: Model, method: str) -> None: """Test add_loopless().""" - add_loopless(ll_test_model) + add_loopless(ll_test_model, method=method) feasible_status = ll_test_model.optimize().status ll_test_model.reactions.v3.lower_bound = 1 ll_test_model.slim_optimize() diff --git a/tests/test_flux_analysis/test_variability.py b/tests/test_flux_analysis/test_variability.py index 5076c08e3..57e0ac451 100644 --- a/tests/test_flux_analysis/test_variability.py +++ b/tests/test_flux_analysis/test_variability.py @@ -60,14 +60,15 @@ def test_pfba_flux_variability( assert comparison["maximum"].all() -def test_loopless_pfba_fva(model: Model) -> None: +@pytest.mark.parametrize("loopless", ["fastSNP", "cycleFreeFlux"]) +def test_loopless_pfba_fva(model: Model, loopless: str) -> None: """Test loopless FVA using pFBA.""" loop_reactions = [model.reactions.get_by_id(rid) for rid in ("FRD7", "SUCDi")] fva_loopless = flux_variability_analysis( model, pfba_factor=1.1, reaction_list=loop_reactions, - loopless=True, + loopless=loopless, processes=1, ) assert np.allclose(fva_loopless["maximum"], fva_loopless["minimum"]) @@ -97,26 +98,32 @@ def test_parallel_flux_variability( # Loopless FVA +@pytest.mark.parametrize("loopless", ["fastSNP", "cycleFreeFlux"]) def test_flux_variability_loopless_benchmark( - model: Model, benchmark: Callable, all_solvers: List[str] + model: Model, benchmark: Callable, all_solvers: List[str], loopless: str ) -> None: """Benchmark loopless FVA.""" model.solver = all_solvers benchmark( flux_variability_analysis, model, - loopless=True, + loopless=loopless, reaction_list=model.reactions[1::50], ) -def test_flux_variability_loopless(model: Model, all_solvers: List[str]) -> None: +@pytest.mark.parametrize("loopless", ["fastSNP", "cycleFreeFlux"]) +def test_flux_variability_loopless( + model: Model, all_solvers: List[str], loopless: str +) -> None: """Test loopless FVA.""" model.solver = all_solvers loop_reactions = [model.reactions.get_by_id(rid) for rid in ("FRD7", "SUCDi")] fva_normal = flux_variability_analysis(model, reaction_list=loop_reactions) fva_loopless = flux_variability_analysis( - model, reaction_list=loop_reactions, loopless=True + model, + reaction_list=loop_reactions, + loopless=loopless, ) assert not np.allclose(fva_normal["maximum"], fva_normal["minimum"])