diff --git a/src/semra/api.py b/src/semra/api.py index d0921ba..000c579 100644 --- a/src/semra/api.py +++ b/src/semra/api.py @@ -5,7 +5,7 @@ import itertools as itt import logging from collections import Counter, defaultdict -from collections.abc import Callable, Iterable +from collections.abc import Iterable from typing import cast import networkx as nx @@ -194,7 +194,8 @@ def infer_chains( return [*mappings, *new_mappings] -def index_str(index: Index) -> str: +def tabulate_index(index: Index) -> str: + """Tabulate""" from tabulate import tabulate rows: list[tuple[str, str, str, str]] = [] @@ -487,35 +488,6 @@ def validate_mappings(mappings: list[Mapping]) -> None: raise ValueError(f"banana in mapping object: {mapping}") -def df_to_mappings( - df, - *, - source_prefix: str, - target_prefix: str, - evidence: Callable[[], Evidence], - source_identifier_column: str | None = None, - target_identifier_column: str | None = None, -) -> list[Mapping]: - if source_identifier_column is None: - source_identifier_column = source_prefix - if target_identifier_column is None: - target_identifier_column = target_prefix - return [ - Mapping( - s=Reference(prefix=source_prefix, identifier=source_id), - p=EXACT_MATCH, - o=Reference(prefix=target_prefix, identifier=target_id), - evidence=[evidence()], - ) - for source_id, target_id in tqdm( - df[[source_identifier_column, target_identifier_column]].values, - unit="mapping", - unit_scale=True, - desc=f"Processing {source_prefix}", - ) - ] - - def summarize_prefixes(mappings: list[Mapping]) -> pd.DataFrame: """Get a dataframe summarizing the prefixes appearing in the mappings.""" import bioregistry diff --git a/src/semra/sources/ncit.py b/src/semra/sources/ncit.py index 4aa4389..ec624a5 100644 --- a/src/semra/sources/ncit.py +++ b/src/semra/sources/ncit.py @@ -1,13 +1,16 @@ """Get mappings from NCIT.""" +from __future__ import annotations from functools import lru_cache +from typing import Callable import bioregistry import pandas as pd import requests +from curies import Reference +from tqdm.asyncio import tqdm -from semra import UNSPECIFIED_MAPPING, Mapping, MappingSet, SimpleEvidence -from semra.api import df_to_mappings +from semra import EXACT_MATCH, UNSPECIFIED_MAPPING, Evidence, Mapping, MappingSet, SimpleEvidence __all__ = [ "get_ncit_hgnc_mappings", @@ -50,7 +53,7 @@ def _get_evidence() -> SimpleEvidence: def get_ncit_hgnc_mappings() -> list[Mapping]: df = pd.read_csv(HGNC_MAPPINGS_URL, sep="\t", header=None, names=["ncit", "hgnc"]) df["hgnc"] = df["hgnc"].map(lambda s: s.removeprefix("HGNC:")) # type:ignore - return df_to_mappings( + return _df_to_mappings( df, source_prefix="ncit", target_prefix="hgnc", @@ -61,7 +64,7 @@ def get_ncit_hgnc_mappings() -> list[Mapping]: def get_ncit_go_mappings() -> list[Mapping]: df = pd.read_csv(HGNC_MAPPINGS_URL, sep="\t", header=None, names=["go", "ncit"]) df["go"] = df["go"].map(lambda s: s.removeprefix("GO:")) # type:ignore - return df_to_mappings( + return _df_to_mappings( df, source_prefix="ncit", target_prefix="go", @@ -72,7 +75,7 @@ def get_ncit_go_mappings() -> list[Mapping]: def get_ncit_chebi_mappings() -> list[Mapping]: df = pd.read_csv(HGNC_MAPPINGS_URL, sep="\t", header=None, names=["ncit", "chebi"]) df["chebi"] = df["chebi"].map(lambda s: s.removeprefix("CHEBI:")) # type:ignore - return df_to_mappings( + return _df_to_mappings( df, source_prefix="ncit", target_prefix="chebi", @@ -82,7 +85,7 @@ def get_ncit_chebi_mappings() -> list[Mapping]: def get_ncit_uniprot_mappings() -> list[Mapping]: df = pd.read_csv(SWISSPROT_MAPPINGS_URL, sep="\t", header=None, names=["ncit", "uniprot"]) - return df_to_mappings( + return _df_to_mappings( df, source_prefix="ncit", target_prefix="uniprot", @@ -90,6 +93,35 @@ def get_ncit_uniprot_mappings() -> list[Mapping]: ) +def _df_to_mappings( + df, + *, + source_prefix: str, + target_prefix: str, + evidence: Callable[[], Evidence], + source_identifier_column: str | None = None, + target_identifier_column: str | None = None, +) -> list[Mapping]: + if source_identifier_column is None: + source_identifier_column = source_prefix + if target_identifier_column is None: + target_identifier_column = target_prefix + return [ + Mapping( + s=Reference(prefix=source_prefix, identifier=source_id), + p=EXACT_MATCH, + o=Reference(prefix=target_prefix, identifier=target_id), + evidence=[evidence()], + ) + for source_id, target_id in tqdm( + df[[source_identifier_column, target_identifier_column]].values, + unit="mapping", + unit_scale=True, + desc=f"Processing {source_prefix}", + ) + ] + + if __name__ == "__main__": from semra.api import print_source_target_counts