Skip to content

Commit

Permalink
Merge pull request #16710 from CDCgov/platform/bill/16143
Browse files Browse the repository at this point in the history
[16143] Implement Convert Step Updates
  • Loading branch information
wcutshall authored Dec 13, 2024
2 parents 6cb62ec + 5ccea0d commit 608556b
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 50 deletions.
51 changes: 31 additions & 20 deletions prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.common.BaseEngine
import gov.cdc.prime.router.fhirengine.translation.HL7toFhirTranslator
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirTransformer
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
Expand All @@ -52,6 +53,8 @@ import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.fhirengine.utils.HL7Reader.Companion.parseHL7Message
import gov.cdc.prime.router.fhirengine.utils.getObservations
import gov.cdc.prime.router.fhirengine.utils.getRSMessageType
import gov.cdc.prime.router.fhirengine.utils.isElr
import gov.cdc.prime.router.logging.LogMeasuredTime
import gov.cdc.prime.router.report.ReportService
import gov.cdc.prime.router.validation.IItemValidator
Expand Down Expand Up @@ -261,7 +264,7 @@ class FHIRConverter(
// TODO: https://github.com/CDCgov/prime-reportstream/issues/14287
FhirPathUtils

val processedItems = process(format, input.blobURL, input.blobDigest, input.topic, actionLogger)
val processedItems = process(format, input, actionLogger)

// processedItems can be empty in three scenarios:
// - the blob had no contents, i.e. an empty file was submitted
Expand Down Expand Up @@ -339,6 +342,12 @@ class FHIRConverter(
nextAction = TaskAction.destination_filter
)

logger.info(
"Applied transform - parentReportId=[${input.reportId}]" +
", childReportId=[${report.id}], schemaName=[${input.schemaName}]" +
", trackingId=[${processedItem.getTrackingId()}]"
)

// create route event
val routeEvent = ProcessEvent(
Event.EventAction.DESTINATION_FILTER,
Expand Down Expand Up @@ -385,7 +394,8 @@ class FHIRConverter(
mapOf(
ReportStreamEventProperties.BUNDLE_DIGEST
to bundleDigestExtractor.generateDigest(processedItem.bundle!!),
ReportStreamEventProperties.ITEM_FORMAT to format
ReportStreamEventProperties.ITEM_FORMAT to format,
ReportStreamEventProperties.ENRICHMENTS to input.schemaName
)
)
}
Expand Down Expand Up @@ -453,14 +463,12 @@ class FHIRConverter(
*/
internal fun process(
format: MimeFormat,
blobURL: String,
blobDigest: String,
topic: Topic,
input: FHIRConvertInput,
actionLogger: ActionLogger,
routeReportWithInvalidItems: Boolean = true,
): List<IProcessedItem<*>> {
val validator = topic.validator
val rawReport = BlobAccess.downloadBlob(blobURL, blobDigest)
val validator = input.topic.validator
val rawReport = BlobAccess.downloadBlob(input.blobURL, input.blobDigest)
return if (rawReport.isBlank()) {
actionLogger.error(InvalidReportMessage("Provided raw data is empty."))
emptyList()
Expand All @@ -474,7 +482,7 @@ class FHIRConverter(
"format" to format.name
)
) {
getBundlesFromRawHL7(rawReport, validator, topic.hl7ParseConfiguration)
getBundlesFromRawHL7(rawReport, validator, input.topic.hl7ParseConfiguration)
}
} catch (ex: ParseFailureError) {
actionLogger.error(
Expand Down Expand Up @@ -511,21 +519,24 @@ class FHIRConverter(
}
// 'stamp' observations with their condition code
if (item.bundle != null) {
val isElr = item.bundle!!.getRSMessageType() == RSMessageType.LAB_RESULT
item.bundle!!.getObservations().forEach { observation ->
val result = stamper.stampObservation(observation)
if (!result.success) {
val logger = actionLogger.getItemLogger(item.index + 1, observation.id)
if (result.failures.isEmpty()) {
logger.warn(UnmappableConditionMessage())
} else {
logger.warn(
result.failures.map {
UnmappableConditionMessage(
it.failures.map { it.code },
it.source
if (isElr) {
val result = stamper.stampObservation(observation)
if (!result.success) {
val logger = actionLogger.getItemLogger(item.index + 1, observation.id)
if (result.failures.isEmpty()) {
logger.warn(UnmappableConditionMessage())
} else {
logger.warn(
result.failures.map {
UnmappableConditionMessage(
it.failures.map { it.code },
it.source
)
}
)
}
)
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions prime-router/src/main/kotlin/fhirengine/engine/RSMessageType.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package gov.cdc.prime.router.fhirengine.engine

/**
* This class represents a way to group message types from an RS perspective. As we add additional logical
* groupings, FHIRBundleHelpers.getRSMessageType will need to be updated.
*
*/
enum class RSMessageType {
LAB_RESULT,
UNKNOWN,
}
32 changes: 32 additions & 0 deletions prime-router/src/main/kotlin/fhirengine/utils/FHIRBundleHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import gov.cdc.prime.router.azure.ConditionStamper.Companion.BUNDLE_CODE_IDENTIF
import gov.cdc.prime.router.azure.ConditionStamper.Companion.BUNDLE_VALUE_IDENTIFIER
import gov.cdc.prime.router.azure.ConditionStamper.Companion.conditionCodeExtensionURL
import gov.cdc.prime.router.codes
import gov.cdc.prime.router.fhirengine.engine.RSMessageType
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.FhirPathUtils
import gov.cdc.prime.router.fhirengine.utils.FHIRBundleHelpers.Companion.getChildProperties
import io.github.linuxforhealth.hl7.data.Hl7RelatedGeneralUtils
import org.hl7.fhir.r4.model.Base
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.CodeType
import org.hl7.fhir.r4.model.CodeableConcept
import org.hl7.fhir.r4.model.Coding
import org.hl7.fhir.r4.model.DateTimeType
Expand Down Expand Up @@ -116,6 +118,36 @@ fun Bundle.addProvenanceReference() {
}
}

/**
* Return true if Bundle contains an ELR in the MessageHeader.
*
* @return true if has a MesssageHeader that contains an R01 or ORU_R01, otherwise false.
*/
fun Bundle.isElr(): Boolean {
val code = FhirPathUtils.evaluate(
null,
this,
this,
"Bundle.entry.resource.ofType(MessageHeader).event.code"
)
.filterIsInstance<CodeType>()
.firstOrNull()
?.code
return ((code == "R01") || (code == "ORU_R01"))
}

/**
* Return RSMessageType based on grouping logic.
*
* @return RSMessageType of this Bundle.
*/
fun Bundle.getRSMessageType(): RSMessageType {
return when {
isElr() -> RSMessageType.LAB_RESULT
else -> RSMessageType.UNKNOWN
}
}

/**
* Gets all properties for a [Base] resource recursively and filters only its references
*
Expand Down
Loading

0 comments on commit 608556b

Please sign in to comment.