Skip to content

Commit

Permalink
Added member oid extension for each observation
Browse files Browse the repository at this point in the history
  • Loading branch information
kant committed Dec 3, 2024
1 parent b754368 commit 50c590b
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 22 deletions.
46 changes: 43 additions & 3 deletions prime-router/src/main/kotlin/azure/ConditionMapper.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@ import gov.cdc.prime.router.Metadata
import gov.cdc.prime.router.fhirengine.utils.getCodeSourcesMap
import gov.cdc.prime.router.metadata.ObservationMappingConstants
import org.hl7.fhir.r4.model.Coding
import org.hl7.fhir.r4.model.Extension
import org.hl7.fhir.r4.model.Observation
import org.hl7.fhir.r4.model.StringType

interface IConditionMapper {
/**
* Attempt to find diagnostic conditions for a series of test [codings]
* @return a map associating test [codings] to their diagnostic conditions as Coding's
*/
fun lookupConditions(codings: List<Coding>): Map<Coding, List<Coding>>

/**
* Lookup test code to Member OID mappings for the given [codings].
* @return a map associating test codes to their Member OIDs
*/
fun lookupMemberOid(codings: List<Coding>): Map<String, String>
}

class LookupTableConditionMapper(metadata: Metadata) : IConditionMapper {
Expand All @@ -34,16 +42,32 @@ class LookupTableConditionMapper(metadata: Metadata) : IConditionMapper {
acc
}
}

override fun lookupMemberOid(codings: List<Coding>): Map<String, String> {
return mappingTable.FilterBuilder()
.isIn(ObservationMappingConstants.TEST_CODE_KEY, codings.map { it.code })
.filter().caseSensitiveDataRowsMap.fold(mutableMapOf()) { acc, condition ->
val testCode = condition[ObservationMappingConstants.TEST_CODE_KEY] ?: ""
val memberOid = condition[ObservationMappingConstants.TEST_OID_KEY] ?: ""
if (testCode.isNotEmpty() && memberOid.isNotEmpty()) {
acc[testCode] = memberOid
}
acc
}
}
}

class ConditionStamper(private val conditionMapper: IConditionMapper) {
companion object {
const val conditionCodeExtensionURL = "https://reportstream.cdc.gov/fhir/StructureDefinition/condition-code"
const val CONDITION_CODE_EXTENSION_URL = "https://reportstream.cdc.gov/fhir/StructureDefinition/condition-code"
const val MEMBER_OID_EXTENSION_URL =
"https://reportstream.cdc.gov/fhir/StructureDefinition/test-performed-member-oid"

const val BUNDLE_CODE_IDENTIFIER = "observation.code.coding.code"
const val BUNDLE_VALUE_IDENTIFIER = "observation.valueCodeableConcept.coding.code"
const val MAPPING_CODES_IDENTIFIER = "observation.{code|valueCodeableConcept}.coding.code"
}

data class ObservationMappingFailure(val source: String, val failures: List<Coding>)

data class ObservationStampingResult(
Expand All @@ -52,31 +76,47 @@ class ConditionStamper(private val conditionMapper: IConditionMapper) {
)

/**
* Lookup condition codes for an [observation] and add them as custom extensions
* Lookup condition codes and member OIDs for an [observation] and add them as custom extensions
* @param observation the observation that will be stamped
* @return a [ObservationStampingResult] including stamping success and any mapping failures
*/
fun stampObservation(observation: Observation): ObservationStampingResult {
val codeSourcesMap = observation.getCodeSourcesMap().filterValues { it.isNotEmpty() }
if (codeSourcesMap.values.flatten().isEmpty()) return ObservationStampingResult(false)

// Lookup conditions and Member OIDs
val conditionsToCode = conditionMapper.lookupConditions(codeSourcesMap.values.flatten())
val memberOidMap = conditionMapper.lookupMemberOid(codeSourcesMap.values.flatten())

var mappedSomething = false

// Process condition mappings
val failures = codeSourcesMap.mapNotNull { codes ->
val unnmapped = codes.value.mapNotNull { code ->
val conditions = conditionsToCode.getOrDefault(code, emptyList())
if (conditions.isEmpty()) {
code
} else {
conditions.forEach { code.addExtension(conditionCodeExtensionURL, it) }
conditions.forEach { code.addExtension(CONDITION_CODE_EXTENSION_URL, it) }
mappedSomething = true
null
}
}
if (unnmapped.isEmpty()) null else ObservationMappingFailure(codes.key, unnmapped)
}

// Add the Member OID extension to the observation, based on the lookup
observation.code.coding.forEach { coding ->
val testCode = coding.code
val memberOid = memberOidMap[testCode]
if (memberOid != null) {
val memberOidExtension = Extension(MEMBER_OID_EXTENSION_URL)
memberOidExtension.setValue(StringType(memberOid))
observation.addExtension(memberOidExtension)
mappedSomething = true
}
}

return ObservationStampingResult(mappedSomething, failures)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package gov.cdc.prime.router.azure.observability.event

import gov.cdc.prime.router.azure.ConditionStamper.Companion.conditionCodeExtensionURL
import gov.cdc.prime.router.azure.ConditionStamper.Companion.CONDITION_CODE_EXTENSION_URL
import org.hl7.fhir.r4.model.Coding

data class TestSummary(
Expand All @@ -17,7 +17,7 @@ data class TestSummary(
*/
fun fromCoding(coding: Coding): TestSummary {
val conditions = coding.extension
.filter { it.url == conditionCodeExtensionURL }
.filter { it.url == CONDITION_CODE_EXTENSION_URL }
.map { it.castToCoding(it.value) }
.map(CodeSummary::fromCoding)
return TestSummary(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import gov.cdc.prime.router.ReportStreamConditionFilter
import gov.cdc.prime.router.ReportStreamFilter
import gov.cdc.prime.router.azure.ConditionStamper.Companion.BUNDLE_CODE_IDENTIFIER
import gov.cdc.prime.router.azure.ConditionStamper.Companion.BUNDLE_VALUE_IDENTIFIER
import gov.cdc.prime.router.azure.ConditionStamper.Companion.conditionCodeExtensionURL
import gov.cdc.prime.router.azure.ConditionStamper.Companion.CONDITION_CODE_EXTENSION_URL
import gov.cdc.prime.router.codes
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.FhirPathUtils
Expand Down Expand Up @@ -63,7 +63,7 @@ fun Observation.getMappedConditionExtensions(): List<Extension> {
return this.getCodeSourcesMap()
.flatMap { it.value }
.flatMap { it.extension }
.filter { it.url == conditionCodeExtensionURL }
.filter { it.url == CONDITION_CODE_EXTENSION_URL }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package gov.cdc.prime.router.azure.observability.bundleDigest
import assertk.assertThat
import assertk.assertions.isDataClassEqualTo
import fhirengine.engine.CustomFhirPathFunctions
import gov.cdc.prime.router.azure.ConditionStamper.Companion.conditionCodeExtensionURL
import gov.cdc.prime.router.azure.ConditionStamper.Companion.CONDITION_CODE_EXTENSION_URL
import gov.cdc.prime.router.azure.observability.event.CodeSummary
import gov.cdc.prime.router.azure.observability.event.ObservationSummary
import gov.cdc.prime.router.azure.observability.event.TestSummary
Expand Down Expand Up @@ -152,7 +152,7 @@ class FhirPathBundleDigestExtractorStrategyTests {
val observation = Observation()
val coding = Coding()
val extension = Extension()
extension.url = conditionCodeExtensionURL
extension.url = CONDITION_CODE_EXTENSION_URL
extension.setValue(Coding())
coding.extension = listOf(extension)
observation.code.coding = listOf(coding)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import gov.cdc.prime.router.SettingsProvider
import gov.cdc.prime.router.Topic
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.ConditionStamper
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.LookupTableConditionMapper
import gov.cdc.prime.router.azure.SubmissionTableService
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.Action
Expand Down Expand Up @@ -60,9 +62,12 @@ import io.mockk.verify
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Coding
import org.hl7.fhir.r4.model.Observation
import org.hl7.fhir.r4.model.StringType
import org.jooq.tools.jdbc.MockConnection
import org.jooq.tools.jdbc.MockDataProvider
import org.jooq.tools.jdbc.MockResult
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Nested
Expand Down Expand Up @@ -359,7 +364,7 @@ class FhirConverterTests {
ObservationMappingConstants.TEST_CODE_KEY,
ObservationMappingConstants.CONDITION_CODE_KEY,
ObservationMappingConstants.CONDITION_CODE_SYSTEM_KEY,
ObservationMappingConstants.CONDITION_NAME_KEY
ObservationMappingConstants.CONDITION_NAME_KEY,
),
listOf(
"80382-5",
Expand Down Expand Up @@ -442,6 +447,67 @@ class FhirConverterTests {
}
}

@Test
fun `test condition code and member OID stamping`() {
@Suppress("ktlint:standard:max-line-length")
val fhirRecord =
"""{"resourceType":"Bundle","id":"1667861767830636000.7db38d22-b713-49fc-abfa-2edba9c12347","meta":{"lastUpdated":"2022-11-07T22:56:07.832+00:00"},"identifier":{"value":"1234d1d1-95fe-462c-8ac6-46728dba581c"},"type":"message","timestamp":"2021-08-03T13:15:11.015+00:00","entry":[{"fullUrl":"Observation/d683b42a-bf50-45e8-9fce-6c0531994f09","resource":{"resourceType":"Observation","id":"d683b42a-bf50-45e8-9fce-6c0531994f09","status":"final","code":{"coding":[{"system":"http://loinc.org","code":"80382-5"}],"text":"Flu A"},"subject":{"reference":"Patient/9473889b-b2b9-45ac-a8d8-191f27132912"},"performer":[{"reference":"Organization/1a0139b9-fc23-450b-9b6c-cd081e5cea9d"}],"valueCodeableConcept":{"coding":[{"system":"http://snomed.info/sct","code":"260373001","display":"Detected"}]},"interpretation":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v2-0078","code":"A","display":"Abnormal"}]}],"method":{"extension":[{"url":"https://reportstream.cdc.gov/fhir/StructureDefinition/testkit-name-id","valueCoding":{"code":"BD Veritor System for Rapid Detection of SARS-CoV-2 & Flu A+B_Becton, Dickinson and Company (BD)"}},{"url":"https://reportstream.cdc.gov/fhir/StructureDefinition/equipment-uid","valueCoding":{"code":"BD Veritor System for Rapid Detection of SARS-CoV-2 & Flu A+B_Becton, Dickinson and Company (BD)"}}],"coding":[{"display":"BD Veritor System for Rapid Detection of SARS-CoV-2 & Flu A+B*"}]},"specimen":{"reference":"Specimen/52a582e4-d389-42d0-b738-bee51cf5244d"},"device":{"reference":"Device/78dc4d98-2958-43a3-a445-76ceef8c0698"}}}]}"""

val memberOidExtensionURL = "https://reportstream.cdc.gov/fhir/StructureDefinition/test-performed-member-oid"

metadata.lookupTableStore += mapOf(
"observation-mapping" to LookupTable(
"observation-mapping",
listOf(
listOf(
ObservationMappingConstants.TEST_CODE_KEY,
ObservationMappingConstants.CONDITION_CODE_KEY,
ObservationMappingConstants.CONDITION_CODE_SYSTEM_KEY,
ObservationMappingConstants.CONDITION_NAME_KEY,
ObservationMappingConstants.TEST_OID_KEY
),
listOf(
"80382-5",
"6142004",
"SNOMEDCT",
"Influenza (disorder)",
"OID12345"
),
listOf(
"260373001",
"Some Condition Code",
"Condition Code System",
"Condition Name",
"OID67890"
)
)
)
)

val bundle = FhirContext.forR4().newJsonParser().parseResource(Bundle::class.java, fhirRecord)

bundle.entry.filter { it.resource is Observation }.forEach {
val observation = (it.resource as Observation)

// Add Condition and Member OID extensions
ConditionStamper(LookupTableConditionMapper(metadata)).stampObservation(observation)

// Assert condition extensions
val conditionExtension = observation.code.coding[0].extension.find {
it.url == "https://reportstream.cdc.gov/fhir/StructureDefinition/condition-code"
}
assertNotNull(conditionExtension)
assertEquals("6142004", (conditionExtension!!.value as Coding).code)

// Assert Member OID extension
val memberOidExtension = observation.extension.find {
it.url == memberOidExtensionURL
}
assertNotNull(memberOidExtension)
assertEquals("OID12345", (memberOidExtension!!.value as StringType).value)
}
}

@Test
fun `test fully unmapped condition code stamping logs errors`() {
val fhirData = File(BATCH_VALID_DATA_URL).readText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import gov.cdc.prime.router.TestSource
import gov.cdc.prime.router.Topic
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.ConditionStamper.Companion.conditionCodeExtensionURL
import gov.cdc.prime.router.azure.ConditionStamper.Companion.CONDITION_CODE_EXTENSION_URL
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
Expand Down Expand Up @@ -508,7 +508,7 @@ class FhirReceiverFilterTests {
val coding = it.code.coding.first()
if (coding.extension.isEmpty()) {
coding.addExtension(
conditionCodeExtensionURL,
CONDITION_CODE_EXTENSION_URL,
Coding(
"system", "AOE", "name"
)
Expand Down Expand Up @@ -569,11 +569,11 @@ class FhirReceiverFilterTests {
bundle.entry.filter { it.resource is Observation }.forEach {
val observation = (it.resource as Observation)
observation.code.coding[0].addExtension(
conditionCodeExtensionURL,
CONDITION_CODE_EXTENSION_URL,
Coding("SNOMEDCT", "6142004", "Influenza (disorder)")
)
observation.valueCodeableConcept.coding[0].addExtension(
conditionCodeExtensionURL,
CONDITION_CODE_EXTENSION_URL,
Coding("Condition Code System", "foobar", "Condition Name")
)
}
Expand Down Expand Up @@ -681,11 +681,11 @@ class FhirReceiverFilterTests {
val bundle = FhirContext.forR4().newJsonParser().parseResource(Bundle::class.java, fhirRecord)
bundle.getObservations().forEach { observation ->
observation.code.coding[0].addExtension(
conditionCodeExtensionURL,
CONDITION_CODE_EXTENSION_URL,
Coding("SNOMEDCT", "6142004", "Influenza (disorder)")
)
observation.valueCodeableConcept.coding[0].addExtension(
conditionCodeExtensionURL,
CONDITION_CODE_EXTENSION_URL,
Coding("Condition Code System", "Some Condition Code", "Condition Name")
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import gov.cdc.prime.router.Schema
import gov.cdc.prime.router.Topic
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.ConditionStamper
import gov.cdc.prime.router.azure.ConditionStamper.Companion.conditionCodeExtensionURL
import gov.cdc.prime.router.azure.ConditionStamper.Companion.CONDITION_CODE_EXTENSION_URL
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.LookupTableConditionMapper
import gov.cdc.prime.router.azure.QueueAccess
Expand Down Expand Up @@ -497,7 +497,7 @@ class FHIRBundleHelpersTests {
val fhirRecord = File(VALID_ROUTING_DATA_URL).readText()
val bundle = FhirContext.forR4().newJsonParser().parseResource(Bundle::class.java, fhirRecord)
bundle.getObservations()[0].code.coding[0].addExtension(
conditionCodeExtensionURL, Coding("SOMESYSTEM", "840539006", "SOMECONDITION")
CONDITION_CODE_EXTENSION_URL, Coding("SOMESYSTEM", "840539006", "SOMECONDITION")
)

val filteredBundle = bundle.filterMappedObservations(
Expand Down Expand Up @@ -800,7 +800,7 @@ class FHIRBundleHelpersTests {
assertThat(failure.failures.first().code).isEqualTo("some-unmapped-code")

val extension = code.coding.first().extension.first()
assertThat(extension.url).isEqualTo(conditionCodeExtensionURL)
assertThat(extension.url).isEqualTo(CONDITION_CODE_EXTENSION_URL)
assertThat((extension.value as? Coding)?.code).isEqualTo("6142004")
}

Expand Down Expand Up @@ -853,7 +853,7 @@ class FHIRBundleHelpersTests {
val extensions = entry.getMappedConditionExtensions()
assertThat(extensions)
.extracting { it.url }
.each { it.isEqualTo(conditionCodeExtensionURL) }
.each { it.isEqualTo(CONDITION_CODE_EXTENSION_URL) }
}

@Test
Expand Down Expand Up @@ -899,7 +899,7 @@ class FHIRBundleHelpersTests {
assertThat(result.failures).isEmpty()

val extension = code.coding.first().extension.first()
assertThat(extension.url).isEqualTo(conditionCodeExtensionURL)
assertThat(extension.url).isEqualTo(CONDITION_CODE_EXTENSION_URL)
assertThat(extension.value)
.isInstanceOf<Coding>()
.transform { it.code }
Expand Down Expand Up @@ -947,7 +947,7 @@ class FHIRBundleHelpersTests {
assertThat(result.failures).isEmpty()

val extension = code.coding.first().extension.first()
assertThat(extension.url).isEqualTo(conditionCodeExtensionURL)
assertThat(extension.url).isEqualTo(CONDITION_CODE_EXTENSION_URL)
assertThat(extension.value)
.isInstanceOf<Coding>()
.transform { it.code }
Expand Down

0 comments on commit 50c590b

Please sign in to comment.