From f99f5f8be62d1da0988a961771b3af5b7c2d3394 Mon Sep 17 00:00:00 2001 From: Jamie Albinson Date: Thu, 21 Nov 2024 16:20:32 -0500 Subject: [PATCH] PR feedback --- .../organizations/organizations.json | 3 + prime-router/src/main/kotlin/Sender.kt | 8 +- .../src/main/kotlin/azure/HttpUtilities.kt | 6 + .../src/main/kotlin/azure/ReportFunction.kt | 90 +++++------ .../translation/hl7/utils/HL7ACKUtils.kt | 15 +- .../test/kotlin/azure/ReportFunctionTests.kt | 146 +++++++++++++++--- .../translation/hl7/utils/HL7ACKUtilsTest.kt | 34 +++- 7 files changed, 232 insertions(+), 70 deletions(-) diff --git a/prime-router/metadata/json_schema/organizations/organizations.json b/prime-router/metadata/json_schema/organizations/organizations.json index 115efaa6ae0..8be19b987c1 100644 --- a/prime-router/metadata/json_schema/organizations/organizations.json +++ b/prime-router/metadata/json_schema/organizations/organizations.json @@ -213,6 +213,9 @@ "HL7_BATCH" ] }, + "hl7AcknowledgementEnabled": { + "type": "boolean" + }, "name": { "type": "string" }, diff --git a/prime-router/src/main/kotlin/Sender.kt b/prime-router/src/main/kotlin/Sender.kt index bec1d3c62ea..6f084889f93 100644 --- a/prime-router/src/main/kotlin/Sender.kt +++ b/prime-router/src/main/kotlin/Sender.kt @@ -31,6 +31,7 @@ import java.time.OffsetDateTime * @property allowDuplicates if false a duplicate submission will be rejected * @property senderType one of four broad sender categories * @property primarySubmissionMethod Sender preference for submission - manual or automatic + * @property hl7AcknowledgementEnabled should we return an HL7 ACK response if MSH.15 == "AL"? */ @JsonIgnoreProperties(ignoreUnknown = true) @JsonTypeInfo( @@ -59,6 +60,7 @@ abstract class Sender( val allowDuplicates: Boolean = true, val senderType: SenderType? = null, val primarySubmissionMethod: PrimarySubmissionMethod? = null, + val hl7AcknowledgementEnabled: Boolean = false, override var version: Int? = null, override var createdBy: String? = null, override var createdAt: OffsetDateTime? = null, @@ -194,6 +196,7 @@ class UniversalPipelineSender : Sender { allowDuplicates: Boolean = true, senderType: SenderType? = null, primarySubmissionMethod: PrimarySubmissionMethod? = null, + hl7AcknowledgementEnabled: Boolean = false, topic: Topic, ) : super( topic, @@ -205,7 +208,8 @@ class UniversalPipelineSender : Sender { processingType, allowDuplicates, senderType, - primarySubmissionMethod + primarySubmissionMethod, + hl7AcknowledgementEnabled ) constructor(copy: UniversalPipelineSender) : this( @@ -214,6 +218,7 @@ class UniversalPipelineSender : Sender { copy.format, copy.customerStatus, copy.schemaName, + hl7AcknowledgementEnabled = copy.hl7AcknowledgementEnabled, topic = copy.topic, ) @@ -355,6 +360,7 @@ class MonkeypoxSender : LegacyPipelineSender { allowDuplicates, senderType, primarySubmissionMethod + ) constructor(copy: MonkeypoxSender) : this( diff --git a/prime-router/src/main/kotlin/azure/HttpUtilities.kt b/prime-router/src/main/kotlin/azure/HttpUtilities.kt index a850ba02f59..67d5448fbaa 100644 --- a/prime-router/src/main/kotlin/azure/HttpUtilities.kt +++ b/prime-router/src/main/kotlin/azure/HttpUtilities.kt @@ -24,6 +24,7 @@ class HttpUtilities { companion object : Logging { const val jsonMediaType = "application/json" const val fhirMediaType = "application/fhir+json" + const val hl7V2MediaType = "application/hl7-v2" const val oldApi = "/api/reports" const val watersApi = "/api/waters" const val tokenApi = "/api/token" @@ -434,6 +435,11 @@ class HttpUtilities { return responseCode to response } } + + fun HttpStatus.isSuccessful(): Boolean { + val status = this.value() + return status in 200..299 + } } } diff --git a/prime-router/src/main/kotlin/azure/ReportFunction.kt b/prime-router/src/main/kotlin/azure/ReportFunction.kt index c1ee7a98d4f..1477e35be84 100644 --- a/prime-router/src/main/kotlin/azure/ReportFunction.kt +++ b/prime-router/src/main/kotlin/azure/ReportFunction.kt @@ -19,7 +19,6 @@ import com.microsoft.azure.functions.annotation.StorageAccount import gov.cdc.prime.router.ActionError import gov.cdc.prime.router.ActionLog import gov.cdc.prime.router.ActionLogLevel -import gov.cdc.prime.router.ActionLogger import gov.cdc.prime.router.CustomerStatus import gov.cdc.prime.router.InvalidParamMessage import gov.cdc.prime.router.InvalidReportMessage @@ -31,6 +30,7 @@ import gov.cdc.prime.router.Sender.ProcessingType import gov.cdc.prime.router.SubmissionReceiver import gov.cdc.prime.router.UniversalPipelineReceiver import gov.cdc.prime.router.azure.BlobAccess.Companion.getBlobContainer +import gov.cdc.prime.router.azure.HttpUtilities.Companion.isSuccessful import gov.cdc.prime.router.azure.db.enums.TaskAction import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService @@ -44,7 +44,7 @@ import gov.cdc.prime.router.common.Environment import gov.cdc.prime.router.common.JacksonMapperUtilities import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7ACKUtils import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder -import gov.cdc.prime.router.fhirengine.utils.HL7Reader +import gov.cdc.prime.router.history.DetailedSubmissionHistory import gov.cdc.prime.router.history.azure.SubmissionsFacade import gov.cdc.prime.router.tokens.AuthenticatedClaims import gov.cdc.prime.router.tokens.Scope @@ -75,10 +75,10 @@ class ReportFunction( ) : RequestFunction(workflowEngine), Logging { - enum class IngestionMethod { - SFTP, - REST, - } + enum class IngestionMethod { + SFTP, + REST, + } /** * POST a report to the router @@ -499,12 +499,6 @@ class ReportFunction( request: HttpRequestMessage, sender: Sender, ): HttpResponseMessage { - // check for ACK request - val maybeACKResponse = handleAckRequest(request) - if (maybeACKResponse != null) { - return maybeACKResponse - } - // determine if we should be following the sync or async workflow val isAsync = processingType(request, sender) == ProcessingType.async // allow duplicates 'override' param @@ -611,19 +605,7 @@ class ReportFunction( SubmissionsFacade.instance.findDetailedSubmissionHistory(txn, null, actionHistory.action) } - val response = request.createResponseBuilder(httpStatus) - .header(HttpHeaders.CONTENT_TYPE, "application/json") - .body( - JacksonMapperUtilities.allowUnknownsMapper - .writeValueAsString(submission) - ) - .header( - HttpHeaders.LOCATION, - request.uri.resolve( - "/api/waters/report/${submission?.reportId}/history" - ).toString() - ) - .build() + val response = buildResponse(request, httpStatus, submission, sender) // queue messages here after all task / action records are in actionHistory.queueMessages(workflowEngine) @@ -644,34 +626,56 @@ class ReportFunction( } } - private fun handleAckRequest(request: HttpRequestMessage): HttpResponseMessage? { - // why does Azure handle Headers case-sensitive??? + private fun buildResponse( + request: HttpRequestMessage, + responseStatus: HttpStatus, + submission: DetailedSubmissionHistory?, + sender: Sender, + ): HttpResponseMessage { + return handleAckRequest(request, responseStatus, sender) ?: run { + request.createResponseBuilder(responseStatus) + .header(HttpHeaders.CONTENT_TYPE, "application/json") + .body( + JacksonMapperUtilities.allowUnknownsMapper + .writeValueAsString(submission) + ) + .header( + HttpHeaders.LOCATION, + request.uri.resolve( + "/api/waters/report/${submission?.reportId}/history" + ).toString() + ) + .build() + } + } + + private fun handleAckRequest( + request: HttpRequestMessage, + responseStatus: HttpStatus, + sender: Sender, + ): HttpResponseMessage? { + // Azure handles all headers as lowercase val contentType = request.headers[HttpHeaders.CONTENT_TYPE.lowercase()] val requestBody = request.body - return if (contentType == "application/hl7-v2" && requestBody != null) { + return if ( + sender.hl7AcknowledgementEnabled && + responseStatus.isSuccessful() && + contentType == HttpUtilities.hl7V2MediaType && + requestBody != null + ) { try { - // Parse HL7 message - val maybeMessage = HL7Reader(ActionLogger()) - .getMessages(requestBody) - .firstOrNull() - - // Is the message an ACK? - if (maybeMessage != null && HL7Reader.isAckMessage(maybeMessage)) { + hl7ACKUtils.generateOutgoingACKMessageIfRequired(requestBody)?.let { responseBody -> logger.info("Creating HL7 ACK response") - request.createResponseBuilder(HttpStatus.OK) - .header(HttpHeaders.CONTENT_TYPE, "application/hl7-v2") - .body(hl7ACKUtils.generateOutgoingACKMessage(maybeMessage)) + request.createResponseBuilder(responseStatus) + .header(HttpHeaders.CONTENT_TYPE, HttpUtilities.hl7V2MediaType) + .body(responseBody) .build() - } else { - logger.trace("Not an HL7 ACK message. Continuing.") - null } } catch (ex: Exception) { - logger.warn("Error checking for HL7 ACK. Continuing normal pipeline execution.", ex) + logger.warn("Error checking for HL7 ACK.", ex) null } } else { - logger.trace("Not an HL7 ACK message. Continuing.") null } } diff --git a/prime-router/src/main/kotlin/fhirengine/translation/hl7/utils/HL7ACKUtils.kt b/prime-router/src/main/kotlin/fhirengine/translation/hl7/utils/HL7ACKUtils.kt index 29a5ac00b9d..16438c86d44 100644 --- a/prime-router/src/main/kotlin/fhirengine/translation/hl7/utils/HL7ACKUtils.kt +++ b/prime-router/src/main/kotlin/fhirengine/translation/hl7/utils/HL7ACKUtils.kt @@ -2,6 +2,7 @@ package gov.cdc.prime.router.fhirengine.translation.hl7.utils import ca.uhn.hl7v2.model.Message import ca.uhn.hl7v2.model.v251.message.ACK +import gov.cdc.prime.router.ActionLogger import gov.cdc.prime.router.common.Environment import gov.cdc.prime.router.fhirengine.utils.HL7Reader import java.time.Clock @@ -17,7 +18,19 @@ class HL7ACKUtils( private val clock: Clock = Clock.systemUTC(), ) { - fun generateOutgoingACKMessage(incomingACKMessage: Message): String { + fun generateOutgoingACKMessageIfRequired(rawHL7: String): String? { + val maybeMessage = HL7Reader(ActionLogger()) + .getMessages(rawHL7) + .firstOrNull() + + return if (maybeMessage != null && HL7Reader.isAckMessage(maybeMessage)) { + generateOutgoingACKMessage(maybeMessage) + } else { + null + } + } + + private fun generateOutgoingACKMessage(incomingACKMessage: Message): String { val outgoingAck = ACK() val ackMsh = outgoingAck.msh diff --git a/prime-router/src/test/kotlin/azure/ReportFunctionTests.kt b/prime-router/src/test/kotlin/azure/ReportFunctionTests.kt index 9cc00e643ea..e8ed38690c7 100644 --- a/prime-router/src/test/kotlin/azure/ReportFunctionTests.kt +++ b/prime-router/src/test/kotlin/azure/ReportFunctionTests.kt @@ -39,7 +39,9 @@ import gov.cdc.prime.router.UniversalPipelineReceiver import gov.cdc.prime.router.UniversalPipelineSender import gov.cdc.prime.router.azure.BlobAccess.BlobContainerMetadata import gov.cdc.prime.router.azure.db.enums.TaskAction +import gov.cdc.prime.router.azure.db.tables.pojos.Action import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile +import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService import gov.cdc.prime.router.cli.GetMultipleSettings import gov.cdc.prime.router.cli.PIIRemovalCommands import gov.cdc.prime.router.cli.ProcessFhirCommands @@ -50,8 +52,10 @@ import gov.cdc.prime.router.tokens.AuthenticatedClaims import gov.cdc.prime.router.tokens.AuthenticationType import gov.cdc.prime.router.unittest.UnitTestUtils import io.ktor.utils.io.core.toByteArray +import io.mockk.Runs import io.mockk.clearAllMocks import io.mockk.every +import io.mockk.just import io.mockk.mockk import io.mockk.mockkClass import io.mockk.mockkConstructor @@ -1163,7 +1167,8 @@ class ReportFunctionTests { true, Sender.SenderType.facility, Sender.PrimarySubmissionMethod.manual, - Topic.FULL_ELR + false, + Topic.FULL_ELR, ) val receiver = Receiver( "full-elr", @@ -1202,32 +1207,133 @@ class ReportFunctionTests { assert(receiverReturned!!.name == receiver.name) } - @Test - fun `return ack if requested`() { - val metadata = UnitTestUtils.simpleMetadata - val settings = FileSettings().loadOrganizations(oneOrganization) - val sender = CovidSender("Test Sender", "test", MimeFormat.CSV, schemaName = "one") + @Nested + inner class SyncAckTests { - val engine = makeEngine(metadata, settings) - val actionHistory = spyk(ActionHistory(TaskAction.receive)) - val reportFunc = ReportFunction(engine, actionHistory) + @BeforeEach + fun setUp() { + clearAllMocks() + } - every { engine.settings.findSender("Test Sender") } returns sender + private fun setupReportFunction(ackEnabled: Boolean): ReportFunction { + val mockEngine = mockk() + val mockActionHistory = mockk(relaxed = true) + val mockReportStreamEventService = mockk(relaxed = true) + val mockSettings = mockk() + val mockReceiver = mockk() + val mockAction = mockk() + val mockDb = mockk() + mockkObject(BlobAccess.Companion) + mockkObject(SubmissionReceiver.Companion) + + val sender = UniversalPipelineSender( + name = "Test Sender", + organizationName = "org", + format = MimeFormat.HL7, + hl7AcknowledgementEnabled = ackEnabled, + topic = Topic.FULL_ELR, + ) + val report = Report( + Schema(name = "one", topic = Topic.TEST, elements = listOf(Element("a"), Element("b"))), listOf(), + sources = listOf(ClientSource("myOrg", "myClient")), + metadata = UnitTestUtils.simpleMetadata + ) + val submission = DetailedSubmissionHistory( + 1, + TaskAction.receive, + OffsetDateTime.now(), + reports = mutableListOf(), + logs = emptyList() + ) + + every { mockEngine.settings } returns mockSettings + every { mockSettings.findSender(any()) } returns sender + every { BlobAccess.Companion.getBlobConnection(any()) } returns "testconnection" + every { SubmissionReceiver.getSubmissionReceiver(any(), any(), any()) } returns mockReceiver + every { + mockReceiver.validateAndMoveToProcessing( + any(), any(), any(), any(), any(), any(), any(), any(), any() + ) + } returns report + every { mockEngine.recordAction(any()) } just Runs + every { mockActionHistory.action } returns mockAction + every { mockAction.actionId } returns 5 + every { mockEngine.db } returns mockDb + // I don't agree with ktlint on this one + every { + mockDb.transactReturning( + any< + ( + DataAccessTransaction, + ) -> DetailedSubmissionHistory? + >() + ) + } returns submission + + return ReportFunction(mockEngine, mockActionHistory, mockReportStreamEventService) + } + + @Test + fun `return ack if requested and enabled`() { + val reportFunction = setupReportFunction(ackEnabled = true) - val body = """ + val body = """ MSH|^~\&|Epic|Hospital|LIMS|StatePHL|20241003000000||ORM^O01^ORM_O01|4AFA57FE-D41D-4631-9500-286AAAF797E4|T|2.5.1|||AL|NE """.trimIndent() - val req = MockHttpRequestMessage(body) - req.httpHeaders += mapOf( - "client" to "Test Sender", - "content-length" to body.length.toString(), - "content-type" to "application/hl7-v2" - ) + val req = MockHttpRequestMessage(body) + req.httpHeaders += mapOf( + "client" to "Test Sender", + "content-length" to body.length.toString(), + "content-type" to "application/hl7-v2" + ) + + val response = reportFunction.run(req) + + assertThat(response.status.value()).isEqualTo(201) + assertThat(response.getHeader("Content-Type")).isEqualTo("application/hl7-v2") + } + + @Test + fun `do not return ack if requested and disabled`() { + val reportFunction = setupReportFunction(ackEnabled = false) + + val body = """ + MSH|^~\&|Epic|Hospital|LIMS|StatePHL|20241003000000||ORM^O01^ORM_O01|4AFA57FE-D41D-4631-9500-286AAAF797E4|T|2.5.1|||AL|NE + """.trimIndent() - val response = reportFunc.run(req) + val req = MockHttpRequestMessage(body) + req.httpHeaders += mapOf( + "client" to "Test Sender", + "content-length" to body.length.toString(), + "content-type" to "application/hl7-v2" + ) + + val response = reportFunction.run(req) + + assertThat(response.status.value()).isEqualTo(201) + assertThat(response.getHeader("Content-Type")).isEqualTo("application/json") + } + + @Test + fun `do not return ack if not request and enabled`() { + val reportFunction = setupReportFunction(ackEnabled = true) - assertThat(response.status.value()).isEqualTo(200) - assertThat(response.getHeader("Content-Type")).isEqualTo("application/hl7-v2") + val body = """ + MSH|^~\&|Epic|Hospital|LIMS|StatePHL|20241003000000||ORM^O01^ORM_O01|4AFA57FE-D41D-4631-9500-286AAAF797E4|T|2.5.1|||NE|NE + """.trimIndent() + + val req = MockHttpRequestMessage(body) + req.httpHeaders += mapOf( + "client" to "Test Sender", + "content-length" to body.length.toString(), + "content-type" to "application/hl7-v2" + ) + + val response = reportFunction.run(req) + + assertThat(response.status.value()).isEqualTo(201) + assertThat(response.getHeader("Content-Type")).isEqualTo("application/json") + } } } \ No newline at end of file diff --git a/prime-router/src/test/kotlin/fhirengine/translation/hl7/utils/HL7ACKUtilsTest.kt b/prime-router/src/test/kotlin/fhirengine/translation/hl7/utils/HL7ACKUtilsTest.kt index 86d94c5f19c..d85b551c26e 100644 --- a/prime-router/src/test/kotlin/fhirengine/translation/hl7/utils/HL7ACKUtilsTest.kt +++ b/prime-router/src/test/kotlin/fhirengine/translation/hl7/utils/HL7ACKUtilsTest.kt @@ -2,6 +2,7 @@ package gov.cdc.prime.router.fhirengine.translation.hl7.utils import assertk.assertThat import assertk.assertions.isEqualTo +import assertk.assertions.isNull import gov.cdc.prime.router.ActionLogger import gov.cdc.prime.router.cli.helpers.HL7DiffHelper import gov.cdc.prime.router.fhirengine.utils.HL7Reader @@ -35,19 +36,18 @@ class HL7ACKUtilsTest { } @Test - fun `generates ACK response`() { + fun `generates properly formatted ACK response when required`() { val f = Fixture() val id = UUID.randomUUID() mockkStatic(UUID::class) every { UUID.randomUUID() } returns id - val incomingAckMessage = """ + val incomingMessage = """ MSH|^~\&|Epic|Hospital|LIMS|StatePHL|20241003000000||ORM^O01^ORM_O01|4AFA57FE-D41D-4631-9500-286AAAF797E4|T|2.5.1|||AL|NE """.trimIndent() - val message = f.hl7Reader.getMessages(incomingAckMessage).first() - val ack = f.utils.generateOutgoingACKMessage(message) + val ack = f.utils.generateOutgoingACKMessageIfRequired(incomingMessage) val expected = f.hl7Reader.getMessages( """ @@ -55,7 +55,7 @@ class HL7ACKUtilsTest { MSA|CA|4AFA57FE-D41D-4631-9500-286AAAF797E4 """ ).first() - val actual = f.hl7Reader.getMessages(ack).first() + val actual = f.hl7Reader.getMessages(ack!!).first() val diffs = f.hl7DiffHelper.diffHl7(expected, actual) if (diffs.isNotEmpty()) { @@ -63,4 +63,28 @@ class HL7ACKUtilsTest { } assertThat(diffs.size).isEqualTo(0) } + + @Test + fun `return null if not required`() { + val f = Fixture() + + val incomingMessage = """ + MSH|^~\&|Epic|Hospital|LIMS|StatePHL|20241003000000||ORM^O01^ORM_O01|4AFA57FE-D41D-4631-9500-286AAAF797E4|T|2.5.1|||NE|NE + """.trimIndent() + + val ack = f.utils.generateOutgoingACKMessageIfRequired(incomingMessage) + + assertThat(ack).isNull() + } + + @Test + fun `invalid HL7 will return null`() { + val f = Fixture() + + val incomingAckMessage = "I'm bad HL7" + + val ack = f.utils.generateOutgoingACKMessageIfRequired(incomingAckMessage) + + assertThat(ack).isNull() + } } \ No newline at end of file