Skip to content

Commit

Permalink
Update probability model and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
cthoyt committed Jan 22, 2024
1 parent 2151ec6 commit a8091bb
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 59 deletions.
17 changes: 12 additions & 5 deletions src/semra/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def _get_sssom_row(mapping: Mapping, e: Evidence):
",".join(sorted(e.mapping_set_names)),
mapping_set_version,
mapping_set_license,
round(confidence, CONFIDENCE_PRECISION) if (confidence := e.confidence) is not None else "",
_safe_confidence(e),
e.author.curie if e.author else "",
e.explanation,
)
Expand Down Expand Up @@ -436,6 +436,13 @@ def _neo4j_bool(b: bool, /) -> Literal["true", "false"]: # noqa:FBT001
return "true" if b else "false" # type:ignore


def _safe_confidence(x) -> str:
    """Render the confidence of ``x`` as a string suitable for an output row.

    :param x: An object exposing a ``get_confidence()`` method
    :returns: An empty string when ``get_confidence()`` returns ``None``, otherwise
        the confidence rounded to ``CONFIDENCE_PRECISION`` digits and passed through ``str``
    """
    value = x.get_confidence()
    return "" if value is None else str(round(value, CONFIDENCE_PRECISION))


def write_neo4j(
mappings: list[Mapping],
directory: str | Path,
Expand Down Expand Up @@ -515,7 +522,7 @@ def write_neo4j(
mapping.s.curie,
mapping.p.curie,
mapping.o.curie,
round(c, CONFIDENCE_PRECISION) if (c := mapping.confidence) is not None else "",
_safe_confidence(mapping),
_neo4j_bool(mapping.has_primary),
_neo4j_bool(mapping.has_secondary),
_neo4j_bool(mapping.has_tertiary),
Expand Down Expand Up @@ -566,7 +573,7 @@ def write_neo4j(
"mapping",
"semra.mapping",
mapping.p.curie,
mapping.confidence and round(mapping.confidence, CONFIDENCE_PRECISION),
_safe_confidence(mapping),
_neo4j_bool(mapping.has_primary),
_neo4j_bool(mapping.has_secondary),
_neo4j_bool(mapping.has_tertiary),
Expand All @@ -585,7 +592,7 @@ def write_neo4j(
mapping_set.name,
mapping_set.license or "",
mapping_set.version or "",
c if (c := mapping_set.confidence) is not None else "",
_safe_confidence(mapping_set),
)
for mapping_set in sorted(mapping_sets.values(), key=lambda n: n.curie)
),
Expand All @@ -600,7 +607,7 @@ def write_neo4j(
"semra.evidence",
evidence.evidence_type,
evidence.justification.curie,
c if (c := evidence.confidence) is not None else "",
_safe_confidence(evidence),
)
for evidence in sorted(evidences.values(), key=lambda row: row.curie)
),
Expand Down
112 changes: 59 additions & 53 deletions src/semra/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from collections.abc import Iterable
from hashlib import md5
from itertools import islice
from typing import Annotated, Literal, Optional, Union
from typing import Annotated, ClassVar, Literal, Optional, Union

import pydantic
from curies import Reference
Expand Down Expand Up @@ -37,6 +37,7 @@ def triple_key(triple: Triple) -> tuple[str, str, str]:
return triple[0].curie, triple[2].curie, triple[1].curie


EPSILON = 1e-6
EvidenceType = Literal["simple", "mutated", "reasoned"]
JUSTIFICATION_FIELD = Field(description="A SSSOM-compliant justification")

Expand All @@ -47,43 +48,70 @@ def _md5_hexdigest(picklable) -> str:
return hasher.hexdigest()


class EvidenceMixin:
class KeyedMixin:
    """A mixin for a class that can be hashed and CURIE-encoded.

    Subclasses pass their CURIE prefix as a class keyword argument, e.g.
    ``class Foo(KeyedMixin, prefix="semra.foo")``, and implement :meth:`key`.
    """

    #: The prefix for CURIEs for instances of this class
    _prefix: ClassVar[str]

    def __init_subclass__(cls, *, prefix: str, **kwargs):
        """Record the CURIE prefix given in the subclass definition.

        :param prefix: The prefix for CURIEs for instances of the subclass
        :param kwargs: Remaining class keyword arguments, forwarded along the MRO
        """
        # Forward the remaining keywords so that other classes in the MRO that
        # define ``__init_subclass__`` still run instead of being silently skipped.
        super().__init_subclass__(**kwargs)
        cls._prefix = prefix

    def key(self):
        """Return a picklable key."""
        raise NotImplementedError

    def hexdigest(self) -> str:
        """Generate a hexadecimal representation of the MD5 hash of the pickled key() for this class."""
        return _md5_hexdigest(self.key())

    def get_reference(self) -> Reference:
        """Get a CURIE reference using this class's prefix and its hexadecimal representation."""
        return Reference(prefix=self._prefix, identifier=self.hexdigest())

    @property
    def curie(self) -> str:
        """Get a string representing the CURIE."""
        return self.get_reference().curie


class MappingSet(pydantic.BaseModel):
class ConfidenceMixin:
    """A mixin for a class that can report a confidence."""

    def get_confidence(self) -> float:
        """Get the confidence; this base implementation always raises ``NotImplementedError``."""
        raise NotImplementedError


class EvidenceMixin:
    """A mixin that gives an evidence class a default, empty explanation."""

    @property
    def explanation(self) -> str:
        """Get the explanation, which is the empty string in this base implementation."""
        return ""


class MappingSet(pydantic.BaseModel, ConfidenceMixin, KeyedMixin, prefix="semra.mappingset"):
"""Represents a set of semantic mappings.
For example, this might correspond to:
1. All the mappings extracted from an ontology
2. All the mappings published with a database
3. All the mappings inferred by SeMRA based on a given configuration
"""

name: str = Field(..., description="Name of the mapping set")
version: Optional[str] = Field(default=None, description="The version of the dataset from which the mapping comes")
license: Optional[str] = Field(default=None, description="License name or URL for mapping set")
confidence: Optional[float] = Field(default=None, description="Mapping set level confidence")
confidence: float = Field(..., description="Mapping set level confidence")

def key(self):
return self.name, self.version or "", self.license or "", 1.0 if self.confidence is None else self.confidence
"""Get a picklable key representing the mapping set."""
return self.name, self.version or "", self.license or "", self.confidence

def hexdigest(self) -> str:
return _md5_hexdigest(self.key())
def get_confidence(self) -> float:
"""Get the confidence for the mapping set."""
return self.confidence

def get_reference(self) -> Reference:
return Reference(prefix="semra.mappingset", identifier=self.hexdigest())

@property
def curie(self) -> str:
return self.get_reference().curie


class SimpleEvidence(pydantic.BaseModel, EvidenceMixin):
class SimpleEvidence(pydantic.BaseModel, KeyedMixin, EvidenceMixin, ConfidenceMixin, prefix="semra.evidence"):
"""Evidence for a mapping.
Ideally, this matches the SSSOM data model.
Expand All @@ -108,6 +136,7 @@ class Config:
],
)
uuid: UUID4 = Field(default_factory=uuid.uuid4)
confidence: Optional[float] = Field(None, description="The confidence")

def key(self):
"""Get a key suitable for hashing the evidence.
Expand All @@ -116,22 +145,17 @@ def key(self):
Note: this should be extended to include basically _all_ fields
"""
return (self.evidence_type, self.justification, self.author, self.mapping_set.key(), self.uuid)
return self.evidence_type, self.justification, self.author, self.mapping_set.key(), self.uuid

@property
def mapping_set_names(self) -> set[str]:
return {self.mapping_set.name}

@property
def confidence(self) -> Optional[float]:
return self.mapping_set.confidence

@property
def explanation(self) -> str:
return ""
def get_confidence(self) -> float:
return self.confidence if self.confidence is not None else self.mapping_set.confidence


class ReasonedEvidence(pydantic.BaseModel, EvidenceMixin):
class ReasonedEvidence(pydantic.BaseModel, KeyedMixin, EvidenceMixin, ConfidenceMixin, prefix="semra.evidence"):
"""A complex evidence based on multiple mappings."""

class Config:
Expand All @@ -145,7 +169,7 @@ class Config:
..., description="A list of mappings and their evidences consumed to create this evidence"
)
author: Optional[Reference] = None
confidence_factor: float = 1.0
confidence_factor: float = Field(1.0, description="The probability that the reasoning method is correct")

def key(self):
return (
Expand All @@ -154,13 +178,9 @@ def key(self):
*((*m.triple, *(e.key() for e in m.evidence)) for m in self.mappings),
)

@property
def confidence(self) -> Optional[float]:
confidences = [mapping.confidence for mapping in self.mappings]
nn_confidences = [c for c in confidences if c is not None]
if not nn_confidences:
return None
return self.confidence_factor * _joint_probability(nn_confidences)
def get_confidence(self) -> float:
confidences = [mapping.get_confidence() for mapping in self.mappings]
return _joint_probability([self.confidence_factor, *confidences])

@property
def mapping_set(self) -> None:
Expand All @@ -183,7 +203,7 @@ def explanation(self) -> str:
]


class Mapping(pydantic.BaseModel):
class Mapping(pydantic.BaseModel, ConfidenceMixin, KeyedMixin, prefix="semra.mapping"):
"""A semantic mapping."""

class Config:
Expand All @@ -202,30 +222,16 @@ def triple(self) -> Triple:
return self.s, self.p, self.o

@classmethod
def from_triple(cls, triple: Triple, evidence: Union[list[Evidence], None] = None) -> Mapping:
def from_triple(cls, triple: Triple, evidence: Optional[list[Evidence]] = None) -> Mapping:
"""Instantiate a mapping from a triple."""
s, p, o = triple
return cls(s=s, p=p, o=o, evidence=evidence or [])

@property
def confidence(self) -> Optional[float]:
def get_confidence(self) -> float:
"""Get the mapping's confidence by aggregating its evidences' confidences in a binomial model."""
if not self.evidence:
return None
confidences = [e.confidence for e in self.evidence]
nn_confidences = [c for c in confidences if c is not None]
if not nn_confidences:
return None
return _joint_probability(nn_confidences)

def hexdigest(self) -> str:
return _md5_hexdigest(self.triple)

def get_reference(self) -> Reference:
return Reference(prefix="semra.mapping", identifier=self.hexdigest())

@property
def curie(self) -> str:
return self.get_reference().curie
raise ValueError("can not calculate confidence since no evidence")
return _joint_probability(e.get_confidence() for e in self.evidence)

@property
def has_primary(self) -> bool:
Expand Down
32 changes: 31 additions & 1 deletion tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from semra.api import (
BROAD_MATCH,
DB_XREF,
EXACT_MATCH,
NARROW_MATCH,
Index,
Expand All @@ -13,11 +14,12 @@
get_index,
get_many_to_many,
infer_chains,
infer_mutations,
infer_reversible,
keep_prefixes,
project,
)
from semra.rules import MANUAL_MAPPING
from semra.rules import KNOWLEDGE_MAPPING, MANUAL_MAPPING
from semra.struct import Mapping, MappingSet, ReasonedEvidence, Reference, SimpleEvidence, line, triple_key


Expand Down Expand Up @@ -295,3 +297,31 @@ def test_get_many_to_many(self):

m4 = Mapping(s=a3, p=EXACT_MATCH, o=b2)
self.assert_same_triples([m3, m4], get_many_to_many([m2, m3, m4]))


class TestUpgrades(unittest.TestCase):
    """Test upgrading mappings from one predicate to another via inferred mutations."""

    def test_infer_mutations(self):
        """Test that a mutation yields the original mapping plus a reasoned, upgraded one."""
        (a1,) = _get_references(1, prefix="a")
        (b1,) = _get_references(1, prefix="b")
        original_confidence = 0.95
        mutation_confidence = 0.80
        m1 = Mapping(
            s=a1,
            p=DB_XREF,
            o=b1,
            evidence=[SimpleEvidence(confidence=original_confidence, mapping_set=MS)],
        )
        new_mappings = infer_mutations(
            [m1], {("a", "b"): mutation_confidence}, old=DB_XREF, new=EXACT_MATCH, progress=False
        )
        self.assertEqual(2, len(new_mappings))
        new_m1, new_m2 = new_mappings
        self.assertEqual(m1, new_m1)
        self.assertEqual(a1, new_m2.s)
        self.assertEqual(EXACT_MATCH, new_m2.p)
        self.assertEqual(b1, new_m2.o)
        self.assertEqual(1, len(new_m2.evidence))
        new_evidence = new_m2.evidence[0]
        self.assertIsInstance(new_evidence, ReasonedEvidence)
        new_confidence = new_evidence.get_confidence()
        self.assertIsNotNone(new_confidence)
        expected_confidence = 1 - (1 - original_confidence) * (1 - mutation_confidence)
        # The confidence is the result of floating point arithmetic, so compare with a
        # tolerance rather than requiring bit-for-bit equality with this expression.
        self.assertAlmostEqual(expected_confidence, new_confidence)
        self.assertEqual(KNOWLEDGE_MAPPING, new_evidence.justification)

0 comments on commit a8091bb

Please sign in to comment.