Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions fhircraft/fhir/resources/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pydantic import BaseModel

from fhircraft.config import get_config
from fhircraft.utils import ensure_list, get_all_models_from_field, merge_dicts
from fhircraft.utils import ensure_list, get_all_models_from_field, is_dict_subset

if TYPE_CHECKING:
from fhircraft.fhir.resources.base import FHIRBaseModel, FHIRSliceModel
Expand Down Expand Up @@ -261,16 +261,15 @@ def validate_FHIR_element_pattern(
if isinstance(pattern, list):
pattern = pattern[0]
_element = element[0] if isinstance(element, list) else element
_element = (
_element.model_dump() if isinstance(_element, FHIRBaseModel) else _element
)
_pattern = pattern.model_dump() if isinstance(pattern, FHIRBaseModel) else pattern
try:
if isinstance(_element, FHIRBaseModel):
assert (
merge_dicts(_element.model_dump(), pattern.model_dump())
== _element.model_dump()
)
elif isinstance(_element, dict) and isinstance(pattern, dict):
assert merge_dicts(_element, pattern) == _element
if isinstance(_pattern, dict):
assert is_dict_subset(_pattern, _element)
else:
assert _element == pattern
assert _element == _pattern
except AssertionError:
error = f"Value does not fulfill pattern:\n{pattern.model_dump_json(indent=2) if isinstance(pattern, FHIRBaseModel) else pattern}"
if config.mode == "lenient":
Expand Down
70 changes: 21 additions & 49 deletions fhircraft/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,55 +357,27 @@ def get_fhir_model_from_field(field: FieldInfo) -> type[BaseModel] | None:
return next(get_all_models_from_field(field), None)


def merge_dicts(dict1: dict, dict2: dict) -> dict:
"""
Merge two dictionaries recursively, merging lists element by element and dictionaries at the same index.

If a key exists in both dictionaries, the values are merged based on their types. If a key exists only in one dictionary, it is added to the merged dictionary.

Args:
dict1 (dict): The first dictionary to merge.
dict2 (dict): The second dictionary to merge.

Returns:
dict: The merged dictionary.

Example:
>>> dict1 = {'a': 1, 'b': {'c': 2, 'd': [3, 4]}, 'e': [5, 6]}
>>> dict2 = {'b': {'c': 3, 'd': [4, 5]}, 'e': [6, 7], 'f': 8}
>>> merge_dicts(dict1, dict2)
{'a': 1, 'b': {'c': 3, 'd': [3, 4, 5]}, 'e': [5, 6, 7], 'f': 8}
"""

def merge_lists(list1, list2):
# Merge two lists element by element
merged_list = []
for idx in range(max(len(list1), len(list2))):
if idx < len(list1) and idx < len(list2):
if isinstance(list1[idx], dict) and isinstance(list2[idx], dict):
# Merge dictionaries at the same index
merged_list.append(merge_dicts(list1[idx], list2[idx]))
else:
# If they are not dictionaries, choose the element from the first list
merged_list.append(list1[idx])
elif idx < len(list1):
merged_list.append(list1[idx])
else:
merged_list.append(list2[idx])
return merged_list

merged_dict = dict1.copy()
for key, value in dict2.items():
if key in merged_dict:
if isinstance(merged_dict[key], list) and isinstance(value, list):
merged_dict[key] = merge_lists(merged_dict[key], value)
elif isinstance(merged_dict[key], dict) and isinstance(value, dict):
merged_dict[key] = merge_dicts(merged_dict[key], value)
else:
merged_dict[key] = value
else:
merged_dict[key] = value
return merged_dict
def is_dict_subset(subset: dict, superset: dict) -> bool:
"""Return True if all keys/values in subset are present in superset."""
for key, value in subset.items():
if key not in superset:
return False
sup_value = superset[key]
if isinstance(value, dict) and isinstance(sup_value, dict):
if not is_dict_subset(value, sup_value):
return False
elif isinstance(value, list) and isinstance(sup_value, list):
if len(value) > len(sup_value):
return False
for sub_item, sup_item in zip(value, sup_value):
if isinstance(sub_item, dict) and isinstance(sup_item, dict):
if not is_dict_subset(sub_item, sup_item):
return False
elif sub_item != sup_item:
return False
elif value != sup_value:
return False
return True


def get_FHIR_release_from_version(
Expand Down
1 change: 0 additions & 1 deletion test/test_fhir_path_engine_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,6 @@ def test_todatetime_returns_empty_for_invalid_type():
def test_todatetime_converts_correctly_for_valid_type(value, expected):
collection = [FHIRPathCollectionItem(value=value)]
result = ToDateTime().evaluate(collection, env)
print(result[0].value, expected)
assert result == [FHIRPathCollectionItem.wrap(expected)]


Expand Down
1 change: 0 additions & 1 deletion test/test_fhir_resources_factory_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ def deep_index():
def test_get_subtree_includes_root_and_descendants(deep_index: DefinitionIndex):
subtree_index = deep_index.get_subtree("Observation.component")
ids = {n.id for n in subtree_index}
print(ids)
assert subtree_index.root() is not None
assert ids == {
"Component",
Expand Down
4 changes: 0 additions & 4 deletions test/test_fhir_resources_regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,6 @@ def test_regression_issue_263(factory):
structure_definition=structure_definition, mode="differential"
)

print(CodeGenerator().generate_resource_model_code(model))

from typing import get_args
import pydantic
from fhircraft.fhir.resources.base import FHIRSliceModel
Expand All @@ -494,7 +492,6 @@ def test_regression_issue_263(factory):
# -----------------------------------------------------------------------
coding_annotation = code_model.model_fields["coding"].annotation
list_type = next(a for a in get_args(coding_annotation) if a is not type(None))
print(coding_annotation)
annotated_item = get_args(list_type)[0]
union_type = get_args(annotated_item)[0]
union_members = get_args(union_type)
Expand Down Expand Up @@ -981,7 +978,6 @@ def test_regression_issue_279(factory):
cs_model = next(a for a in get_args(cs_annotation) if a is not type(None))

assert cs_model.__name__ == "MyConditionClinicalStatus"
print(cs_model.__bases__)
assert issubclass(cs_model, CodeableConcept)
assert "extension" in cs_model.model_fields

Expand Down
Loading
Loading