diff --git a/src/semra/io.py b/src/semra/io.py index f5aad28..4e1bada 100644 --- a/src/semra/io.py +++ b/src/semra/io.py @@ -391,7 +391,7 @@ def _get_sssom_row(mapping: Mapping, e: Evidence): ",".join(sorted(e.mapping_set_names)), mapping_set_version, mapping_set_license, - round(confidence, CONFIDENCE_PRECISION) if (confidence := e.confidence) is not None else "", + _safe_confidence(e), e.author.curie if e.author else "", e.explanation, ) @@ -436,6 +436,13 @@ def _neo4j_bool(b: bool, /) -> Literal["true", "false"]: # noqa:FBT001 return "true" if b else "false" # type:ignore +def _safe_confidence(x) -> str: + confidence = x.get_confidence() + if confidence is None: + return "" + return str(round(confidence, CONFIDENCE_PRECISION)) + + def write_neo4j( mappings: list[Mapping], directory: str | Path, @@ -515,7 +522,7 @@ def write_neo4j( mapping.s.curie, mapping.p.curie, mapping.o.curie, - round(c, CONFIDENCE_PRECISION) if (c := mapping.confidence) is not None else "", + _safe_confidence(mapping), _neo4j_bool(mapping.has_primary), _neo4j_bool(mapping.has_secondary), _neo4j_bool(mapping.has_tertiary), @@ -566,7 +573,7 @@ def write_neo4j( "mapping", "semra.mapping", mapping.p.curie, - mapping.confidence and round(mapping.confidence, CONFIDENCE_PRECISION), + _safe_confidence(mapping), _neo4j_bool(mapping.has_primary), _neo4j_bool(mapping.has_secondary), _neo4j_bool(mapping.has_tertiary), @@ -585,7 +592,7 @@ def write_neo4j( mapping_set.name, mapping_set.license or "", mapping_set.version or "", - c if (c := mapping_set.confidence) is not None else "", + _safe_confidence(mapping_set), ) for mapping_set in sorted(mapping_sets.values(), key=lambda n: n.curie) ), @@ -600,7 +607,7 @@ def write_neo4j( "semra.evidence", evidence.evidence_type, evidence.justification.curie, - c if (c := evidence.confidence) is not None else "", + _safe_confidence(evidence), ) for evidence in sorted(evidences.values(), key=lambda row: row.curie) ), diff --git a/src/semra/struct.py b/src/semra/struct.py index 9289c63..a4e60c5 100644 --- a/src/semra/struct.py +++ b/src/semra/struct.py @@ -8,7 +8,7 @@ from collections.abc import Iterable from hashlib import md5 from itertools import islice -from typing import Annotated, Literal, Optional, Union +from typing import Annotated, ClassVar, Literal, Optional, Union import pydantic from curies import Reference @@ -37,6 +37,7 @@ def triple_key(triple: Triple) -> tuple[str, str, str]: return triple[0].curie, triple[2].curie, triple[1].curie +EPSILON = 1e-6 EvidenceType = Literal["simple", "mutated", "reasoned"] JUSTIFICATION_FIELD = Field(description="A SSSOM-compliant justification") @@ -47,43 +48,70 @@ def _md5_hexdigest(picklable) -> str: return hasher.hexdigest() -class EvidenceMixin: +class KeyedMixin: + """A mixin for a class that can be hashed and CURIE-encoded.""" + + #: The prefix for CURIEs for instances of this class + _prefix: ClassVar[str] + + def __init_subclass__(cls, *, prefix: str, **kwargs): + cls._prefix = prefix + def key(self): + """Return a picklable key.""" raise NotImplementedError def hexdigest(self) -> str: + """Generate a hexadecimal representation of the MD5 hash of the pickled key() for this class.""" key = self.key() return _md5_hexdigest(key) def get_reference(self) -> Reference: - return Reference(prefix="semra.evidence", identifier=self.hexdigest()) + """Get a CURIE reference using this class's prefix and its hexadecimal representation.""" + return Reference(prefix=self._prefix, identifier=self.hexdigest()) @property def curie(self) -> str: + """Get a string representing the CURIE.""" return self.get_reference().curie -class MappingSet(pydantic.BaseModel): +class ConfidenceMixin: + def get_confidence(self) -> float: + raise NotImplementedError + + +class EvidenceMixin: + @property + def explanation(self) -> str: + return "" + + +class MappingSet(pydantic.BaseModel, ConfidenceMixin, KeyedMixin, prefix="semra.mappingset"): + """Represents a set of semantic mappings. + + For example, this might correspond to: + + 1. All the mappings extracted from an ontology + 2. All the mappings published with a database + 3. All the mappings inferred by SeMRA based on a given configuration + """ + name: str = Field(..., description="Name of the mapping set") version: Optional[str] = Field(default=None, description="The version of the dataset from which the mapping comes") license: Optional[str] = Field(default=None, description="License name or URL for mapping set") - confidence: Optional[float] = Field(default=None, description="Mapping set level confidence") + confidence: float = Field(..., description="Mapping set level confidence") def key(self): - return self.name, self.version or "", self.license or "", 1.0 if self.confidence is None else self.confidence + """Get a picklable key representing the mapping set.""" + return self.name, self.version or "", self.license or "", self.confidence - def hexdigest(self) -> str: - return _md5_hexdigest(self.key()) + def get_confidence(self) -> float: + """Get the confidence for the mapping set.""" + return self.confidence - def get_reference(self) -> Reference: - return Reference(prefix="semra.mappingset", identifier=self.hexdigest()) - @property - def curie(self) -> str: - return self.get_reference().curie - - -class SimpleEvidence(pydantic.BaseModel, EvidenceMixin): +class SimpleEvidence(pydantic.BaseModel, KeyedMixin, EvidenceMixin, ConfidenceMixin, prefix="semra.evidence"): """Evidence for a mapping. Ideally, this matches the SSSOM data model. @@ -108,6 +136,7 @@ class Config: ], ) uuid: UUID4 = Field(default_factory=uuid.uuid4) + confidence: Optional[float] = Field(None, description="The confidence") def key(self): """Get a key suitable for hashing the evidence. @@ -116,22 +145,17 @@ def key(self): Note: this should be extended to include basically _all_ fields """ - return (self.evidence_type, self.justification, self.author, self.mapping_set.key(), self.uuid) + return self.evidence_type, self.justification, self.author, self.mapping_set.key(), self.uuid @property def mapping_set_names(self) -> set[str]: return {self.mapping_set.name} - @property - def confidence(self) -> Optional[float]: - return self.mapping_set.confidence - - @property - def explanation(self) -> str: - return "" + def get_confidence(self) -> float: + return self.confidence if self.confidence is not None else self.mapping_set.confidence -class ReasonedEvidence(pydantic.BaseModel, EvidenceMixin): +class ReasonedEvidence(pydantic.BaseModel, KeyedMixin, EvidenceMixin, ConfidenceMixin, prefix="semra.evidence"): """A complex evidence based on multiple mappings.""" class Config: @@ -145,7 +169,7 @@ class Config: ..., description="A list of mappings and their evidences consumed to create this evidence" ) author: Optional[Reference] = None - confidence_factor: float = 1.0 + confidence_factor: float = Field(1.0, description="The probability that the reasoning method is correct") def key(self): return ( @@ -154,13 +178,9 @@ def key(self): *((*m.triple, *(e.key() for e in m.evidence)) for m in self.mappings), ) - @property - def confidence(self) -> Optional[float]: - confidences = [mapping.confidence for mapping in self.mappings] - nn_confidences = [c for c in confidences if c is not None] - if not nn_confidences: - return None - return self.confidence_factor * _joint_probability(nn_confidences) + def get_confidence(self) -> float: + confidences = [mapping.get_confidence() for mapping in self.mappings] + return _joint_probability([self.confidence_factor, *confidences]) @property def mapping_set(self) -> None: @@ -183,7 +203,7 @@ def explanation(self) -> str: ] -class Mapping(pydantic.BaseModel): +class Mapping(pydantic.BaseModel, ConfidenceMixin, KeyedMixin, prefix="semra.mapping"): """A semantic mapping.""" class Config: @@ -202,30 +222,16 @@ def triple(self) -> Triple: return self.s, self.p, self.o @classmethod - def from_triple(cls, triple: Triple, evidence: Union[list[Evidence], None] = None) -> Mapping: + def from_triple(cls, triple: Triple, evidence: Optional[list[Evidence]] = None) -> Mapping: """Instantiate a mapping from a triple.""" s, p, o = triple return cls(s=s, p=p, o=o, evidence=evidence or []) - @property - def confidence(self) -> Optional[float]: + def get_confidence(self) -> float: + """Get the mapping's confidence by aggregating its evidences' confidences in a binomial model.""" if not self.evidence: - return None - confidences = [e.confidence for e in self.evidence] - nn_confidences = [c for c in confidences if c is not None] - if not nn_confidences: - return None - return _joint_probability(nn_confidences) - - def hexdigest(self) -> str: - return _md5_hexdigest(self.triple) - - def get_reference(self) -> Reference: - return Reference(prefix="semra.mapping", identifier=self.hexdigest()) - - @property - def curie(self) -> str: - return self.get_reference().curie + raise ValueError("can not calculate confidence since no evidence") + return _joint_probability(e.get_confidence() for e in self.evidence) @property def has_primary(self) -> bool: diff --git a/tests/test_api.py b/tests/test_api.py index b19881d..c6fe7d2 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -4,6 +4,7 @@ from semra.api import ( BROAD_MATCH, + DB_XREF, EXACT_MATCH, NARROW_MATCH, Index, @@ -13,11 +14,12 @@ get_index, get_many_to_many, infer_chains, + infer_mutations, infer_reversible, keep_prefixes, project, ) -from semra.rules import MANUAL_MAPPING +from semra.rules import KNOWLEDGE_MAPPING, MANUAL_MAPPING from semra.struct import Mapping, MappingSet, ReasonedEvidence, Reference, SimpleEvidence, line, triple_key @@ -295,3 +297,31 @@ def test_get_many_to_many(self): m4 = Mapping(s=a3, p=EXACT_MATCH, o=b2) self.assert_same_triples([m3, m4], get_many_to_many([m2, m3, m4])) + + +class TestUpgrades(unittest.TestCase): + """Test inferring mutations.""" + + def test_infer_mutations(self): + """Test inferring mutations.""" + (a1,) = _get_references(1, prefix="a") + (b1,) = _get_references(1, prefix="b") + original_confidence = 0.95 + mutation_confidence = 0.80 + m1 = Mapping(s=a1, p=DB_XREF, o=b1, evidence=[SimpleEvidence(confidence=original_confidence, mapping_set=MS)]) + new_mappings = infer_mutations( + [m1], {("a", "b"): mutation_confidence}, old=DB_XREF, new=EXACT_MATCH, progress=False + ) + self.assertEqual(2, len(new_mappings)) + new_m1, new_m2 = new_mappings + self.assertEqual(m1, new_m1) + self.assertEqual(a1, new_m2.s) + self.assertEqual(EXACT_MATCH, new_m2.p) + self.assertEqual(b1, new_m2.o) + self.assertEqual(1, len(new_m2.evidence)) + new_evidence = new_m2.evidence[0] + self.assertIsInstance(new_evidence, ReasonedEvidence) + new_confidence = new_evidence.get_confidence() + self.assertIsNotNone(new_confidence) + self.assertEqual(1 - (1 - original_confidence) * (1 - mutation_confidence), new_confidence) + self.assertEqual(KNOWLEDGE_MAPPING, new_evidence.justification)