Skip to content

Commit

Permalink
core: refactor medication tables to use modern denormalization
Browse files Browse the repository at this point in the history
core__medication:
- Template cleanup to use standard denorm tables rather than custom
  extraction code.
- If contained resources provide actual codes, we pull them out now
  (previously we only looked at the contained Concept.text)
- No longer pull Concept.text from contained Medications, until we
  decide on an approach for Concept.text across the board)

core__medicationrequest:
- No longer ignores rows without dosageInstruction.text

General CodeableConcept denormalization:
- Add userSelected boolean field to all denorm tables
- Allow the Python code to request extra fields to include along with
  the coding information (used for capturing contained.id fields)

Misc:
- Depend on cumulus-fhir-support 1.1 for its contained schema support
  • Loading branch information
mikix committed Apr 29, 2024
1 parent 83b8cda commit a4257a5
Show file tree
Hide file tree
Showing 27 changed files with 5,055 additions and 2,925 deletions.
22 changes: 12 additions & 10 deletions cumulus_library/.sqlfluff
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,6 @@ column_name = 'bar'
column_names = ['foo', 'bar']
conditions = ["1 > 0", "1 < 2"]
column_hierarchy = [('a', list),('b',dict)]
config =
{
"medication_datasources" : {
"by_contained_ref" : True,
"by_external_ref" : True
},
'has_userselected': False
}
count_ref = count_ref
count_table = count_table
dataset =
Expand All @@ -79,7 +71,11 @@ dataset =
db_type = duckdb
dependent_variable = is_flu
ext_systems = ["omb", "text"]
extra_alias_done = True
extra_field = ("field", "alias")
extra_fields = [("field", "alias")]
field = 'column_name'
field_alias = 'field'
filter_table = filter_table
filter_resource = True
fhir_extension = condition
Expand Down Expand Up @@ -158,7 +154,13 @@ schema =
'authoredOn': True,
'category': {
'code': True, 'system': True, 'display': False
},
},
'dosageInstruction': {
'text': True,
},
'medicationReference': {
'reference': True
},
'subject': {
'reference': True
},
Expand Down Expand Up @@ -199,7 +201,7 @@ schema =
'encounter': {
'reference': True
}
},
},
'patient': {
'id': True,
'gender': True,
Expand Down
158 changes: 46 additions & 112 deletions cumulus_library/studies/core/builder_medication.py
Original file line number Diff line number Diff line change
@@ -1,134 +1,68 @@
""" Module for generating core medication table"""

from cumulus_library import base_table_builder, base_utils, databases
from cumulus_library import base_table_builder, databases
from cumulus_library.studies.core.core_templates import core_templates
from cumulus_library.template_sql import base_templates, sql_utils
from cumulus_library.template_sql import sql_utils

expected_table_cols = {
"medicationrequest": {
"id": [],
"subject": sql_utils.REFERENCE,
"encounter": sql_utils.REFERENCE,
"medicationReference": sql_utils.REFERENCE,
}
}


class MedicationBuilder(base_table_builder.BaseTableBuilder):
display_text = "Creating Medication table..."

def _check_data_in_fields(self, cursor, parser, schema: str):
"""Validates whether either observed medication source is present
We opt to not use the core_templates.utils based version of
checking for data fields, since Medication can come in from
a few different sources - the format is unique to this FHIR
resource.
"""

data_types = {
"inline": False,
"by_contained_ref": False,
"by_external_ref": False,
}

table = "medicationrequest"
inline_col = "medicationcodeableconcept"
with base_utils.get_progress_bar(transient=True) as progress:
task = progress.add_task(
"Detecting available medication sources...",
total=3,
)

# inline medications from FHIR medication
data_types["inline"] = sql_utils.is_field_populated(
schema=schema,
source_table=table,
hierarchy=[(inline_col, dict), ("coding", list)],
cursor=cursor,
parser=parser,
)
if data_types["inline"]:
query = base_templates.get_column_datatype_query(
schema, table, [inline_col]
)
cursor.execute(query)
progress.advance(task)
if "userselected" not in str(cursor.fetchone()[0]):
has_userselected = False
else:
has_userselected = True
else:
has_userselected = False
progress.advance(task)
# Validating presence of FHIR medication requests
if not sql_utils.is_field_populated(
schema=schema,
source_table=table,
hierarchy=[("medicationreference", dict), ("reference", dict)],
expected=sql_utils.REFERENCE,
cursor=cursor,
parser=parser,
):
return data_types, has_userselected

# checking med ref contents for our two linkage cases
query = base_templates.get_is_table_not_empty_query(
"medicationrequest",
"medicationreference.reference",
conditions=["medicationreference.reference LIKE '#%'"],
)
cursor.execute(query)
progress.advance(task)
if cursor.fetchone() is not None:
data_types["by_contained_ref"] = True
query = base_templates.get_is_table_not_empty_query(
"medicationrequest",
"medicationreference.reference",
conditions=["medicationreference.reference LIKE 'Medication/%'"],
)
cursor.execute(query)
progress.advance(task)
if cursor.fetchone() is not None:
data_types["by_external_ref"] = True

return data_types, has_userselected

def prepare_queries(
self,
cursor: databases.DatabaseCursor,
schema: str,
parser: databases.DatabaseParser = None,
*args,
**kwargs,
) -> dict:
) -> None:
"""Constructs queries related to condition codeableConcept
:param cursor: A database cursor object
:param schema: the schema/db name, matching the cursor
:param parser: A database parser
"""
medication_datasources, has_userselected = self._check_data_in_fields(
cursor, parser, schema
code_sources = [
sql_utils.CodeableConceptConfig(
source_table="medication",
column_hierarchy=[("code", dict)],
target_table="core__medication_dn_code",
),
sql_utils.CodeableConceptConfig(
source_table="medicationrequest",
column_hierarchy=[("medicationCodeableConcept", dict)],
target_table="core__medicationrequest_dn_inline_code",
),
sql_utils.CodeableConceptConfig(
source_table="medicationrequest",
column_hierarchy=[("contained", list), ("code", dict)],
target_table="core__medicationrequest_dn_contained_code",
expected={
"code": sql_utils.CODEABLE_CONCEPT,
"id": {},
"resourceType": {},
},
extra_fields=[
("id", "contained_id"),
("resourceType", "resource_type"),
],
),
]
self.queries += sql_utils.denormalize_complex_objects(
schema, cursor, parser, code_sources
)
validated_schema = sql_utils.validate_schema(
cursor, schema, expected_table_cols, parser
)
if (
medication_datasources["inline"]
or medication_datasources["by_contained_ref"]
or medication_datasources["by_external_ref"]
):
self.queries.append(
core_templates.get_core_template(
"medication",
config={
"medication_datasources": medication_datasources,
"has_userselected": has_userselected,
},
)
)
else:
self.queries.append(
base_templates.get_ctas_empty_query(
schema,
"core__medication",
[
"id",
"encounter_ref",
"patient_ref",
"code",
"display",
"code_system",
"userselected",
],
)
)
self.queries += [
core_templates.get_core_template("medication", validated_schema),
]
6 changes: 0 additions & 6 deletions cumulus_library/studies/core/builder_medicationrequest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ def prepare_queries(
column_hierarchy=[("category", list)],
target_table="core__medicationrequest_dn_category",
),
sql_utils.CodeableConceptConfig(
source_table="medicationrequest",
source_id="id",
column_hierarchy=[("medicationcodeableconcept", dict)],
target_table="core__medicationrequest_dn_medication",
),
]
self.queries += sql_utils.denormalize_complex_objects(
schema, cursor, parser, code_sources
Expand Down
Loading

0 comments on commit a4257a5

Please sign in to comment.