diff --git a/src/semra/io.py b/src/semra/io.py
index cd0637b..3b5209b 100644
--- a/src/semra/io.py
+++ b/src/semra/io.py
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import csv
 import gzip
 import logging
 import pickle
@@ -21,6 +22,7 @@
 import requests
 from bioregistry import Collection
 from tqdm.autonotebook import tqdm
+from tqdm.contrib.logging import logging_redirect_tqdm
 
 from semra.rules import DB_XREF, UNSPECIFIED_MAPPING
 from semra.struct import Evidence, Mapping, MappingSet, ReasonedEvidence, Reference, SimpleEvidence
@@ -50,6 +52,8 @@
 #: node to the mapping node(s) from which it was derived
 DERIVED_PREDICATE = "derivedFromMapping"
 
+HAS_AUTHOR_PREDICATE = "hasAuthor"
+
 #: The default confidence for ontology-based mappings
 DEFAULT_ONTOLOGY_CONFIDENCE = 0.9
 
@@ -355,6 +359,8 @@ def _parse_sssom_row(
     author = None
     if "mapping_set_name" in row and pd.notna(row["mapping_set_name"]):
         n = row["mapping_set_name"]
+    elif "mapping_set" in row and pd.notna(row["mapping_set"]):
+        n = row["mapping_set"]
     elif mapping_set_name is None:
         raise KeyError("need a mapping set name")
     else:
@@ -422,8 +428,9 @@ def get_sssom_df(mappings: list[Mapping], *, add_labels: bool = False) -> pd.Dat
     ]
     df = pd.DataFrame(rows, columns=columns)
     if add_labels:
-        for label_column, id_column in [("subject_label", "subject_id"), ("object_label", "object_id")]:
-            df[label_column] = df[id_column].map(_get_name_by_curie)  # type:ignore
+        with logging_redirect_tqdm():
+            for label_column, id_column in [("subject_label", "subject_id"), ("object_label", "object_id")]:
+                df[label_column] = df[id_column].map(_get_name_by_curie)  # type:ignore
     df = df[
         [
             "subject_id",
@@ -474,8 +481,11 @@ def get_orcid_name(orcid: str) -> Optional[str]:
     if orcid.startswith("orcid:"):
         orcid = orcid[len("orcid:") :]
 
-    res = requests.get(f"https://orcid.org/{orcid}", headers={"Accept": "application/json"}, timeout=5).json()
-    name = res["person"]["name"]
+    try:
+        res = requests.get(f"https://orcid.org/{orcid}", headers={"Accept": "application/json"}, timeout=5).json()
+    except IOError:  # e.g., ReadTimeout
+        return None
+    name = res.get("person", {}).get("name")
     if name is None:
         return None
     if credit_name := name.get("credit-name"):
@@ -548,7 +558,7 @@ def _neo4j_bool(b: bool, /) -> Literal["true", "false"]:  # noqa:FBT001
     return "true" if b else "false"  # type:ignore
 
 
-def _safe_confidence(x) -> str:
+def _safe_confidence(x: Evidence) -> str:
     confidence = x.get_confidence()
     if confidence is None:
         return ""
@@ -564,6 +574,7 @@ def write_neo4j(
     add_labels: bool = False,
     startup_script_name: str = "startup.sh",
     run_script_name: str = "run_on_docker.sh",
+    sort: bool = False,
 ) -> None:
     """Write all files needed to construct a Neo4j graph database from a set of mappings.
 
@@ -587,6 +598,7 @@ def write_neo4j(
     :param startup_script_name: The name of the startup script that the Dockerfile calls
     :param run_script_name: The name of the run script that you as the user should call to wrap building
         and running the Docker image
+    :param sort: Should the output nodes files be sorted?
     :raises NotADirectoryError:
         If the directory given does not already exist. It's suggested
         to use :mod:`pystow` to create deterministic directories.
@@ -617,16 +629,15 @@ def write_neo4j(
     run_path = directory.joinpath(run_script_name)
     docker_path = directory.joinpath("Dockerfile")
 
-    concept_nodes_path = directory.joinpath("concept_nodes.tsv")
+    concept_nodes_path = directory.joinpath("concept_nodes.tsv.gz")
     concepts: set[Reference] = set()
-    concept_nodes_header = ["curie:ID", ":LABEL", "prefix", "name", "priority:boolean"]
+    concept_nodes_header = ["curie:ID", "prefix", "name", "priority:boolean"]
     if equivalence_classes is None:
         equivalence_classes = {}
 
-    mapping_nodes_path = directory.joinpath("mapping_nodes.tsv")
+    mapping_nodes_path = directory.joinpath("mapping_nodes.tsv.gz")
     mapping_nodes_header = [
         "curie:ID",
-        ":LABEL",
         "prefix",
         "predicate",
         "confidence",
@@ -635,22 +646,20 @@ def write_neo4j(
         "tertiary:boolean",
     ]
 
-    evidence_nodes_path = directory.joinpath("evidence_nodes.tsv")
+    evidence_nodes_path = directory.joinpath("evidence_nodes.tsv.gz")
     evidences = {}
     evidence_nodes_header = [
         "curie:ID",
-        ":LABEL",
         "prefix",
         "type",
         "mapping_justification",
         "confidence:float",
     ]
 
-    mapping_set_nodes_path = directory.joinpath("mapping_set_nodes.tsv")
+    mapping_set_nodes_path = directory.joinpath("mapping_set_nodes.tsv.gz")
     mapping_sets = {}
     mapping_set_nodes_header = [
         "curie:ID",
-        ":LABEL",
         "prefix",
         "name",
         "license",
@@ -658,8 +667,7 @@ def write_neo4j(
         "confidence:float",
     ]
 
-    edges_path = directory.joinpath("edges.tsv")
-    edges: list[tuple[str, str, str, str | float, str, str, str, str]] = []
+    mapping_edges_path = directory.joinpath("mapping_edges.tsv.gz")
     edges_header = [
         ":START_ID",
         ":TYPE",
@@ -670,65 +678,80 @@ def write_neo4j(
         "tertiary:boolean",
         "mapping_sets:string[]",
     ]
-
-    for mapping in tqdm(mappings, unit="mapping", unit_scale=True, desc="Preparing Neo4j"):
-        concepts.add(mapping.s)
-        concepts.add(mapping.o)
-
-        edges.append(
-            (
-                mapping.s.curie,
-                mapping.p.curie,
-                mapping.o.curie,
-                _safe_confidence(mapping),
-                _neo4j_bool(mapping.has_primary),
-                _neo4j_bool(mapping.has_secondary),
-                _neo4j_bool(mapping.has_tertiary),
-                "|".join(sorted({evidence.mapping_set.name for evidence in mapping.evidence if evidence.mapping_set})),
+    edges_path = directory.joinpath("edges.tsv.gz")
+    edges_supp_header = [
+        ":START_ID",
+        ":TYPE",
+        ":END_ID",
+    ]
+    with gzip.open(mapping_edges_path, "wt") as file1, gzip.open(edges_path, "wt") as file2:
+        mapping_writer = csv.writer(file1, delimiter="\t")
+        mapping_writer.writerow(edges_header)
+
+        edge_writer = csv.writer(file2, delimiter="\t")
+        edge_writer.writerow(edges_supp_header)
+
+        for mapping in tqdm(mappings, unit="mapping", unit_scale=True, desc="Preparing Neo4j"):
+            concepts.add(mapping.s)
+            concepts.add(mapping.o)
+
+            mapping_writer.writerow(
+                (
+                    mapping.s.curie,
+                    mapping.p.curie,
+                    mapping.o.curie,
+                    _safe_confidence(mapping),
+                    _neo4j_bool(mapping.has_primary),
+                    _neo4j_bool(mapping.has_secondary),
+                    _neo4j_bool(mapping.has_tertiary),
+                    "|".join(
+                        sorted({evidence.mapping_set.name for evidence in mapping.evidence if evidence.mapping_set})
+                    ),
+                )
             )
-        )
-        edges.append((mapping.curie, ANNOTATED_SOURCE.curie, mapping.s.curie, "", "", "", "", ""))
-        edges.append((mapping.curie, ANNOTATED_TARGET.curie, mapping.o.curie, "", "", "", "", ""))
-        for evidence in mapping.evidence:
-            edges.append((mapping.curie, HAS_EVIDENCE_PREDICATE, evidence.curie, "", "", "", "", ""))
-            evidences[evidence.key()] = evidence
-            if evidence.mapping_set:
-                mapping_sets[evidence.mapping_set.name] = evidence.mapping_set
-                edges.append((evidence.curie, FROM_SET_PREDICATE, evidence.mapping_set.curie, "", "", "", "", ""))
-            elif isinstance(evidence, ReasonedEvidence):
-                for mmm in evidence.mappings:
-                    edges.append((evidence.curie, DERIVED_PREDICATE, mmm.curie, "", "", "", "", ""))
-            # elif isinstance(evidence, SimpleEvidence):
-            #     pass
-            # else:
-            #     raise TypeError
-
-            # Add authorship information for the evidence, if available
-            if evidence.author:
-                concepts.add(evidence.author)
-                edges.append((evidence.curie, "hasAuthor", evidence.author.curie, "", "", "", "", ""))
-
-    _write_tsv(
+            edge_writer.writerow((mapping.curie, ANNOTATED_SOURCE.curie, mapping.s.curie))
+            edge_writer.writerow((mapping.curie, ANNOTATED_TARGET.curie, mapping.o.curie))
+            for evidence in mapping.evidence:
+                edge_writer.writerow((mapping.curie, HAS_EVIDENCE_PREDICATE, evidence.curie))
+                evidences[evidence.key()] = evidence
+                if evidence.mapping_set:
+                    mapping_sets[evidence.mapping_set.name] = evidence.mapping_set
+                    edge_writer.writerow((evidence.curie, FROM_SET_PREDICATE, evidence.mapping_set.curie))
+                elif isinstance(evidence, ReasonedEvidence):
+                    for mmm in evidence.mappings:
+                        edge_writer.writerow((evidence.curie, DERIVED_PREDICATE, mmm.curie))
+                # elif isinstance(evidence, SimpleEvidence):
+                #     pass
+                # else:
+                #     raise TypeError
+
+                # Add authorship information for the evidence, if available
+                if evidence.author:
+                    concepts.add(evidence.author)
+                    edge_writer.writerow((evidence.curie, HAS_AUTHOR_PREDICATE, evidence.author.curie))
+
+    sorted_concepts = sorted(concepts, key=lambda n: n.curie) if sort else list(concepts)
+    _write_tsv_gz(
         concept_nodes_path,
         concept_nodes_header,
         (
             (
                 concept.curie,
-                "concept",
                 concept.prefix,
                 _get_name_by_curie(concept.curie) or "" if add_labels else "",
                 _neo4j_bool(equivalence_classes.get(concept, False)),
             )
-            for concept in sorted(concepts, key=lambda n: n.curie)
+            for concept in tqdm(sorted_concepts, desc="writing concept nodes", unit_scale=True, unit="concept")
         ),
     )
-    _write_tsv(
+
+    sorted_mappings = sorted(mappings, key=lambda n: n.curie) if sort else mappings
+    _write_tsv_gz(
         mapping_nodes_path,
         mapping_nodes_header,
         (
             (
                 mapping.curie,
-                "mapping",
                 "semra.mapping",
                 mapping.p.curie,
                 _safe_confidence(mapping),
@@ -736,41 +759,44 @@ def write_neo4j(
                 _neo4j_bool(mapping.has_secondary),
                 _neo4j_bool(mapping.has_tertiary),
             )
-            for mapping in sorted(mappings, key=lambda n: n.curie)
+            for mapping in tqdm(sorted_mappings, desc="writing mapping nodes", unit_scale=True, unit="mapping")
         ),
     )
-    _write_tsv(
+
+    sorted_mapping_sets = sorted(mapping_sets.values(), key=lambda n: n.curie) if sort else list(mapping_sets.values())
+    _write_tsv_gz(
         mapping_set_nodes_path,
         mapping_set_nodes_header,
         (
             (
                 mapping_set.curie,
-                "mappingset",
                 "semra.mappingset",
                 mapping_set.name,
                 mapping_set.license or "",
                 mapping_set.version or "",
                 _safe_confidence(mapping_set),
             )
-            for mapping_set in sorted(mapping_sets.values(), key=lambda n: n.curie)
+            for mapping_set in sorted_mapping_sets
         ),
     )
-    _write_tsv(
+
+    sorted_evidences = sorted(evidences.values(), key=lambda row: row.curie) if sort else list(evidences.values())
+    _write_tsv_gz(
         evidence_nodes_path,
         evidence_nodes_header,
         (
             (
                 evidence.curie,
-                "evidence",
                 "semra.evidence",
                 evidence.evidence_type,
                 evidence.justification.curie,
                 _safe_confidence(evidence),
            )
-            for evidence in sorted(evidences.values(), key=lambda row: row.curie)
+            for evidence in tqdm(
+                sorted_evidences, desc="Writing evidence nodes", leave=False, unit_scale=True, unit="evidence"
+            )
         ),
     )
-    _write_tsv(edges_path, edges_header, sorted(set(edges), key=_edge_key))
 
     startup_commands = dedent(
         """\
@@ -778,9 +804,7 @@ def write_neo4j(
         neo4j start
 
         # Get the port
-        until [ \
-            "$(curl -s -w '%{http_code}' -o /dev/null "http://localhost:7474")" \
-            -eq 200 ]
+        until [ "$(curl -s -w '%{http_code}' -o /dev/null "http://localhost:7474")" -eq 200 ]
         do
             sleep 5
         done
@@ -810,26 +834,33 @@ def write_neo4j(
             apt-get install -y git zip unzip bzip2 gcc pkg-config python3.11 && \\
             curl -sS https://bootstrap.pypa.io/get-pip.py | python3.11 -
 
-        ARG twiddle1=dee
         RUN python3.11 -m pip install "semra[web] @ git+https://github.com/biopragmatics/semra.git"
 
         # Add graph content
-        ARG twiddle2=dee
-        COPY concept_nodes.tsv /sw/concept_nodes.tsv
-        COPY mapping_nodes.tsv /sw/mapping_nodes.tsv
-        COPY evidence_nodes.tsv /sw/evidence_nodes.tsv
-        COPY mapping_set_nodes.tsv /sw/mapping_set_nodes.tsv
-        COPY edges.tsv /sw/edges.tsv
+        COPY concept_nodes.tsv.gz /sw/concept_nodes.tsv.gz
+        COPY mapping_nodes.tsv.gz /sw/mapping_nodes.tsv.gz
+        COPY evidence_nodes.tsv.gz /sw/evidence_nodes.tsv.gz
+        COPY mapping_set_nodes.tsv.gz /sw/mapping_set_nodes.tsv.gz
+        COPY mapping_edges.tsv.gz /sw/mapping_edges.tsv.gz
+        COPY edges.tsv.gz /sw/edges.tsv.gz
 
         # Ingest graph content into neo4j
         RUN sed -i 's/#dbms.default_listen_address/dbms.default_listen_address/' /etc/neo4j/neo4j.conf && \\
             sed -i 's/#dbms.security.auth_enabled/dbms.security.auth_enabled/' /etc/neo4j/neo4j.conf && \\
             neo4j-admin import --delimiter='TAB' --skip-duplicate-nodes=true --skip-bad-relationships=true \\
+            --relationships /sw/mapping_edges.tsv \\
             --relationships /sw/edges.tsv \\
-            --nodes /sw/concept_nodes.tsv \\
-            --nodes /sw/mapping_nodes.tsv \\
-            --nodes /sw/mapping_set_nodes.tsv \\
-            --nodes /sw/evidence_nodes.tsv
+            --nodes=concept=/sw/concept_nodes.tsv \\
+            --nodes=mapping=/sw/mapping_nodes.tsv \\
+            --nodes=mappingset=/sw/mapping_set_nodes.tsv \\
+            --nodes=evidence=/sw/evidence_nodes.tsv
+
+        RUN rm /sw/concept_nodes.tsv.gz
+        RUN rm /sw/mapping_nodes.tsv.gz
+        RUN rm /sw/evidence_nodes.tsv.gz
+        RUN rm /sw/mapping_set_nodes.tsv.gz
+        RUN rm /sw/edges.tsv.gz
+        RUN rm /sw/mapping_edges.tsv.gz
 
         COPY startup.sh startup.sh
         ENTRYPOINT ["/bin/bash", "/sw/startup.sh"]
@@ -865,9 +896,9 @@ def write_neo4j(
     # command_path.write_text(shell_command)
 
 
-def _write_tsv(path, header, rows) -> None:
+def _write_tsv_gz(path, header, rows) -> None:
     click.echo(f"writing to {path}")
-    with path.open("w") as file:
-        print(*header, sep="\t", file=file)  # noqa:T201
-        for row in rows:
-            print(*row, sep="\t", file=file)  # noqa:T201
+    with gzip.open(path, "wt") as file:
+        writer = csv.writer(file, delimiter="\t")
+        writer.writerow(header)
+        writer.writerows(rows)
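
Usage sketch (not part of the patch): one way the revised exporter might be invoked after this change. The positional argument order (mappings, then output directory) and the run_on_docker.sh follow-up are inferred from the hunks above; my_mappings is a hypothetical list of semra.struct.Mapping objects.

    from pathlib import Path

    from semra.io import write_neo4j

    directory = Path("neo4j_export")
    directory.mkdir(exist_ok=True)  # write_neo4j raises NotADirectoryError if the directory is missing
    write_neo4j(
        my_mappings,      # hypothetical list of semra.struct.Mapping objects
        directory,
        add_labels=True,  # look up subject/object labels while writing concept nodes
        sort=True,        # new flag from this patch: emit node files in deterministic, sorted order
    )
    # Then build and launch the Dockerized Neo4j instance:
    #   bash neo4j_export/run_on_docker.sh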