Skip to content

Commit

Permalink
move filters to separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
mochic committed Sep 20, 2024
1 parent a567769 commit db3c5e1
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 300 deletions.
174 changes: 5 additions & 169 deletions src/aind_slims_api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@

import base64
import logging
from copy import deepcopy
from functools import lru_cache
from typing import Any, Optional, Type, TypeVar, get_type_hints
from typing import Any, Optional, Type, TypeVar

from pydantic import ValidationError
from requests import Response
from slims.criteria import Criterion, Expression, Junction, conjunction, equals
from slims.criteria import Criterion, conjunction, equals
from slims.internal import Record as SlimsRecord
from slims.slims import Slims, _SlimsApiException

from aind_slims_api import config
from aind_slims_api.exceptions import SlimsRecordNotFound
from aind_slims_api.filters import resolve_filter_args
from aind_slims_api.models import SlimsAttachment
from aind_slims_api.models.base import SlimsBaseModel
from aind_slims_api.types import SLIMS_TABLES
Expand Down Expand Up @@ -104,35 +104,6 @@ def fetch(

return records

@staticmethod
def resolve_model_alias(
model: Type[SlimsBaseModelTypeVar],
attr_name: str,
) -> str:
"""Given a SlimsBaseModel object, resolve its pk to the actual value
Notes
-----
- Raises ValueError if the alias cannot be resolved
- Resolves the validation alias for a given field name
- If prefixed with `-` will return the validation alias prefixed with
`-`
"""
has_prefix = attr_name.startswith("-")
_attr_name = attr_name.lstrip("-")
for field_name, field_info in model.model_fields.items():
if (
field_name == _attr_name
and field_info.validation_alias
and isinstance(field_info.validation_alias, str)
):
alias = field_info.validation_alias
if has_prefix:
return f"-{alias}"
return alias
else:
raise ValueError(f"Cannot resolve alias for {attr_name} on {model}")

@staticmethod
def _validate_models(
model_type: Type[SlimsBaseModelTypeVar], records: list[SlimsRecord]
Expand All @@ -147,141 +118,6 @@ def _validate_models(
logger.error(f"SLIMS data validation failed, {repr(e)}")
return validated

@staticmethod
def _resolve_criteria(
model_type: Type[SlimsBaseModelTypeVar], criteria: Criterion
) -> Criterion:
"""Resolves criterion field name to serialization alias in a criterion."""
if isinstance(criteria, Junction):
criteria.members = [
SlimsClient._resolve_criteria(model_type, sub_criteria)
for sub_criteria in criteria.members
]
return criteria
elif isinstance(criteria, Expression):
if criteria.criterion["fieldName"] == "isNaFilter":
criteria.criterion["value"] = SlimsClient.resolve_model_alias(
model_type,
criteria.criterion["value"],
)
else:
criteria.criterion["fieldName"] = SlimsClient.resolve_model_alias(
model_type,
criteria.criterion["fieldName"],
)
return criteria
else:
raise ValueError(f"Invalid criterion type: {type(criteria)}")

@staticmethod
def _validate_field_name(
model_type: Type[SlimsBaseModelTypeVar],
field_name: str,
) -> None:
"""Check if field_name is a field on a model. Raises a ValueError if it
is not.
"""
field_type_map = get_type_hints(model_type)
if field_name not in field_type_map:
raise ValueError(f"{field_name} is not a field on {model_type}.")

@staticmethod
def _validate_field_value(
model_type: Type[SlimsBaseModelTypeVar],
field_name: str,
field_value: Any,
) -> None:
"""Check if field_value is a compatible with
the type associated with that field. Raises a ValueError if it is not.
"""
field_type_map = get_type_hints(model_type)
field_type = field_type_map[field_name]
if not isinstance(field_value, field_type):
raise ValueError(
f"{field_value} is incompatible with {field_type}"
f" for field {field_name}"
)

@staticmethod
def _validate_criteria(
model_type: Type[SlimsBaseModelTypeVar], criteria: Criterion
) -> None:
"""Validates that the types used in a criterion are compatible with the
types on the model. Raises a ValueError if they are not.
"""
if isinstance(criteria, Junction):
for sub_criteria in criteria.members:
SlimsClient._validate_criteria(model_type, sub_criteria)
elif isinstance(criteria, Expression):
if criteria.criterion["fieldName"] == "isNaFilter":
SlimsClient._validate_field_name(
model_type,
criteria.criterion["value"],
)
elif criteria.criterion["operator"] in ["inSet", "notInSet"]:
for value in criteria.criterion["value"]:
SlimsClient._validate_field_name(
model_type,
criteria.criterion["fieldName"],
)
SlimsClient._validate_field_value(
model_type,
criteria.criterion["fieldName"],
value,
)
elif criteria.criterion["operator"] == "betweenInclusive":
SlimsClient._validate_field_name(
model_type,
criteria.criterion["fieldName"],
)
SlimsClient._validate_field_value(
model_type,
criteria.criterion["fieldName"],
criteria.criterion["start"],
)
SlimsClient._validate_field_value(
model_type,
criteria.criterion["fieldName"],
criteria.criterion["end"],
)
else:
SlimsClient._validate_field_name(
model_type,
criteria.criterion["fieldName"],
)
SlimsClient._validate_field_value(
model_type,
criteria.criterion["fieldName"],
criteria.criterion["value"],
)
else:
raise ValueError(f"Invalid criterion type: {type(criteria)}")

@staticmethod
def _resolve_filter_args(
model: Type[SlimsBaseModelTypeVar],
*args: Criterion,
sort: list[str] = [],
start: Optional[int] = None,
end: Optional[int] = None,
**kwargs,
) -> tuple[list[Criterion], list[str], Optional[int], Optional[int]]:
"""Validates filter arguments and resolves field names to SLIMS API
column names.
"""
criteria = deepcopy(list(args))
criteria.extend(map(lambda item: equals(item[0], item[1]), kwargs.items()))
resolved_criteria: list[Criterion] = []
for criterion in criteria:
SlimsClient._validate_criteria(model, criterion)
resolved_criteria.append(SlimsClient._resolve_criteria(model, criterion))
resolved_sort = [
SlimsClient.resolve_model_alias(model, sort_key) for sort_key in sort
]
if start is not None and end is None or end is not None and start is None:
raise ValueError("Must provide both start and end or neither for fetch.")
return resolved_criteria, resolved_sort, start, end

def fetch_models(
self,
model: Type[SlimsBaseModelTypeVar],
Expand All @@ -307,7 +143,7 @@ def fetch_models(
if isinstance(sort, str):
sort = [sort]

criteria, resolved_sort, start, end = self._resolve_filter_args(
criteria, resolved_sort, start, end = resolve_filter_args(
model,
*args,
sort=sort,
Expand Down Expand Up @@ -391,7 +227,7 @@ def fetch_attachments(
if isinstance(sort, str):
sort = [sort]

criteria, sort, start, end = self._resolve_filter_args(
criteria, sort, start, end = resolve_filter_args(
SlimsAttachment,
*args,
sort=sort,
Expand Down
74 changes: 33 additions & 41 deletions src/aind_slims_api/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Optional, Type, TypeVar, get_type_hints

from slims.criteria import Criterion, Expression, Junction, equals

from aind_slims_api.models.base import SlimsBaseModel

logger = logging.getLogger(__name__)
Expand All @@ -17,7 +18,8 @@ def resolve_model_alias(
model: Type[SlimsBaseModelTypeVar],
attr_name: str,
) -> str:
"""Given a SlimsBaseModel object, resolve its pk to the actual value
"""Given a SlimsBaseModel object, resolve an alias name to the actual slims
column name.
Notes
-----
Expand All @@ -42,19 +44,22 @@ def resolve_model_alias(
raise ValueError(f"Cannot resolve alias for {attr_name} on {model}")


def _resolve_criteria(
CriteriaType = TypeVar("CriteriaType", Junction, Expression)


def resolve_criteria(
model_type: Type[SlimsBaseModelTypeVar],
criteria: Criterion,
) -> Criterion:
criteria: Type[CriteriaType],
) -> Type[CriteriaType]:
"""Resolves criterion field name to serialization alias in a criterion."""

if isinstance(criteria, Junction):
criteria.members = [
_resolve_criteria(model_type, sub_criteria)
resolve_criteria(model_type, sub_criteria)
for sub_criteria in criteria.members
]
return criteria
elif isinstance(criteria, Expression):
else: # Expression
if criteria.criterion["fieldName"] == "isNaFilter":
criteria.criterion["value"] = resolve_model_alias(
model_type,
Expand All @@ -66,29 +71,14 @@ def _resolve_criteria(
criteria.criterion["fieldName"],
)
return criteria
else:
raise ValueError(f"Invalid criterion type: {type(criteria)}")


def validate_criterion(
model_type: Type[SlimsBaseModelTypeVar],
field_name: str,
) -> None:
"""Check if field_name is a field on a model. Raises a ValueError if it
is not.
"""
field_type_map = get_type_hints(model_type)
if field_name not in field_type_map:
raise ValueError(f"{field_name} is not a field on {model_type}.")



def _validate_field_name(
model_type: Type[SlimsBaseModelTypeVar],
field_name: str,
) -> None:
"""Check if field_name is a field on a model. Raises a ValueError if it
is not.
"""Check if field_name is a field on a model. Raises an Attribute error if
it is not.
"""
field_type_map = get_type_hints(model_type)
if field_name not in field_type_map:
Expand All @@ -100,8 +90,8 @@ def _validate_field_value(
field_name: str,
field_value: Any,
) -> None:
"""Check if field_value is a compatible with
the type associated with that field. Raises a ValueError if it is not.
"""Check if field_value is a compatible with the type associated with that
field. Raises a ValueError if it is not.
"""
field_type_map = get_type_hints(model_type)
field_type = field_type_map[field_name]
Expand All @@ -112,11 +102,18 @@ def _validate_field_value(
)


def _validate_criteria(
model_type: Type[SlimsBaseModelTypeVar], criteria: Criterion
def validate_criteria(
model_type: Type[SlimsBaseModelTypeVar], criteria: Type[CriteriaType]
) -> None:
"""Validates that the types used in a criterion are compatible with the
types on the model. Raises a ValueError if they are not.
"""Check if field_name is a field on a model. Raises a ValueError if it
is not.
Raises
------
AttributeError
If the field_name is not a field on the model_type.
ValueError
If the field_value is not compatible with the field type.
Notes
-----
Expand All @@ -125,8 +122,8 @@ def _validate_criteria(
"""
if isinstance(criteria, Junction):
for sub_criteria in criteria.members:
_validate_criteria(model_type, sub_criteria)
elif isinstance(criteria, Expression):
validate_criteria(model_type, sub_criteria)
else: # Expression
if criteria.criterion["fieldName"] == "isNaFilter":
_validate_field_name(
model_type,
Expand Down Expand Up @@ -168,8 +165,6 @@ def _validate_criteria(
criteria.criterion["fieldName"],
criteria.criterion["value"],
)
else:
raise ValueError(f"Invalid criterion type: {type(criteria)}")


def resolve_filter_args(
Expand All @@ -180,19 +175,16 @@ def resolve_filter_args(
end: Optional[int] = None,
**kwargs,
) -> tuple[list[Criterion], list[str], Optional[int], Optional[int]]:
"""Validates filter arguments and resolves field names to SLIMS API
"""Validates filter arguments and resolves field name aliases to SLIMS API
column names.
"""
criteria = deepcopy(list(args))
criteria.extend(map(lambda item: equals(item[0], item[1]), kwargs.items()))
resolved_criteria: list[Criterion] = []
for criterion in criteria:
_validate_criteria(model, criterion)
resolved_criteria.append(_resolve_criteria(model, criterion))
resolved_sort = [
resolve_model_alias(model, sort_key) for sort_key in sort
]
validate_criteria(model, criterion)
resolved_criteria.append(resolve_criteria(model, criterion))
resolved_sort = [resolve_model_alias(model, sort_key) for sort_key in sort]
if start is not None and end is None or end is not None and start is None:
raise ValueError(
"Must provide both start and end or neither for fetch.")
raise ValueError("Must provide both start and end or neither for fetch.")
return resolved_criteria, resolved_sort, start, end
Loading

0 comments on commit db3c5e1

Please sign in to comment.