diff --git a/mumble/__init__.py b/mumble/__init__.py index 73e0c75..140e2aa 100644 --- a/mumble/__init__.py +++ b/mumble/__init__.py @@ -1,3 +1,3 @@ -__version__ = "0.1.2" +__version__ = "0.2.0" from mumble.mumble import PSMHandler diff --git a/mumble/__main__.py b/mumble/__main__.py index 705d3dd..fdf180b 100644 --- a/mumble/__main__.py +++ b/mumble/__main__.py @@ -1,4 +1,5 @@ import click +import cProfile from mumble import PSMHandler @@ -75,9 +76,27 @@ "help": "Path to a config file", "default": None, }, + "profile": { + "is_flag": True, + "help": "Profile the code with cProfile", + "default": False, + "show_default": True, + }, } +def profile(fnc, filepath): + """A decorator that uses cProfile to profile a function""" + + def inner(*args, **kwargs): + with cProfile.Profile() as profiler: + return_value = fnc(*args, **kwargs) + profiler.dump_stats(filepath + ".profile.prof") + return return_value + + return inner + + @click.command("cli", context_settings={"show_default": True}) @click.argument("input_file", type=click.Path(exists=True), default=None) def main(**kwargs): @@ -92,6 +111,11 @@ def main(**kwargs): for key, value in kwargs.items() if ctx.get_parameter_source(key) == click.core.ParameterSource.COMMANDLINE } + if kwargs["profile"]: + pr = cProfile.Profile() + pr.enable() + else: + pr = None # Initialize PSMHandler with priority for CLI params psm_handler = PSMHandler( @@ -99,6 +123,9 @@ def main(**kwargs): **cli_params, ) modified_psm_list = psm_handler.add_modified_psms(kwargs["input_file"]) + if pr: + pr.disable() + pr.dump_stats(kwargs["input_file"] + ".profile.prof") psm_handler.write_modified_psm_list(modified_psm_list) diff --git a/mumble/mumble.py b/mumble/mumble.py index b735f44..3764e16 100644 --- a/mumble/mumble.py +++ b/mumble/mumble.py @@ -3,19 +3,25 @@ import itertools import os import json -from collections import namedtuple +import numpy as np from pathlib import Path +from dataclasses import dataclass +from itertools import product +from functools import lru_cache +import concurrent.futures +from functools import partial + import pandas as pd import pickle from psm_utils.io import read_file, write_file -from psm_utils import PSMList, PSM, Peptidoform +from psm_utils import PSMList from psm_utils.utils import mz_to_mass from pyteomics import proforma -from pyteomics.mass import std_aa_mass, calculate_mass, unimod +from pyteomics.mass import std_aa_mass, unimod from pyteomics.fasta import IndexedFASTA -from rich.progress import track from rich.logging import RichHandler +from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn # setup logging logging.basicConfig( @@ -59,7 +65,7 @@ def _load_parameters(self, overrides: dict) -> dict: Load configuration parameters from the JSON file and apply overrides. Args: - overrides (dict): Dictionary of parameters to override. + overrides (dict): Dictionary of parameters _getto override. Returns: dict: Consolidated configuration parameters. @@ -91,147 +97,6 @@ def _load_parameters(self, overrides: dict) -> dict: return params - @staticmethod - def _find_mod_locations(peptidoform): - """ - Find the locations of existing modifications in a peptide. - - Args: - peptidoform (psm_utils.Peptidoform): Peptidoform object - - return: - list: List of locations of existing modifications - """ - locations = [] - - if peptidoform.properties["n_term"] is not None: - locations.append("N-term") - - if peptidoform.properties["c_term"] is not None: - locations.append("C-term") - - for i, aa in enumerate(peptidoform.parsed_sequence): - if aa[1] is not None: - locations.append(i) - - return locations - - def _return_mass_shifted_peptidoform( - self, modification_tuple_list, peptidoform - ) -> Peptidoform: - """ - Apply a list of modification tuples to a peptidoform. - - Args: - modification_tuple_list (list of tuples): List of modification tuples containing local mass shifts. - peptidoform (psm_utils.Peptidoform): Original peptidoform object to modify. - - Returns: - list of psm_utils.Peptidoform: List of new peptidoform objects with applied modifications, or None if conflicting modifications exist. - """ - - new_peptidoforms = [] - new_peptidoform = deepcopy(peptidoform) - - existing_mod_locations = self._find_mod_locations(new_peptidoform) - - for modification_tuple in modification_tuple_list: - - new_peptidoform = deepcopy(peptidoform) - for localised_mass_shift in modification_tuple.Localised_mass_shifts: - loc, mod = localised_mass_shift - if loc in existing_mod_locations: - return None - else: - if loc == "N-term": - new_peptidoform.properties["n_term"] = [proforma.process_tag_tokens(mod)] - elif loc == "C-term": - new_peptidoform.properties["c_term"] = [proforma.process_tag_tokens(mod)] - elif loc == "prepeptide": - new_peptidoform.parsed_sequence = [ - (aa, None) for aa in mod - ] + new_peptidoform.parsed_sequence - - elif loc == "postpeptide": - new_peptidoform.parsed_sequence = new_peptidoform.parsed_sequence + [ - (aa, None) for aa in mod - ] - else: - try: - aa = new_peptidoform.parsed_sequence[loc][0] - except IndexError: - logger.warning(f"IndexError for {peptidoform} at {loc} with {mod}") - raise IndexError("Localisation is not in peptide") - - # If the modification is an amino acid substitution - if mod in self.modification_handler.aa_sub_dict.keys(): - if ( - aa == self.modification_handler.aa_sub_dict[mod][0] - ): # TODO named tuple so indexing is not necesary and more clear - new_peptidoform.parsed_sequence[loc] = ( - self.modification_handler.aa_sub_dict[mod][1], - None, - ) - # If the modification is a standard modification - else: - new_peptidoform.parsed_sequence[loc] = ( - aa, - [proforma.process_tag_tokens(mod)], - ) - new_peptidoforms.append(new_peptidoform) - return new_peptidoforms - - @staticmethod - def _create_new_psm(psm, new_peptidoform) -> PSM: - """ - Create new psm with new peptidoform. - - Args: - psm (psm_utils.PSM): PSM object - new_peptidoform (psm_utils.Peptidoform): Peptidoform object - - return: - psm_utils.PSM: PSM object - """ - if new_peptidoform is None: - return - copy_psm = deepcopy(psm) - copy_psm.peptidoform = new_peptidoform - return copy_psm - - def _get_modified_peptidoforms(self, psm, keep_original=False) -> list: - """ - Get modified peptidoforms derived from a single PSM. - - Args: - psm (psm_utils.PSM): Original PSM object. - keep_original (bool, optional): Whether to keep the original PSM alongside modified ones. Defaults to False. - - Returns: - list: List of modified PSMs, or None if no modifications were applied. - """ - modified_peptidoforms = [] - - if keep_original: - psm["metadata"]["original_psm"] = True - modified_peptidoforms.append(psm) - - modification_tuple_list = self.modification_handler.localize_mass_shift(psm) - if modification_tuple_list: - new_proteoforms_list = self._return_mass_shifted_peptidoform( - modification_tuple_list, psm.peptidoform - ) - for new_proteoform in new_proteoforms_list: - new_psm = self._create_new_psm( - psm, - new_proteoform, - ) - if new_psm is not None: - new_psm["metadata"]["original_psm"] = False - modified_peptidoforms.append(new_psm) - - return modified_peptidoforms - def get_modified_peptidoforms_list(self, psm, keep_original=False) -> PSMList: """ Get modified peptidoforms derived from 1 PSM in a PSMList. @@ -243,7 +108,10 @@ def get_modified_peptidoforms_list(self, psm, keep_original=False) -> PSMList: return: psm_utils.PSMList: PSMList object """ - modified_peptidoforms = self._get_modified_peptidoforms(psm, keep_original=keep_original) + modified_peptidoforms = self.modification_handler._get_modified_peptidoforms( + psm, keep_original=keep_original, generate_modified_decoys=False + ) + return PSMList(psm_list=modified_peptidoforms) def add_modified_psms( @@ -270,36 +138,65 @@ def add_modified_psms( pass else: raise ValueError("No PSM list provided") - if not generate_modified_decoys: + if generate_modified_decoys is None: generate_modified_decoys = self.params["generate_modified_decoys"] - if not keep_original: + if keep_original is None: keep_original = self.params["keep_original"] if not psm_file_type: psm_file_type = self.params["psm_file_type"] logger.info( - f"Adding modified PSMs to PSMlist {'WITH' if keep_original else 'WITHOUT'} originals, {'INCLUDING' if generate_modified_decoys else 'EXCLUDING'} modfied decoys" + f"Adding modified PSMs to PSMlist {'WITH' if keep_original else 'WITHOUT'} originals, {'INCLUDING' if generate_modified_decoys else 'EXCLUDING'} modified decoys" ) parsed_psm_list = self._parse_psm_list( psm_list, psm_file_type, self.params["modification_mapping"] ) new_psm_list = [] - num_added_psms = 0 + total_num_added_psms = 0 + + # Define a partial function with fixed parameters + process_func = partial( + process_single_psm, + modification_handler=self.modification_handler, + keep_original=keep_original, + generate_modified_decoys=generate_modified_decoys, + ) - for psm in track( - parsed_psm_list, - description="Parsing PSMs in PSMList...", - total=len(parsed_psm_list), - ): - if (psm.is_decoy) & (not generate_modified_decoys): - continue - new_psms = self._get_modified_peptidoforms(psm, keep_original=keep_original) - if new_psms: - num_added_psms += len(new_psms) if not keep_original else len(new_psms) - 1 - new_psm_list.extend(new_psms) - if num_added_psms != 0: - logger.info(f"Added {num_added_psms} additional modified PSMs") + # Setup the progress bar with enhanced columns + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("{task.completed}/{task.total} PSMs processed"), + TimeElapsedColumn(), + transient=False, + ) as progress: + task = progress.add_task("Processing PSMs...", total=len(parsed_psm_list)) + + # Use ThreadPoolExecutor to avoid pickling issues + with concurrent.futures.ThreadPoolExecutor() as executor: + # Submit all tasks to the executor + futures = {executor.submit(process_func, psm): psm for psm in parsed_psm_list} + + for future in concurrent.futures.as_completed(futures): + try: + modified_psms = future.result() + if modified_psms: + if keep_original: + # Assuming the first PSM is the original + total_num_added_psms += len(modified_psms) - 1 + else: + total_num_added_psms += len(modified_psms) + new_psm_list.extend(modified_psms) + except Exception as e: + psm = futures[future] + logger.error(f"Error processing PSM {psm}: {e}") + finally: + progress.advance(task) + + if total_num_added_psms != 0: + logger.info(f"Added {total_num_added_psms} additional modified PSMs") else: logger.warning("No modified PSMs found, ensure open modification search was enabled") @@ -326,7 +223,7 @@ def _parse_psm_list( elif type(psm_list) is str: self.psm_file_name = Path(psm_list) psm_list = read_file(psm_list, filetype=psm_file_type) - psm_list.rename_modifications({"+15.9949": "Oxidation"}) + psm_list.rename_modifications(modification_mapping) elif type(psm_list) is not PSMList: raise TypeError("psm_list should be a path to a file or a PSMList object") @@ -362,6 +259,35 @@ def write_modified_psm_list(self, psm_list, output_file=None, psm_file_type=None write_file(psm_list=psm_list, filename=output_file, filetype=psm_file_type) +def process_single_psm(psm, modification_handler, keep_original, generate_modified_decoys): + """ + Processes a single PSM to generate modified PSMs. + + Args: + psm (psm_utils.PSM): The original PSM object. + modification_handler (_ModificationHandler): Instance handling modifications. + keep_original (bool): Whether to keep the original PSM. + generate_modified_decoys (bool): Whether to generate decoys. + + Returns: + list: List of modified PSMs. + """ + # Add logging to confirm the function is being called + + if psm.is_decoy and not generate_modified_decoys: + return [] + + try: + modified_peptidoforms = modification_handler._get_modified_peptidoforms( + psm, keep_original=keep_original + ) + logger.debug(f"Generated {len(modified_peptidoforms)} modified PSMs for {psm}") + return modified_peptidoforms + except Exception as e: + logger.error(f"Exception in processing PSM {psm}: {e}") + return [] + + class _ModificationHandler: """Class that handles modifications.""" @@ -420,81 +346,21 @@ def _get_name_to_mass_residue_dict(self): return: dict: Dictionary with name as key and mass and residue as value """ - Modification = namedtuple("modification", ["mass", "residues", "restrictions"]) return { - row.name: Modification(row.monoisotopic_mass, row.residue, row.restriction) + row.name: Modification( + name=row.name, + monoisotopic_mass=row.monoisotopic_mass, + residues=row.residue, + restrictions=row.restriction, + ) for row in self.modification_df.groupby(["monoisotopic_mass", "name"]) .agg({"residue": list, "restriction": list}) .reset_index() .itertuples() - } # TODO: used named tuple here - - def get_localisation( - self, psm, modification_name, residue_list, restrictions - ) -> set[namedtuple]: - """ - Localise a given modification in a peptide - - Args: - psm (psm_utils.PSM): PSM object - modification_name (str): Name of the modification - residue_list (list): List of residues - restrictions (list): List of restrictions - - return: - list: List of localised_mass_shift - """ - loc_list = [] - Localised_mass_shift = namedtuple("Localised_mass_shift", ["loc", "modification"]) - - amino_acids_peptide = [x[0] for x in psm.peptidoform.parsed_sequence] - - for residue, restriction in zip(residue_list, restrictions): - if (residue == "N-term") and (psm.peptidoform.properties["n_term"] is None): - loc_list.append(Localised_mass_shift("N-term", modification_name)) - - elif residue == "C-term" and (psm.peptidoform.properties["c_term"] is None): - loc_list.append(Localised_mass_shift("C-term", modification_name)) - - elif residue == "protein_level": - loc_list.extend( - [ - Localised_mass_shift(loc, mod) - for loc, mod in self.check_protein_level(psm, modification_name) - ] - ) - elif restriction == "N-term" or restriction == "C-term": - if ( - restriction == "N-term" - and (psm.peptidoform.properties["n_term"] is None) - and (psm.peptidoform.parsed_sequence[0][0] == residue) - ): - loc_list.append(Localised_mass_shift("N-term", modification_name)) - - elif ( - restriction == "C-term" - and (psm.peptidoform.properties["c_term"] is None) - and (psm.peptidoform.parsed_sequence[-1][0] == residue) - ): - loc_list.append(Localised_mass_shift("C-term", modification_name)) - - else: - continue - - elif residue in amino_acids_peptide: - loc_list.extend( - [ - Localised_mass_shift(i, modification_name) - for i, aa in enumerate(amino_acids_peptide) - if (aa == residue) and (psm.peptidoform.parsed_sequence[i][1] is None) - ] - ) - - # remove duplicate locations - return set(loc_list) + } - def localize_mass_shift(self, psm) -> list[namedtuple]: + def _localize_mass_shift(self, psm): """Give potential localisations of a mass shift in a peptide Args: @@ -504,87 +370,88 @@ def localize_mass_shift(self, psm) -> list[namedtuple]: list: List of Modification_candidate([localised_mass_shift]) """ expmass = mz_to_mass(psm.precursor_mz, psm.get_precursor_charge()) - calcmass = calculate_mass(psm.peptidoform.composition) + calcmass = psm.peptidoform.theoretical_mass mass_shift = expmass - calcmass + peptidofrom_as_list = ( + (["N-term"] if not psm.peptidoform.properties["n_term"] else [None]) + + (["Protein_level"] if self.protein_level_check else [None]) + + ([x[0] if not x[1] else None for x in psm.peptidoform.parsed_sequence]) + + (["Protein_level"] if self.protein_level_check else [None]) + + (["C-term"] if not psm.peptidoform.properties["c_term"] else [None]) + ) + # get all potential modifications - try: - potential_modifications_indices = self._binary_range_search( - self.monoisotopic_masses, mass_shift, self.mass_error - ) - if potential_modifications_indices: - potential_modifications_tuples = self.modifications_names[ - potential_modifications_indices[0] : potential_modifications_indices[1] + 1 - ] - else: - return [] - except KeyError: - return None - - Modification_candidate = namedtuple("Modification_candidate", ["Localised_mass_shifts"]) - - # cache to store results for combinations - combination_cache = {} - - def check_combination(combination, psm): - if not combination: - return [] - - if combination in combination_cache: - return combination_cache[combination] - - if len(combination) == 1: - # Case: combination with no child combinations and not cached - mod_name = combination[0] - residues = self.name_to_mass_residue_dict[mod_name].residues - restrictions = self.name_to_mass_residue_dict[mod_name].restrictions - localizations = self.get_localisation(psm, mod_name, residues, restrictions) - # Store the results as a list of feasible modification candidates - result = [ - Modification_candidate(Localised_mass_shifts=[localization]) - for localization in localizations - ] - combination_cache[combination] = result - return result + search_indices = self._binary_range_search( + self.monoisotopic_masses, mass_shift, self.mass_error + ) + if not search_indices: + return [] + canididate_mass_shifts = self.modifications_names[ + search_indices[0] : search_indices[1] + 1 + ] + psm_modifications = [] + for modifications in canididate_mass_shifts: + # Try localise all modifications for each mass shift candidate + peptidoform_candidates = [] + for mod_name in modifications: + localised_mod = self.name_to_mass_residue_dict[mod_name].localise( + peptidofrom_as_list + ) + if localised_mod[1] or localised_mod[-2]: + localised_mod[1], localised_mod[-2] = self.check_protein_level(psm, mod_name) - else: - # Case: combination with child combinations and not cached - # child_combinations = [combo for combo in itertools.product(*[[(mod,) for mod in combination]])] - child_combinations = itertools.product(combination) - - # Get possible mass shift combinations for each child - child_results = [] - for child in child_combinations: - child_results.append(check_combination(child, psm)) - - # Combine child mass shift possibilities - combined_results = [] - for child_result_list in itertools.product(*child_results): - # Flatten the list of Localised_mass_shifts from all child results - all_shifts = [ - shift - for result in child_result_list - for shift in result.Localised_mass_shifts - ] + if not localised_mod.any(): + break - # Check for position conflicts - positions = [shift.loc for shift in all_shifts] - if len(set(positions)) == len(positions): # No overlap in positions - combined_results.append( - Modification_candidate(Localised_mass_shifts=all_shifts) - ) - - combination_cache[combination] = combined_results - return combined_results - - feasible_modifications_candidates = [] - for potential_mods_combination in potential_modifications_tuples: - # check every combination recursively - feasible_modifications_candidates.extend( - check_combination(potential_mods_combination, psm) - ) + peptidoform_candidates.append(localised_mod) + if peptidoform_candidates: + final_candidates = self._generate_combinations(peptidoform_candidates) + psm_modifications.extend(final_candidates) - return feasible_modifications_candidates if feasible_modifications_candidates else None + return psm_modifications + + @staticmethod + def _generate_combinations(mod_lists): + """Give a list of lists of single modification placements, generate all potential combinations""" + mod_array = np.array(mod_lists, dtype=object) + + # Use np.where to get indices of non-False values for each row + valid_indices = [ + np.where(mod_array[i] != False)[0].tolist() for i in range(mod_array.shape[0]) # NOQA + ] + # generate all possible combinations of indices + valid_combinations = [ + comb for comb in list(product(*valid_indices)) if len(set(comb)) == len(mod_lists) + ] + + candidates = [] + # Generate the candidate modifications + for combination in valid_combinations: + candidate = [None] * len(mod_array[0]) + for i, idx in enumerate(combination): + name = mod_array[i][idx] + if candidate[idx] == "[B]": + candidate = False + break + + elif "[B]" in name: + candidate[idx] = name.replace("[B]", "") + # check that there are no other modifications on before this position + if any(candidate[:idx]): + candidate = False + break + else: + # block everything before this position + candidate[:idx] = ["[B]"] * idx + else: + candidate[idx] = name + if candidate: + # replace [B] with None + candidate = [None if x == "[B]" else x for x in candidate] + candidates.append(candidate) + + return candidates @staticmethod def _binary_range_search(arr, target, error) -> tuple[int, int]: @@ -638,7 +505,7 @@ def binary_right_index(arr, value) -> int: # Check bounds and validity if left <= right and left < len(arr) and right >= 0: return (left, right) - return () + return None def _get_aa_sub_dict(self): """ @@ -691,6 +558,61 @@ def _add_amino_acid_combinations(self, number_of_aa=1): ] ) + def _get_modified_peptidoforms(self, psm, keep_original=False) -> list: + modified_peptidoforms = [] + + if keep_original: + psm["metadata"]["original_psm"] = True + modified_peptidoforms.append(psm) + + localised_mass_shifts = self._localize_mass_shift(psm) + if localised_mass_shifts: + modified_peptidoforms.extend( + self._return_mass_shifted_psms(localised_mass_shifts, psm) + ) + + return modified_peptidoforms + + def _return_mass_shifted_psms(self, modification_lists, psm) -> list: + mass_shifted_psms = [] + + for assigned_modifications in modification_lists: + mass_shifted_psm = deepcopy(psm) + new_peptidoform = mass_shifted_psm.peptidoform + + # Handle terminal modifications + if nterm := assigned_modifications[0]: + new_peptidoform.properties["n_term"] = [self.cached_process_tag_tokens(nterm)] + if cterm := assigned_modifications[-1]: + new_peptidoform.properties["c_term"] = [self.cached_process_tag_tokens(cterm)] + + # Handle peptide modifications + for i, mod in enumerate(assigned_modifications[2:-2]): + if mod in self.aa_sub_dict.keys(): + new_peptidoform.parsed_sequence[i] = ( + self.aa_sub_dict[mod][1], + None, + ) + elif mod: + new_peptidoform.parsed_sequence[i] = ( + new_peptidoform.parsed_sequence[i][0], + [self.cached_process_tag_tokens(mod)], + ) + + # Handle peptide additions + if additional_aa := assigned_modifications[1]: + new_peptidoform.parsed_sequence = [ + (aa, None) for aa in additional_aa + ] + new_peptidoform.parsed_sequence + if additional_aa := assigned_modifications[-2]: + new_peptidoform.parsed_sequence = new_peptidoform.parsed_sequence + [ + (aa, None) for aa in additional_aa + ] + + mass_shifted_psms.append(mass_shifted_psm) + + return mass_shifted_psms + def check_protein_level(self, psm, additional_aa): """ Check if amino acid(s) precedes or follows a peptide in the protein sequence. @@ -705,7 +627,6 @@ def check_protein_level(self, psm, additional_aa): if psm.is_decoy: return [] - found_additional_amino_acids = [] protein_sequence = self.fasta_file[psm.protein_list[0]].sequence peptide_start_position = protein_sequence.find(psm.peptidoform.sequence) @@ -716,15 +637,32 @@ def check_protein_level(self, psm, additional_aa): protein_sequence[peptide_start_position - additional_aa_len : peptide_start_position] == additional_aa ): - found_additional_amino_acids.append(("prepeptide", additional_aa)) + prepeptide = additional_aa + else: + prepeptide = False if ( protein_sequence[peptide_end_position : peptide_end_position + additional_aa_len] == additional_aa ): - found_additional_amino_acids.append(("postpeptide", additional_aa)) + postpeptide = additional_aa + else: + postpeptide = False - return found_additional_amino_acids + return prepeptide, postpeptide + + @lru_cache(maxsize=None) + def cached_process_tag_tokens(self, tag): + """ + Process a tag token and cache the result. + + Args: + tag (str): Tag token to process. + + Returns: + str: Processed tag token. + """ + return proforma.process_tag_tokens(tag) class ModificationCache: @@ -768,7 +706,7 @@ def _get_cache_file_path(self): # Create the cache directory if it doesn't exist os.makedirs(cache_dir, exist_ok=True) - cache_file = os.path.join(cache_dir, "modification_cache.h5") + cache_file = os.path.join(cache_dir, "modification_cache.pkl") return cache_file def _load_or_generate_data(self, cache_file: str) -> None: @@ -833,7 +771,7 @@ def get_unimod_database(self): monoisotopic_mass = mod.monoisotopic_mass for specificity in mod.specificities: - classification = specificity.classification + classification = specificity.classification.classification # Skip based on classification if classification == "Isotopic label": @@ -984,6 +922,69 @@ def _read_unimod_file(self, modification_file=None): return None, None +@dataclass +class Modification: + name: str + monoisotopic_mass: float + residues: np.ndarray + restrictions: np.ndarray + + def __post_init__(self): + assert len(self.residues) == len( + self.restrictions + ), "Residues and restrictions should have the same length" + + def localise(self, peptidoform_as_list): + """ + Localise the modification in a peptide based on residues and restrictions. + + Args: + peptidoform_as_list (np.ndarray): Array of the original peptidoform. + + Returns: + np.ndarray: Array with modifcation name, [B]Modification name or False for each element. + """ + # convert to array + peptidoform_as_list = np.array(peptidoform_as_list) + # Initialize result array with False + result = np.full(peptidoform_as_list.shape, False, dtype=object) + + # Precompute positions + n_term_positions = np.array([0, 2]) + c_term_positions = np.array([len(peptidoform_as_list) - 1, len(peptidoform_as_list) - 3]) + + for residue, restriction in zip(self.residues, self.restrictions): + # Find matches + matches = peptidoform_as_list == residue + + if restriction == "anywhere": + result[matches] = self.name + + elif restriction == "N-term": + # Handle N-term restrictions + n_term_indices = np.intersect1d(np.where(matches)[0], n_term_positions) + result[n_term_indices] = "[B]" + self.name + result[np.where(matches)[0][~np.isin(np.where(matches)[0], n_term_positions)]] = ( + False + ) + result[np.where(matches)[0][np.isin(np.where(matches)[0], [0])]] = self.name + + elif restriction == "C-term": + # Handle C-term restrictions + c_term_indices = np.intersect1d(np.where(matches)[0], c_term_positions) + result[c_term_indices] = "[B]" + self.name + result[np.where(matches)[0][~np.isin(np.where(matches)[0], c_term_positions)]] = ( + False + ) + result[ + np.where(matches)[0][ + np.isin(np.where(matches)[0], [len(peptidoform_as_list) - 1]) + ] + ] = self.name + + return result + + class JSONConfigLoader: """Loads a single-level configuration from a JSON file.""" diff --git a/pyproject.toml b/pyproject.toml index 860cd47..ea232d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mumble" -version = "0.1.2" +version = "0.2.0" description = "Finding the perfect modification for your mass shift" readme = "README.md" keywords = [ @@ -49,7 +49,7 @@ target-version = 'py38' extend-select = ["T201", "T203"] [tool.bumpver] -current_version = "0.0.1" +current_version = "0.2.0" version_pattern = "MAJOR.MINOR.PATCH" commit_message = "Bump version {old_version} -> {new_version}" commit = true @@ -58,4 +58,4 @@ push = false [tool.bumpver.file_patterns] "pyproject.toml" = ['current_version = "{version}"', 'version = "{version}"'] -"mumble/__init__.py" = ["{version}"] +"mumble/__init__.py" = ['__version__ = "{version}"']