Skip to content

Commit

Permalink
Update neo4j and other outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
cthoyt committed Apr 29, 2024
1 parent 6112989 commit 01e9274
Showing 1 changed file with 115 additions and 84 deletions.
199 changes: 115 additions & 84 deletions src/semra/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import csv
import gzip
import logging
import pickle
Expand All @@ -21,6 +22,7 @@
import requests
from bioregistry import Collection
from tqdm.autonotebook import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

from semra.rules import DB_XREF, UNSPECIFIED_MAPPING
from semra.struct import Evidence, Mapping, MappingSet, ReasonedEvidence, Reference, SimpleEvidence
Expand Down Expand Up @@ -50,6 +52,8 @@
#: node to the mapping node(s) from which it was derived
DERIVED_PREDICATE = "derivedFromMapping"

HAS_AUTHOR_PREDICATE = "hasAuthor"

#: The default confidence for ontology-based mappings
DEFAULT_ONTOLOGY_CONFIDENCE = 0.9

Expand Down Expand Up @@ -355,6 +359,8 @@ def _parse_sssom_row(
author = None
if "mapping_set_name" in row and pd.notna(row["mapping_set_name"]):
n = row["mapping_set_name"]
elif "mapping_set" in row and pd.notna(row["mapping_set"]):
n = row["mapping_set"]
elif mapping_set_name is None:
raise KeyError("need a mapping set name")
else:
Expand Down Expand Up @@ -422,8 +428,9 @@ def get_sssom_df(mappings: list[Mapping], *, add_labels: bool = False) -> pd.Dat
]
df = pd.DataFrame(rows, columns=columns)
if add_labels:
for label_column, id_column in [("subject_label", "subject_id"), ("object_label", "object_id")]:
df[label_column] = df[id_column].map(_get_name_by_curie) # type:ignore
with logging_redirect_tqdm():
for label_column, id_column in [("subject_label", "subject_id"), ("object_label", "object_id")]:
df[label_column] = df[id_column].map(_get_name_by_curie) # type:ignore
df = df[
[
"subject_id",
Expand Down Expand Up @@ -474,8 +481,11 @@ def get_orcid_name(orcid: str) -> Optional[str]:
if orcid.startswith("orcid:"):
orcid = orcid[len("orcid:") :]

res = requests.get(f"https://orcid.org/{orcid}", headers={"Accept": "application/json"}, timeout=5).json()
name = res["person"]["name"]
try:
res = requests.get(f"https://orcid.org/{orcid}", headers={"Accept": "application/json"}, timeout=5).json()
except IOError: # e.g., ReadTimeout
return None
name = res.get("person", {}).get("name")
if name is None:
return None
if credit_name := name.get("credit-name"):
Expand Down Expand Up @@ -548,7 +558,7 @@ def _neo4j_bool(b: bool, /) -> Literal["true", "false"]: # noqa:FBT001
return "true" if b else "false" # type:ignore


def _safe_confidence(x) -> str:
def _safe_confidence(x: Evidence) -> str:
confidence = x.get_confidence()
if confidence is None:
return ""
Expand All @@ -564,6 +574,7 @@ def write_neo4j(
add_labels: bool = False,
startup_script_name: str = "startup.sh",
run_script_name: str = "run_on_docker.sh",
sort: bool = False,
) -> None:
"""Write all files needed to construct a Neo4j graph database from a set of mappings.
Expand All @@ -587,6 +598,7 @@ def write_neo4j(
:param startup_script_name: The name of the startup script that the Dockerfile calls
:param run_script_name:
The name of the run script that you as the user should call to wrap building and running the Docker image
:param sort: Should the output nodes files be sorted?
:raises NotADirectoryError:
If the directory given does not already exist. It's suggested
to use :mod:`pystow` to create deterministic directories.
Expand Down Expand Up @@ -617,16 +629,15 @@ def write_neo4j(
run_path = directory.joinpath(run_script_name)
docker_path = directory.joinpath("Dockerfile")

concept_nodes_path = directory.joinpath("concept_nodes.tsv")
concept_nodes_path = directory.joinpath("concept_nodes.tsv.gz")
concepts: set[Reference] = set()
concept_nodes_header = ["curie:ID", ":LABEL", "prefix", "name", "priority:boolean"]
concept_nodes_header = ["curie:ID", "prefix", "name", "priority:boolean"]
if equivalence_classes is None:
equivalence_classes = {}

mapping_nodes_path = directory.joinpath("mapping_nodes.tsv")
mapping_nodes_path = directory.joinpath("mapping_nodes.tsv.gz")
mapping_nodes_header = [
"curie:ID",
":LABEL",
"prefix",
"predicate",
"confidence",
Expand All @@ -635,31 +646,28 @@ def write_neo4j(
"tertiary:boolean",
]

evidence_nodes_path = directory.joinpath("evidence_nodes.tsv")
evidence_nodes_path = directory.joinpath("evidence_nodes.tsv.gz")
evidences = {}
evidence_nodes_header = [
"curie:ID",
":LABEL",
"prefix",
"type",
"mapping_justification",
"confidence:float",
]

mapping_set_nodes_path = directory.joinpath("mapping_set_nodes.tsv")
mapping_set_nodes_path = directory.joinpath("mapping_set_nodes.tsv.gz")
mapping_sets = {}
mapping_set_nodes_header = [
"curie:ID",
":LABEL",
"prefix",
"name",
"license",
"version",
"confidence:float",
]

edges_path = directory.joinpath("edges.tsv")
edges: list[tuple[str, str, str, str | float, str, str, str, str]] = []
mapping_edges_path = directory.joinpath("mapping_edges.tsv.gz")
edges_header = [
":START_ID",
":TYPE",
Expand All @@ -670,117 +678,133 @@ def write_neo4j(
"tertiary:boolean",
"mapping_sets:string[]",
]

for mapping in tqdm(mappings, unit="mapping", unit_scale=True, desc="Preparing Neo4j"):
concepts.add(mapping.s)
concepts.add(mapping.o)

edges.append(
(
mapping.s.curie,
mapping.p.curie,
mapping.o.curie,
_safe_confidence(mapping),
_neo4j_bool(mapping.has_primary),
_neo4j_bool(mapping.has_secondary),
_neo4j_bool(mapping.has_tertiary),
"|".join(sorted({evidence.mapping_set.name for evidence in mapping.evidence if evidence.mapping_set})),
edges_path = directory.joinpath("edges.tsv.gz")
edges_supp_header = [
":START_ID",
":TYPE",
":END_ID",
]
with gzip.open(mapping_edges_path, "wt") as file1, gzip.open(edges_path, "wt") as file2:
mapping_writer = csv.writer(file1, delimiter="\t")
mapping_writer.writerow(edges_header)

edge_writer = csv.writer(file2, delimiter="\t")
edge_writer.writerow(edges_supp_header)

for mapping in tqdm(mappings, unit="mapping", unit_scale=True, desc="Preparing Neo4j"):
concepts.add(mapping.s)
concepts.add(mapping.o)

mapping_writer.writerow(
(
mapping.s.curie,
mapping.p.curie,
mapping.o.curie,
_safe_confidence(mapping),
_neo4j_bool(mapping.has_primary),
_neo4j_bool(mapping.has_secondary),
_neo4j_bool(mapping.has_tertiary),
"|".join(
sorted({evidence.mapping_set.name for evidence in mapping.evidence if evidence.mapping_set})
),
)
)
)
edges.append((mapping.curie, ANNOTATED_SOURCE.curie, mapping.s.curie, "", "", "", "", ""))
edges.append((mapping.curie, ANNOTATED_TARGET.curie, mapping.o.curie, "", "", "", "", ""))
for evidence in mapping.evidence:
edges.append((mapping.curie, HAS_EVIDENCE_PREDICATE, evidence.curie, "", "", "", "", ""))
evidences[evidence.key()] = evidence
if evidence.mapping_set:
mapping_sets[evidence.mapping_set.name] = evidence.mapping_set
edges.append((evidence.curie, FROM_SET_PREDICATE, evidence.mapping_set.curie, "", "", "", "", ""))
elif isinstance(evidence, ReasonedEvidence):
for mmm in evidence.mappings:
edges.append((evidence.curie, DERIVED_PREDICATE, mmm.curie, "", "", "", "", ""))
# elif isinstance(evidence, SimpleEvidence):
# pass
# else:
# raise TypeError

# Add authorship information for the evidence, if available
if evidence.author:
concepts.add(evidence.author)
edges.append((evidence.curie, "hasAuthor", evidence.author.curie, "", "", "", "", ""))

_write_tsv(
edge_writer.writerow((mapping.curie, ANNOTATED_SOURCE.curie, mapping.s.curie))
edge_writer.writerow((mapping.curie, ANNOTATED_TARGET.curie, mapping.o.curie))
for evidence in mapping.evidence:
edge_writer.writerow((mapping.curie, HAS_EVIDENCE_PREDICATE, evidence.curie))
evidences[evidence.key()] = evidence
if evidence.mapping_set:
mapping_sets[evidence.mapping_set.name] = evidence.mapping_set
edge_writer.writerow((evidence.curie, FROM_SET_PREDICATE, evidence.mapping_set.curie))
elif isinstance(evidence, ReasonedEvidence):
for mmm in evidence.mappings:
edge_writer.writerow((evidence.curie, DERIVED_PREDICATE, mmm.curie))
# elif isinstance(evidence, SimpleEvidence):
# pass
# else:
# raise TypeError

# Add authorship information for the evidence, if available
if evidence.author:
concepts.add(evidence.author)
edge_writer.writerow((evidence.curie, HAS_AUTHOR_PREDICATE, evidence.author.curie))

sorted_concepts = sorted(concepts, key=lambda n: n.curie) if sort else list(concepts)
_write_tsv_gz(
concept_nodes_path,
concept_nodes_header,
(
(
concept.curie,
"concept",
concept.prefix,
_get_name_by_curie(concept.curie) or "" if add_labels else "",
_neo4j_bool(equivalence_classes.get(concept, False)),
)
for concept in sorted(concepts, key=lambda n: n.curie)
for concept in tqdm(sorted_concepts, desc="writing concept nodes", unit_scale=True, unit="concept")
),
)
_write_tsv(

sorted_mappings = sorted(mappings, key=lambda n: n.curie) if sort else mappings
_write_tsv_gz(
mapping_nodes_path,
mapping_nodes_header,
(
(
mapping.curie,
"mapping",
"semra.mapping",
mapping.p.curie,
_safe_confidence(mapping),
_neo4j_bool(mapping.has_primary),
_neo4j_bool(mapping.has_secondary),
_neo4j_bool(mapping.has_tertiary),
)
for mapping in sorted(mappings, key=lambda n: n.curie)
for mapping in tqdm(sorted_mappings, desc="writing mapping nodes", unit_scale=True, unit="mapping")
),
)
_write_tsv(

sorted_mapping_sets = sorted(mapping_sets.values(), key=lambda n: n.curie) if sort else list(mapping_sets.values())
_write_tsv_gz(
mapping_set_nodes_path,
mapping_set_nodes_header,
(
(
mapping_set.curie,
"mappingset",
"semra.mappingset",
mapping_set.name,
mapping_set.license or "",
mapping_set.version or "",
_safe_confidence(mapping_set),
)
for mapping_set in sorted(mapping_sets.values(), key=lambda n: n.curie)
for mapping_set in sorted_mapping_sets
),
)
_write_tsv(

sorted_evidences = sorted(evidences.values(), key=lambda row: row.curie) if sort else list(evidences.values())
_write_tsv_gz(
evidence_nodes_path,
evidence_nodes_header,
(
(
evidence.curie,
"evidence",
"semra.evidence",
evidence.evidence_type,
evidence.justification.curie,
_safe_confidence(evidence),
)
for evidence in sorted(evidences.values(), key=lambda row: row.curie)
for evidence in tqdm(
sorted_evidences, desc="Writing evidence nodes", leave=False, unit_scale=True, unit="evidence"
)
),
)
_write_tsv(edges_path, edges_header, sorted(set(edges), key=_edge_key))

startup_commands = dedent(
"""\
#!/bin/bash
neo4j start
# Get the port
until [ \
"$(curl -s -w '%{http_code}' -o /dev/null "http://localhost:7474")" \
-eq 200 ]
until [ "$(curl -s -w '%{http_code}' -o /dev/null "http://localhost:7474")" -eq 200 ]
do
sleep 5
done
Expand Down Expand Up @@ -810,26 +834,33 @@ def write_neo4j(
apt-get install -y git zip unzip bzip2 gcc pkg-config python3.11 && \\
curl -sS https://bootstrap.pypa.io/get-pip.py | python3.11
ARG twiddle1=dee
RUN python3.11 -m pip install "semra[web] @ git+https://github.com/biopragmatics/semra.git"
# Add graph content
ARG twiddle2=dee
COPY concept_nodes.tsv /sw/concept_nodes.tsv
COPY mapping_nodes.tsv /sw/mapping_nodes.tsv
COPY evidence_nodes.tsv /sw/evidence_nodes.tsv
COPY mapping_set_nodes.tsv /sw/mapping_set_nodes.tsv
COPY edges.tsv /sw/edges.tsv
COPY concept_nodes.tsv.gz /sw/concept_nodes.tsv.gz
COPY mapping_nodes.tsv.gz /sw/mapping_nodes.tsv.gz
COPY evidence_nodes.tsv.gz /sw/evidence_nodes.tsv.gz
COPY mapping_set_nodes.tsv.gz /sw/mapping_set_nodes.tsv.gz
COPY mapping_edges.tsv.gz /sw/mapping_edges.tsv.gz
COPY edges.tsv.gz /sw/edges.tsv.gz
# Ingest graph content into neo4j
RUN sed -i 's/#dbms.default_listen_address/dbms.default_listen_address/' /etc/neo4j/neo4j.conf && \\
sed -i 's/#dbms.security.auth_enabled/dbms.security.auth_enabled/' /etc/neo4j/neo4j.conf && \\
neo4j-admin import --delimiter='TAB' --skip-duplicate-nodes=true --skip-bad-relationships=true \\
--relationships /sw/mapping_edges.tsv \\
--relationships /sw/edges.tsv \\
--nodes /sw/concept_nodes.tsv \\
--nodes /sw/mapping_nodes.tsv \\
--nodes /sw/mapping_set_nodes.tsv \\
--nodes /sw/evidence_nodes.tsv
--nodes=concept=/sw/concept_nodes.tsv \\
--nodes=mapping=/sw/mapping_nodes.tsv \\
--nodes=mappingset=/sw/mapping_set_nodes.tsv \\
--nodes=evidence=/sw/evidence_nodes.tsv
RUN rm /sw/concept_nodes.tsv.gz
RUN rm /sw/mapping_nodes.tsv.gz
RUN rm /sw/evidence_nodes.tsv.gz
RUN rm /sw/mapping_set_nodes.tsv.gz
RUN rm /sw/edges.tsv.gz
RUN rm /sw/mapping_edges.tsv.gz
COPY startup.sh startup.sh
ENTRYPOINT ["/bin/bash", "/sw/startup.sh"]
Expand Down Expand Up @@ -865,9 +896,9 @@ def write_neo4j(
# command_path.write_text(shell_command)


def _write_tsv(path, header, rows) -> None:
def _write_tsv_gz(path, header, rows) -> None:
click.echo(f"writing to {path}")
with path.open("w") as file:
print(*header, sep="\t", file=file) # noqa:T201
for row in rows:
print(*row, sep="\t", file=file) # noqa:T201
with gzip.open(path, "wt") as file:
writer = csv.writer(file, delimiter="\t")
writer.writerow(header)
writer.writerows(rows)

0 comments on commit 01e9274

Please sign in to comment.