Skip to content

Commit

Permalink
[WIP] Changed HL7Reader.getMessages to use the same logic as FHIRConv…
Browse files Browse the repository at this point in the history
…erter.getBundles which calls HL7Reader.parseHL7Message but still sorting through the tests
  • Loading branch information
adegolier committed Dec 6, 2024
1 parent e564319 commit 2dc6a1e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 154 deletions.
76 changes: 38 additions & 38 deletions prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.FhirPathUtils
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.fhirengine.utils.HL7Reader.Companion.maybeParallelize
import gov.cdc.prime.router.fhirengine.utils.HL7Reader.Companion.parseHL7Message
import gov.cdc.prime.router.fhirengine.utils.getObservations
import gov.cdc.prime.router.logging.LogMeasuredTime
Expand All @@ -62,7 +63,6 @@ import org.jooq.Field
import java.time.OffsetDateTime
import java.util.UUID
import java.util.stream.Collectors
import java.util.stream.Stream

/**
* Process a message off of the raw-elr azure queue, convert it into FHIR, and store for next step.
Expand Down Expand Up @@ -387,7 +387,7 @@ class FHIRConverter(
)
}

FHIREngineRunResult(
FHIREngineRunResult(
routeEvent,
report,
blobInfo.blobUrl,
Expand Down Expand Up @@ -417,7 +417,7 @@ class FHIRConverter(
report,
TaskAction.convert,
"Submitted report was either empty or could not be parsed into HL7"
) {
) {
parentReportId(input.reportId)
params(
mapOf(
Expand Down Expand Up @@ -517,11 +517,11 @@ class FHIRConverter(
} else {
logger.warn(
result.failures.map {
UnmappableConditionMessage(
it.failures.map { it.code },
it.source
)
}
UnmappableConditionMessage(
it.failures.map { it.code },
it.source
)
}
)
}
}
Expand Down Expand Up @@ -689,30 +689,30 @@ class FHIRConverter(
)
}

/**
* Returns a parallelized stream when the number of items being processed is greater
* than the [sequentialLimit]
*
* @param streamSize the number of items in the stream
* @param stream the stream to optionally parallelize
*
*/
private fun <ProcessedItemType> maybeParallelize(
streamSize: Int,
stream: Stream<ProcessedItemType>,
message: String,
): Stream<ProcessedItemType> =
if (streamSize > sequentialLimit) {
withLoggingContext(mapOf("numberOfItems" to streamSize.toString())) {
logger.info("$message parallel")
}
stream.parallel()
} else {
withLoggingContext(mapOf("numberOfItems" to streamSize.toString())) {
logger.info("$message serial")
}
stream
}
// /**
// * Returns a parallelized stream when the number of items being processed is greater
// * than the [sequentialLimit]
// *
// * @param streamSize the number of items in the stream
// * @param stream the stream to optionally parallelize
// *
// */
// private fun <ProcessedItemType> maybeParallelize(
// streamSize: Int,
// stream: Stream<ProcessedItemType>,
// message: String,
// ): Stream<ProcessedItemType> =
// if (streamSize > sequentialLimit) {
// withLoggingContext(mapOf("numberOfItems" to streamSize.toString())) {
// logger.info("$message parallel")
// }
// stream.parallel()
// } else {
// withLoggingContext(mapOf("numberOfItems" to streamSize.toString())) {
// logger.info("$message serial")
// }
// stream
// }

/**
* Action log detail for tracking an error while processing an HL7 or FHIR item
Expand Down Expand Up @@ -748,13 +748,13 @@ class FHIRConverter(
* transformer in tests.
*/
fun getTransformerFromSchema(schemaName: String): FhirTransformer? = if (schemaName.isNotBlank()) {
withLoggingContext(mapOf("schemaName" to schemaName)) {
logger.info("Apply a sender transform to the items in the report")
}
FhirTransformer(schemaName)
} else {
null
withLoggingContext(mapOf("schemaName" to schemaName)) {
logger.info("Apply a sender transform to the items in the report")
}
FhirTransformer(schemaName)
} else {
null
}
}

/**
Expand Down
174 changes: 58 additions & 116 deletions prime-router/src/main/kotlin/fhirengine/utils/HL7Reader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,30 @@ import ca.uhn.hl7v2.DefaultHapiContext
import ca.uhn.hl7v2.ErrorCode
import ca.uhn.hl7v2.HL7Exception
import ca.uhn.hl7v2.HapiContext
import ca.uhn.hl7v2.model.AbstractMessage
import ca.uhn.hl7v2.model.Message
import ca.uhn.hl7v2.parser.ParserConfiguration
import ca.uhn.hl7v2.util.Hl7InputStreamMessageIterator
import ca.uhn.hl7v2.util.Hl7InputStreamMessageStringIterator
import ca.uhn.hl7v2.util.Terser
import ca.uhn.hl7v2.validation.ValidationException
import ca.uhn.hl7v2.validation.impl.ValidationContextFactory
import fhirengine.engine.ProcessedHL7Item
import fhirengine.translation.hl7.structures.fhirinventory.message.OML_O21
import fhirengine.translation.hl7.structures.fhirinventory.message.ORM_O01
import fhirengine.translation.hl7.structures.fhirinventory.message.ORU_R01
import fhirengine.utils.ReportStreamCanonicalModelClassFactory
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.InvalidReportMessage
import gov.cdc.prime.router.common.BaseEngine.Companion.sequentialLimit
import gov.cdc.prime.router.fhirengine.engine.FHIRConverter.InvalidItemActionLogDetail
import io.github.oshai.kotlinlogging.withLoggingContext
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.kotlin.Logging
import java.util.Date
import ca.uhn.hl7v2.model.v251.message.ORU_R01 as v251_ORU_R01
import java.util.stream.Collectors
import java.util.stream.Stream
import ca.uhn.hl7v2.model.v251.segment.MSH as v251_MSH
import ca.uhn.hl7v2.model.v27.message.ORU_R01 as v27_ORU_R01
import ca.uhn.hl7v2.model.v27.segment.MSH as v27_MSH
import fhirengine.translation.hl7.structures.nistelr251.message.ORU_R01 as NIST_ELR_ORU_R01
import fhirengine.translation.hl7.structures.nistelr251.segment.MSH as NIST_MSH

private const val MSH_SEGMENT_NAME = "MSH"
Expand Down Expand Up @@ -65,96 +66,38 @@ class HL7Reader(private val actionLogger: ActionLogger) : Logging {
* @return one or more HL7 messages
* @throws IllegalArgumentException if the raw data cannot be parsed or no messages were read
*/
fun getMessages(rawMessage: String): List<Message> {
val messageModelsToTry = getMessageModelClasses(rawMessage)
val messages: MutableList<Message> = mutableListOf()
if (rawMessage.isBlank()) {
fun getMessages(rawReport: String): List<Message> {
if (rawReport.isBlank()) {
actionLogger.error(InvalidReportMessage("Provided raw data is empty."))
} else if (messageModelsToTry.isEmpty()) {
try {
val iterator = Hl7InputStreamMessageIterator(rawMessage.byteInputStream())
while (iterator.hasNext()) {
messages.add(iterator.next())
}
} catch (e: Hl7InputStreamMessageStringIterator.ParseFailureError) {
logHL7ParseFailure(e)
}
return emptyList()
} else {
val validationContext = ValidationContextFactory.noValidation()
val parseError = mutableListOf<Hl7InputStreamMessageStringIterator.ParseFailureError>()
run modelLoop@{
messageModelsToTry.forEach { model ->
val context = DefaultHapiContext(ReportStreamCanonicalModelClassFactory(model))
context.validationContext = validationContext
val itemStream =
Hl7InputStreamMessageStringIterator(rawReport.byteInputStream()).asSequence()
.mapIndexed { index, rawItem ->
ProcessedHL7Item(rawItem, index)
}.toList()

val messages = maybeParallelize(itemStream.size, itemStream.stream(), "Getting parsed HL7")
.map { item ->
try {
val iterator = Hl7InputStreamMessageIterator(rawMessage.byteInputStream(), context)
while (iterator.hasNext()) {
messages.add(iterator.next())
}
} catch (e: Hl7InputStreamMessageStringIterator.ParseFailureError) {
messages.clear()
parseError.add(e)
}

if (messages.isNotEmpty()) {
// Don't try other message models if we were able to parse
return@modelLoop
}
}
}

// if it was able to parse the message through one of the models, then we do not want to log it as an error
val parseLogLevel = if (parseError.size == messageModelsToTry.size) Level.ERROR else Level.WARN
parseError.forEach { currentError ->
logHL7ParseFailure(currentError, messages.isEmpty(), parseLogLevel)
}
}

if (messages.isEmpty() && !actionLogger.hasErrors()) {
actionLogger.error(InvalidReportMessage("Unable to find HL7 messages in provided data."))
}

return messages
}

/**
* Extracts the message type from the MSH segment and returns the list of message models to use to
* try to parse the messages.
*
* This function assumes all the message types will be the same if this is a HL7 batch.
*/
private fun getMessageModelClasses(rawMessage: String): List<Class<out AbstractMessage>> {
try {
val messageProfile = getMessageProfile(rawMessage)
if (messageProfile != null) {
when (messageProfile.typeID) {
"ORU" -> {
return when (messageProfile.profileID) {
// TODO: NIST ELR conformance profile to be enabled in a future PR (rename to "NIST_ELR")
"NIST_ELR_TEST" -> listOf(
NIST_ELR_ORU_R01::class.java
val message = parseHL7Message(item.rawItem, null)
item.updateParsed(message)
} catch (e: HL7Exception) {
recordError(e)
item.updateParsed(
InvalidItemActionLogDetail(
gov.cdc.prime.router.ErrorCode.INVALID_MSG_PARSE,
item.index,
"exception while parsing HL7: ${ExceptionUtils.getRootCause(e).message}",
)
else -> listOf(
v27_ORU_R01::class.java,
v251_ORU_R01::class.java
)
}
}
else -> {
logger.warn(
"${messageProfile.typeID} did not have any mapped message model classes, " +
"using default behavior"
)
return emptyList()
}
}
}.map { processedItem -> processedItem.parsedItem }.collect(Collectors.toList()).filterNotNull()
if (messages.isEmpty() && !actionLogger.hasErrors()) {
actionLogger.error(InvalidReportMessage("Unable to find HL7 messages in provided data."))
}
} catch (ex: Hl7InputStreamMessageStringIterator.ParseFailureError) {
logHL7ParseFailure(ex)
return emptyList()
return messages
}
actionLogger.error(InvalidReportMessage("String did not contain any HL7 messages"))
return emptyList()
}

/**
Expand All @@ -166,29 +109,7 @@ class HL7Reader(private val actionLogger: ActionLogger) : Logging {
return rawMessage.startsWith("FHS") || numMessages > 1
}

/**
* Takes an [exception] thrown by the HL7 HAPI library, gets the root cause and logs the error into [actionLogger].
* Sample error messages returned by the HAPI library are:
* Error Code = DATA_TYPE_ERROR-102: 'test' in record 3 is invalid for version 2.5.1
* Error Code = REQUIRED_FIELD_MISSING-101: Can't find version ID - MSH.12 is null
* This functions only logs messages that contain meaningful data.
*
*/
private fun logHL7ParseFailure(
exception: Hl7InputStreamMessageStringIterator.ParseFailureError,
isError: Boolean = true,
logLevel: Level = Level.ERROR,
) {
logger.log(logLevel, "Failed to parse message: ${exception.message}")

// Get the exception root cause and log it accordingly
when (val rootCause = ExceptionUtils.getRootCause(exception)) {
is AbstractHL7Exception -> recordError(rootCause, isError)
else -> throw rootCause
}
}

private fun recordError(exception: AbstractHL7Exception, isError: Boolean) {
private fun recordError(exception: AbstractHL7Exception) {
val errorMessage: String = when (exception) {
is ValidationException -> "Validation Failed: ${exception.message}"

Expand All @@ -202,14 +123,10 @@ class HL7Reader(private val actionLogger: ActionLogger) : Logging {

else -> "Failed to parse message"
}
if (isError) {
actionLogger.error(InvalidReportMessage(errorMessage))
} else {
actionLogger.warn(InvalidReportMessage(errorMessage))
}
actionLogger.error(InvalidReportMessage(errorMessage))
}

companion object {
companion object : Logging {

// This regex is used to replace \n with \r while not replacing \r\n
val newLineRegex = Regex("(?<!\r)\n")
Expand Down Expand Up @@ -480,5 +397,30 @@ class HL7Reader(private val actionLogger: ActionLogger) : Logging {
else -> null
}
}

/**
* Returns a parallelized stream when the number of items being processed is greater
* than the [sequentialLimit]
*
* @param streamSize the number of items in the stream
* @param stream the stream to optionally parallelize
*
*/
fun <ProcessedItemType> maybeParallelize(
streamSize: Int,
stream: Stream<ProcessedItemType>,
message: String,
): Stream<ProcessedItemType> =
if (streamSize > sequentialLimit) {
withLoggingContext(mapOf("numberOfItems" to streamSize.toString())) {
logger.info("$message parallel")
}
stream.parallel()
} else {
withLoggingContext(mapOf("numberOfItems" to streamSize.toString())) {
logger.info("$message serial")
}
stream
}
}
}

0 comments on commit 2dc6a1e

Please sign in to comment.